
Avro is a popular file format within the Big Data and streaming space. Avro has 3 important characteristics that make it a great fit for both Big Data and streaming applications.

  1. Avro files are self-describing: an Avro file can be opened and its schema definition viewed as standard JSON, or inspected programmatically by numerous applications. This makes your application code much less brittle, because the schema can be obtained from the incoming Avro file itself rather than being defined manually in your code.
  2. Avro can handle a wide range of data types natively, including complex types such as maps and arrays, and even raw bytes.
  3. Avro supports schema evolution, which can come in very handy in streaming systems where the data flowing through the system can change without notice.
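
To make these three points concrete, here is a minimal sketch of what an Avro schema looks like when viewed as JSON. The record and field names are hypothetical and are not taken from any file in this article. The sketch uses only Python's standard json module; it does not read or write actual Avro files.

```python
import json

# Hypothetical schema for illustration only; the names are assumptions.
schema = {
    "type": "record",
    "name": "SensorReading",
    "fields": [
        {"name": "station", "type": "string"},
        # Complex types: an array of doubles and a map of string values.
        {"name": "readings", "type": {"type": "array", "items": "double"}},
        {"name": "tags", "type": {"type": "map", "values": "string"}},
        # Raw bytes are a primitive Avro type.
        {"name": "payload", "type": "bytes"},
        # A field added later: the union with "null" plus a default lets a
        # reader using this schema handle data written before the field
        # existed (schema evolution).
        {"name": "humidity", "type": ["null", "double"], "default": None},
    ],
}

schema_json = json.dumps(schema, indent=2)
print(schema_json)
```

The printed JSON has the same shape as the schema definition embedded in an Avro file, which is what makes the files self-describing.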

Now that the pros of Avro have been called out, there is a problem: in reality we rarely encounter Avro files while ingesting or streaming data. Why is this? Mostly because storing data in the Avro format requires defining the Avro schema up front, and that takes planning and a knowledge of the Avro file format that not everyone has. This is a tragedy given the many benefits Avro provides. It was the driving factor for me creating the “InferAvroSchema” processor within Apache NiFi. InferAvroSchema exists to help end users who don’t have either the time or the knowledge to create Avro files. It overcomes the initial complexity of getting started with Avro and allows Apache NiFi users to quickly take more common flat data files, like CSV, and transform them into Avro. Now that we have a little background, let’s get into the details of how we make this happen using Apache NiFi and InferAvroSchema. Before we start, let’s take a look at the end result to get the big picture.

3542-endresult.png

Our end result is a workflow that loads a CSV file holding weather data and converts it to an Avro file. The Weather.csv file is loaded using GetFile and then examined by the InferAvroSchema processor to determine the appropriate Avro schema. After the Avro schema is generated, the ConvertCSVToAvro processor uses that schema to convert the CSV weather data to an Avro file. The resulting Avro file is ultimately written back to a local file on the machine running NiFi.

The CSV data certainly doesn’t have to come from a local file (it could have come from FTP, S3, HDFS, etc.), but that was the easiest to demonstrate here. There is a local CSV file on my Mac called “Weather.csv”. It is loaded by the GetFile processor, which places the complete contents of “Weather.csv” into a new NiFi FlowFile. Below is a snippet of the contents of “Weather.csv” to provide context.

3543-weatherdatasnippet.png

As you can see, the CSV data contains a couple of different weather data points for a certain zip code. We want to take this data from its original CSV format and convert it to an Avro file. So where do we start? First we use the “InferAvroSchema” processor to build our Avro schema definition without having to define it manually, since we are pretending we have no idea how to make Avro schemas by hand. InferAvroSchema can examine the contents of CSV or JSON data and recommend an Avro schema definition based on the data it encounters in the incoming FlowFile content. InferAvroSchema provides a lot of flexibility through its configuration, so let’s go through the properties and what they mean.

3544-inferavroschema-configuration.png

The above screenshot shows the available properties for InferAvroSchema. At first look it can be a little daunting, so let’s break down what is happening here by stepping through each property.

  • Schema Output Destination - This property controls where the Avro schema is written once it has been generated. Remember, we are not converting the CSV to Avro with this processor; we are only creating the Avro schema. The actual work of converting the CSV to Avro is done by another processor (ConvertCSVToAvro). ConvertCSVToAvro requires an Avro schema to do its job, so it makes sense to place the resulting schema somewhere ConvertCSVToAvro can easily reach it. That is why I have chosen to output the schema as an attribute on the FlowFile, so that I can reference it with the NiFi expression language from within the ConvertCSVToAvro processor, as you will see later.
  • Input Content Type - Tells the processor what type of data is in the FlowFile content, and therefore what it should infer the Avro schema from. In our example that is CSV, but JSON is also valid.
  • CSV Header Definition - An Avro schema needs a name for each field it contains, and this property gives us a way to supply those names. The names can also be read from the header line of the CSV data itself, but I entered them here to demonstrate the property. You will notice the header is also present in the Weather.csv screenshot above; we handle that in the next two properties.
  • Get CSV Header Definition From Data - Since we manually specified the CSV Header Definition, we don’t want to get the header definition from the Weather.csv file itself. If we had chosen not to specify it manually, we could have set this to true to pull the header from the Weather.csv file.
  • CSV Header Line Skip Count - The Weather.csv file does in fact contain a header line, but we chose not to use it and specified the header manually instead. We therefore need to skip that line so that it is not fed into the schema inference logic. This is why the value is set to 1: it skips the first line.
  • CSV Escape String - The character used for escaping in the CSV data.
  • CSV Quote String - The character used to quote values in the CSV data.
  • Pretty Avro Output - Controls whether the resulting Avro schema output is pretty-printed. This is strictly for aesthetic purposes.
  • Avro Record Name - This value will be the name of the Avro record in the resulting Avro schema. You can set it to whatever you desire. Of course it makes sense to name it something related to the data, so here I have called it “Weather”.
  • Number Of Records To Analyze - This is how many records the processor should analyze to determine the type (String, Long, etc.) of the data present in the CSV. 10 is the default and seems to be the sweet spot for accuracy and performance.
  • Charset - The character encoding of the incoming FlowFile content. In this case Weather.csv is UTF-8 encoded, so that is what I have specified.
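
The properties above can be read as the inputs to a small inference routine. The following is a rough sketch of that idea in Python, not the processor's actual implementation, and its results may differ from what InferAvroSchema produces. The function names, parameter names, and sample columns are all hypothetical; the real columns of Weather.csv are only visible in the screenshot.

```python
import csv
import io
import json


def infer_type(values):
    """Return the narrowest of long/double/string that fits every value."""
    def fits(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if not values:
        return "string"
    if fits(int):
        return "long"
    if fits(float):
        return "double"
    return "string"


def infer_avro_schema(csv_text, header, record_name, skip_lines=0, sample_size=10):
    """Build an Avro record schema (as a dict) from a sample of CSV rows.

    header      ~ CSV Header Definition (comma-separated field names)
    skip_lines  ~ CSV Header Line Skip Count
    sample_size ~ Number Of Records To Analyze
    record_name ~ Avro Record Name
    """
    rows = list(csv.reader(io.StringIO(csv_text)))
    # Drop the skipped rows, then keep only the sampled records.
    rows = rows[skip_lines:][:sample_size]
    names = [name.strip() for name in header.split(",")]
    fields = []
    for index, name in enumerate(names):
        column = [row[index] for row in rows if index < len(row)]
        fields.append({"name": name, "type": infer_type(column)})
    return {"type": "record", "name": record_name, "fields": fields}


# Hypothetical sample data; these columns are assumptions for illustration.
sample = (
    "zip,temp,humidity,conditions\n"
    "30301,71.5,40,Sunny\n"
    "30302,-3,55.2,Snow\n"
)
schema = infer_avro_schema(
    sample, "zip,temp,humidity,conditions", "Weather", skip_lines=1
)
print(json.dumps(schema, indent=2))
```

In this sketch a column is typed as long only if every sampled value parses as an integer, which is one way to see why the number of records analyzed matters.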

At this point you will have an Avro schema that was automatically generated from the raw incoming CSV data. Congratulations! On its own, however, this doesn’t do us much good. We still need to put that Avro schema to work and convert the original Weather.csv data into an Avro file. We do that in our next step with the ConvertCSVToAvro processor. The configuration for that processor is shown below.

3545-convertcsvtoavro-configuration.png

The properties for ConvertCSVToAvro are a little more straightforward, so we aren’t going to go through them one by one. I do want to point out the value for Record Schema, however. You will notice it has a value of ${inferred.avro.schema}. Recall that in the InferAvroSchema processor above we told it to write the resulting Avro schema to a FlowFile attribute, so we are now able to access that value here using the NiFi expression language. The name of the FlowFile attribute will always be inferred.avro.schema.
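
As a toy model of this hand-off, the sketch below stores a schema under the inferred.avro.schema attribute, resolves the ${inferred.avro.schema} reference, and uses the schema to turn CSV rows into typed records. It is plain Python, not the NiFi API: the flowfile dictionary, the resolve and rows_to_records helpers, the sample columns, and the choice to skip one header row are all assumptions made for illustration, and the final step of encoding the records as Avro binary is left out.

```python
import csv
import io
import json
import re

# Toy stand-in for a FlowFile: attributes plus content (not the NiFi API).
flowfile = {
    "attributes": {
        "filename": "Weather.csv",
        "inferred.avro.schema": json.dumps({
            "type": "record",
            "name": "Weather",
            "fields": [
                {"name": "zip", "type": "long"},
                {"name": "temp", "type": "double"},
            ],
        }),
    },
    "content": "zip,temp\n30301,71.5\n",
}


def resolve(expression, attributes):
    """Replace each plain ${name} reference with the attribute's value.

    Missing attributes become empty strings in this toy model.
    """
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda match: attributes.get(match.group(1), ""),
        expression,
    )


# Only the three types used in this sketch are mapped.
CASTS = {"long": int, "double": float, "string": str}


def rows_to_records(content, schema, skip_lines=1):
    """Convert CSV rows to dicts whose values match the schema's types."""
    fields = schema["fields"]
    rows = list(csv.reader(io.StringIO(content)))[skip_lines:]
    return [
        {f["name"]: CASTS[f["type"]](value) for f, value in zip(fields, row)}
        for row in rows
    ]


record_schema = json.loads(
    resolve("${inferred.avro.schema}", flowfile["attributes"])
)
records = rows_to_records(flowfile["content"], record_schema)
print(records)
```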

At this point our Weather.csv data has successfully been converted to an Avro file and we can do whatever we desire with it. I chose to simply write the data back to another local file which will be named “Weather.csv.avro”. Here is a screenshot of that output.

3546-resultingavro.png

Comments
New Contributor

This is a great article, Jeremy, solving a real problem many businesses face now. Is this project publicly available on GitHub?

Thanks

John

Contributor

Great article. One thing I've found when using InferAvroSchema with CSVs is that it only seems to infer the type based on the first line after the header.

The "Number Of Records To Analyze" property specifically states that it's only to be used with the JSON content type. I'm having some difficulty right now thinking of a way to work around this. I've got a CSV column with "12345" on the first line and "12345.45" on the second line, and it's inferring the type as LONG when I want it to be DOUBLE.

EDIT: It also doesn't work completely as expected with negative values. (STRING instead of numeric)

5873-screen-shot-2016-07-19-at-25701-pm.png

New Contributor

This is a great article. I am also facing the same issue with inferring the type for a number column in CSV. The column has DOUBLE data, but the processor is inferring the type as LONG based on the first row.

Is there any workaround for this, other than making the first row of the CSV contain a double value? 🙂

New Contributor

You have to split the content line by line first using the SplitText processor. Then use a regex in the ExtractText processor to extract the values; this makes the values available as attributes on each FlowFile. Finally, use the ReplaceText processor to write those attributes back as the content of the FlowFile.