Support Questions

Find answers, ask questions, and share your expertise

Need help inferring an Avro schema for a JSON file in NiFi

Expert Contributor

Hello,

I am trying to create a flow in NiFi that takes a valid JSON file and puts it directly into a Hive table using the PutHiveStreaming processor. My JSON looks something like the following:

{
  "Raw_Json": {
    "SystemInfo": {
      "Id": "a string ID",
      "TM": null,
      "CountID": "a string ID",
      "Topic": null,
      "AccountID": "some number",
      "StationID": "some number",
      "STime": "some Timestamp",
      "ETime": "some Timestamp"
    },
    "Profile": {
      "ID": "ID number",
      "ProductID": "Some Number",
      "City": "City Name",
      "State": "State Name",
      "Number": "XXX-XXX-XXXX",
      "ExtNumber": null,
      "Unit": null,
      "Name": "Person Name",
      "Service": "Purchase",
      "AddrID": "00000000",
      "Products": {
        "Product": [
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" },
          { "Code": "CODE", "Description": "some description" }
        ]
      }
    },
    "Total": {
      "Amount": "some amount",
      "Delivery": "some address",
      "Estimate": "some amount",
      "Tax": null,
      "Delivery_Type": null
    }
  },
  "partition_date": "2017-05-19"
}

I fetch the JSON, run it through the InferAvroSchema processor, use the inferred schema to convert the JSON to Avro, and send the result to the PutHiveStreaming processor. My flow looks something like this:

[Flow screenshot: flowexample.jpg]

The main goal is to dump the entire "Raw_Json" object into a single column of the Hive table, with the table partitioned by the "partition_date" column, which will be the second column of the table. The problem is that, for some reason, NiFi is having trouble inferring the nested JSON in the "Raw_Json" field and is writing it to the table as NULL, as shown below:

[Table screenshot: tableexample.jpg]

Does anyone know how I could make NiFi read the entire nested JSON of the "Raw_Json" field as a single string column and send it to the Hive table? How could I write my own Avro schema to do this? Any insight or ideas on how to fix this issue would be greatly appreciated!
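For what it's worth, if you want to bypass inference entirely, a hand-written Avro schema that treats Raw_Json as an opaque string might look like the sketch below (the record name RawJsonRecord is my own placeholder; the field names come from the sample above). Note this only helps if the flow file content actually carries Raw_Json as a JSON-encoded string rather than as a nested object:

```json
{
  "type": "record",
  "name": "RawJsonRecord",
  "fields": [
    { "name": "Raw_Json", "type": "string" },
    { "name": "partition_date", "type": "string" }
  ]
}
```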

1 ACCEPTED SOLUTION

Expert Contributor

I was able to figure it out. I used the EvaluateJsonPath processor to grab 'Raw_Json' and 'partition_date', then used the AttributesToJSON processor to turn those two attributes back into JSON. After that, the InferAvroSchema processor was able to infer the 'Raw_Json' column as a string, and the data is now going into the Hive table via Hive Streaming correctly.
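To make the mechanics of this fix concrete, here is a minimal Python sketch of what the EvaluateJsonPath + AttributesToJSON combination effectively does to the flow file content (the sample data is abridged from the original post; this simulates the processors, it is not NiFi code). The key point is that pulling a nested object into an attribute serializes it, so the rebuilt document carries Raw_Json as a plain string:

```python
import json

# Abridged input shaped like the original post's sample.
flowfile = {
    "Raw_Json": {
        "SystemInfo": {"Id": "a string ID", "TM": None},
        "Total": {"Amount": "some amount"},
    },
    "partition_date": "2017-05-19",
}

# EvaluateJsonPath ($.Raw_Json, $.partition_date) copies each JsonPath
# result into a flow file attribute; a nested object becomes a string.
attributes = {
    "Raw_Json": json.dumps(flowfile["Raw_Json"]),
    "partition_date": flowfile["partition_date"],
}

# AttributesToJSON then rebuilds a flat JSON document from those
# attributes, so Raw_Json is now a string field InferAvroSchema can
# type as "string" instead of a nested record.
flat = json.dumps(attributes)

print(flat)
```

Against this flat document, InferAvroSchema sees only two string fields, which matches the two-column Hive table described above.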


3 REPLIES

Super Guru
@Adda Fuentes

When you infer the schema, do you store it in the flow file content (the default) or send it to the "inferred.avro.schema" attribute? Can you try setting the inferred schema to an attribute? Also, set the input content type explicitly to json if it is not already.
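For reference, the two settings being discussed here correspond to these InferAvroSchema properties (the values shown are what I would expect for this flow; double-check the exact property names against your NiFi version):

```
InferAvroSchema
  Schema Output Destination : flowfile-attribute   (writes inferred.avro.schema)
  Input Content Type        : json
```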

Expert Contributor

@mqureshi I was sending "inferred.avro.schema" as an attribute, and the input content type was set to json.

Expert Contributor

I was able to figure it out. I used the EvaluateJsonPath processor to grab 'Raw_Json' and 'partition_date', then used the AttributesToJSON processor to turn those two attributes back into JSON. After that, the InferAvroSchema processor was able to infer the 'Raw_Json' column as a string, and the data is now going into the Hive table via Hive Streaming correctly.