
In this article, I explain how to work with the Schema Registry directly in a NiFi data flow. In my previous article, Using the Schema Registry API, I covered the work required to expose the API methods needed to create a Schema Registry entity and update that entity with an Avro schema. Here I take it one step further and complete both operations directly in my NiFi data flow. I also include the use case I am working on: the flow accepts a CSV parameter list containing the columns and data types that go into the fields object of the Avro schema, processes the contents of the file, builds the required Avro schema, and saves it to the Schema Registry entity I created.

At the time of writing, I have not yet handled data types, so in these examples every schema field is a string. In future updates to my flow, I will map the different column types to appropriate data types for Hive queries. For now, this article and the NiFi template cover two main parts:

  1. Create a Schema Registry Entity via NiFi Using the InvokeHTTP Processor
  2. Update the Created Schema Registry Entity with an Avro Schema Built from a Parameter List

The NiFi Template

https://github.com/steven-dfheinz/NiFi-Templates/raw/master/Schema_Registry_Demo.xml

Create a Schema Registry Entity via NiFi Using the InvokeHTTP Processor

GenerateFlowFile→UpdateAttribute→AttributesToJSON→InvokeHTTP

[Screenshot: the NiFi flow that creates the Schema Registry entity]

**Note: I use an output port to route failures during testing. While working on my flows, I route all relationships this way until I decide it is appropriate to auto-terminate them or route them to an error-handling process group.

  1. GenerateFlowFile - Very simple here. This processor just starts the flow. I set the Run Schedule to 1 minute so I can start and stop it to send only a single flow file through the flow for testing.
  2. UpdateAttribute - Manually creates the attributes required for schema creation:
    • name
    • type
    • schemaGroup
    • description
    • evolve
    • compatibility
  3. AttributesToJSON - Writes the attributes above to the flow file content.
  4. InvokeHTTP - Executes an API POST to the Schema Registry URL.

**Note: to see the specific configuration, download the template, add it to your workspace, and inspect the processors.
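
For readers who want to see the request outside NiFi, the four processors above amount to roughly the following. This is a minimal Python sketch, not the template's configuration: the base URL, the `/schemas` path, and every attribute value are assumptions for illustration, so check them against your own Schema Registry before relying on them.

```python
import json
import urllib.request

# Assumed base URL; replace with your own Schema Registry host, port, and path.
REGISTRY_URL = "http://localhost:7788/api/v1/schemaregistry"


def build_entity_payload(name, description, schema_group="Kafka",
                         schema_type="avro", evolve=True,
                         compatibility="BACKWARD"):
    # Mirrors UpdateAttribute + AttributesToJSON: gather the six attributes
    # from the list above into one JSON object. Default values are hypothetical.
    return {
        "name": name,
        "type": schema_type,
        "schemaGroup": schema_group,
        "description": description,
        "evolve": evolve,
        "compatibility": compatibility,
    }


def build_create_request(payload, base_url=REGISTRY_URL):
    # Mirrors InvokeHTTP: prepare (but do not send) a POST with the JSON body.
    return urllib.request.Request(
        url=f"{base_url}/schemas",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    payload = build_entity_payload("demo_schema", "Created from a script")
    request = build_create_request(payload)
    print(request.get_method(), request.full_url)
    print(json.dumps(payload, indent=2))
    # To actually send it: urllib.request.urlopen(request)
```

The request is only built, not sent, so the sketch can be run without a registry available.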

Update the Created Schema Registry Entity with an Avro Schema Built from a Parameter List

HandleHttpRequest→HandleHttpResponse→UpdateAttribute→ConvertCSVToAvro→ConvertAvroToJSON→ExtractText→AttributesToJSON→ReplaceText→UpdateAttribute→AttributesToJSON→InvokeHTTP

[Screenshot: the NiFi flow that updates the Schema Registry entity with the Avro schema]

**Note: as in the first flow, I use an output port to route failures during testing. While working on my flows, I route all relationships this way until I decide it is appropriate to auto-terminate them or route them to an error-handling process group.

  1. HandleHttpRequest - The API endpoint in the NiFi flow that accepts the POST of the parameter list.
  2. HandleHttpResponse - Sends an API response of 200 and closes the API connection.
  3. UpdateAttribute - Manually sets some attributes required for the Avro schema:
    • type
    • name
  4. ConvertCSVToAvro - Converts the CSV content to Avro.
  5. ConvertAvroToJSON - Converts the Avro to JSON.
  6. ExtractText - Captures the flow file content into the fields attribute.
  7. AttributesToJSON - Writes the attributes above to the flow file content.
  8. ReplaceText - Handles some JSON object formatting requirements:
    • ${'$1':unescapeJson():replace('"[','['):replace(']"',']')}
  9. UpdateAttribute - Sets the attributes required for the schema update:
    • schemaText
    • description
  10. AttributesToJSON - Writes the attributes above to the flow file content.
  11. InvokeHTTP - Executes an API POST to the Schema Registry URL.
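
To make the ReplaceText expression in step 8 more concrete: after AttributesToJSON, the fields attribute lands in the JSON as an escaped string rather than as an array, and the expression unescapes it and strips the quotes around the brackets. The Python sketch below imitates that on hypothetical sample data. It only approximates NiFi's `unescapeJson()` (it handles the `\"` escape and nothing else), and the field names are made up for illustration.

```python
import json


def nifi_style_cleanup(text):
    # Rough stand-in for the expression in step 8:
    #   unescapeJson() -> replace('"[', '[') -> replace(']"', ']')
    # Assumption: only the \" escape is handled; NiFi's function covers more.
    unescaped = text.replace('\\"', '"')
    return unescaped.replace('"[', '[').replace(']"', ']')


# Hypothetical AttributesToJSON output: "fields" is a JSON array stored as a string.
fields = [{"name": "id", "type": "string"}, {"name": "city", "type": "string"}]
before = json.dumps({
    "type": "record",
    "name": "demo_schema",
    "fields": json.dumps(fields),
})

after = nifi_style_cleanup(before)
schema = json.loads(after)  # parses only because "fields" is now a real array

print("before:", before)
print("after: ", after)
```

Note that this kind of string replacement would also rewrite any `"[` or `]"` sequence that happened to appear inside a field value, so it suits simple column names like the ones in this use case.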

**Note: to see the specific configuration, download the template, add it to your workspace, and inspect the processors.
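
As a rough picture of what this second flow produces end to end, here is a minimal Python sketch that turns a parameter list into an Avro record schema and wraps it in the schemaText and description attributes. The CSV layout (one `column,type` pair per line, no header), the record name, and the sample columns are all assumptions for illustration. As in the flow at the time of writing, every field is typed as a string.

```python
import csv
import io
import json


def build_avro_schema(param_csv, record_name):
    # Turn 'column,type' rows into an Avro record schema.
    fields = []
    for row in csv.reader(io.StringIO(param_csv)):
        if not row or not row[0].strip():
            continue  # skip blank lines
        # The declared type (second column) is ignored for now:
        # every field is written as a string.
        fields.append({"name": row[0].strip(), "type": "string"})
    return {"type": "record", "name": record_name, "fields": fields}


def build_version_payload(schema, description):
    # Assumption: the schema travels as a JSON string inside schemaText,
    # next to the description attribute set in step 9.
    return {"schemaText": json.dumps(schema), "description": description}


if __name__ == "__main__":
    # Hypothetical parameter list.
    params = "id,int\nname,varchar\ncreated_at,timestamp\n"
    schema = build_avro_schema(params, "demo_schema")
    payload = build_version_payload(schema, "Initial version")
    print(json.dumps(payload, indent=2))
```

The resulting payload is what the final InvokeHTTP would POST to the Schema Registry; the exact URL for adding a schema version depends on your registry and is not shown here.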

As always, if you have any questions or comments, feel free to add them below or send me a message. If you need more help getting this template to work, I would be more than happy to help out.

Comments

I suppose nowadays you would use ConvertRecord instead of ConvertCSVToAvro and then ConvertAvroToJSON.