NiFi dataflow best practices (CSV to many targets)

Hi experts,
Good day!

I've been using NiFi for a couple of months, so I'm still learning new things every day. I'm building a dataflow that takes CSV data (pipe-separated, '|') and pushes it to different targets (e.g. Hive, SQL Server, and Kafka). The project started fine, but the dataflow has grown bigger and bigger and is now difficult to manage. I'd like some help understanding whether my current approach is the best one. More details below.

I'm receiving data through a ListenHTTP processor. The data comes in as pipe-separated CSV. One of the fields is a code that identifies which table the data should be pushed to, so I've created one process group per "table". Here's where I think the dataflow gets complicated: each of those groups (23, to be precise) contains 4 more groups, each responsible for pushing data to a specific target. Since I have a Hive dataflow inside these groups, I had to create an Avro schema defining the structure of each table. I was wondering whether I could replace all of this with a single flow that evaluates the code in the CSV and "chooses" the correct Avro schema to use. I did some research but couldn't get any further. If there's a way to do it, I could replace those 23 groups with a single dynamic dataflow.

Hopefully you can help me with this scenario. Thanks in advance!

Sincerely,

Cesar Rodrigues

Hi Matt. First of all, thank you so much for the explanation.
My scenario falls into the 3rd one you described: I have multiple table codes coming in a single flowfile. Could you please elaborate on how to use the PartitionRecord processor? I tried using the CSVReader and CSVRecordSetWriter controller services, but they ask for an Avro schema as well. The table structures I'm working with have only the first 3 fields in common (the last of them being the table code); the rest of the fields vary, so I got a bit confused about how to define this Avro schema.

Master Guru

If all the CSV columns are strings and you have a header line, you can set up a CSVReader with "Use String Fields From Header" as the schema access strategy. If there are non-string columns, you could use InferAvroSchema, which will try to figure out the field types and write an Avro schema to the "inferred.avro.schema" attribute; you can then point a CSVReader/Writer at it by choosing "Use Schema Text" with "${inferred.avro.schema}" as the value of "Schema Text". If all else fails, you can always create an AvroSchemaRegistry and add each of the 23 table schemas manually, then refer to them by name (if the name is the table code, you'll save a step) by choosing "Use Schema Name" with ${table.code} as the value.
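
To make the schema-registry option concrete, here's a minimal sketch (the field names and the code "35" are just placeholders for one of your tables; only you know the real structures):

 AvroSchemaRegistry (one dynamic property per table, named by its table code)
   35 = {
          "type": "record",
          "name": "table_35",
          "fields": [
            { "name": "transactionNumber", "type": "string" },
            { "name": "sequenceNumber",    "type": "string" },
            { "name": "tableCode",         "type": "string" }
          ]
        }

 CSVReader
   Schema Access Strategy = Use 'Schema Name' Property
   Schema Registry        = AvroSchemaRegistry
   Schema Name            = ${table.code}
   Value Separator        = |

With that in place, a single CSVReader can serve all 23 tables, as long as each FlowFile carries a table.code attribute.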

The general idea is to enrich/annotate each FlowFile and then reuse the same controller services, processors, and other components for every table, configuring their properties with Expression Language so the per-file information comes from attributes.
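
For example, a single target processor could be parameterized the same way (the database and table naming here are assumptions, not necessarily how your Hive tables are actually named):

 PutHiveStreaming
   Database Name = mydb
   Table Name    = table_${table.code}

One Hive processor (and likewise one Kafka publisher) then replaces the 23 per-table copies.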

Master Guru

For PartitionRecord, you'll want a "RecordPath" that points to the field in the schema that holds the table code value. Assuming it's called "tableCode", the RecordPath would be

 /tableCode
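
For reference, the processor configuration could look something like this (the property name "table.code" is your choice; whatever you name the dynamic property becomes the attribute written to each partition):

 PartitionRecord
   Record Reader  = CSVReader
   Record Writer  = CSVRecordSetWriter
   table.code     = /tableCode

Each outgoing FlowFile then contains only the records sharing one tableCode value, with that value available in the table.code attribute for the schema lookup described above.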

Matt, thanks a lot for all your help. I was able to refactor my dataflow, reducing the number of groups and keeping everything simple in a single dynamic flow.

Just to elaborate a little, here's what I did.

The data comes in CSV format separated by pipes, e.g.:

(transaction #, sequence #, table code)

123|456|35|

123|456|36|

123|456|100|

First I split the incoming flowfile into individual lines using SplitText >> then used the ExtractText processor to grab the 3rd field (the table code) >> then LookupAttribute to set the user-defined attribute schema.name (used by the AvroSchemaRegistry controller service) >> and finally pushed the data to Kafka and Hive using the appropriate processors.
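
For anyone following along, those middle steps can be configured roughly like this (the exact regex and the SimpleKeyValueLookupService mapping are illustrative; adjust them to your own codes and schema names):

 ExtractText
   table.code = ^(?:[^|]*\|){2}([^|]*)\|

 LookupAttribute
   Lookup Service = SimpleKeyValueLookupService (maps table code -> schema name)
   schema.name    = ${table.code}

The schema.name attribute then tells the AvroSchemaRegistry-backed readers/writers which registered schema to apply downstream.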

Thanks a lot!