Nifi dataflow best practices (CSV to many targets)
Labels: Apache NiFi
Created 07-25-2018 05:53 PM
Hi experts,
Good day!
I've been using NiFi for a couple of months, so I'm still learning lots of new things every day. I'm building a dataflow that takes CSV data (separated by pipes, '|') and pushes it to different targets (e.g. Hive, SQL Server, and Kafka). The project started fine, but the dataflow kept getting bigger and now I'm finding it difficult to manage. I wanted to ask for some help understanding whether my current design is the best possible approach. More details below.
I'm getting data from a ListenHTTP processor. Data comes as CSV separated by pipes. One of the fields is a code that identifies which table the data should be pushed to, so I've created one process group for each "table". Here's where I think the dataflow gets complicated. Each of those groups (23, to be precise) has 4 other groups, each responsible for pushing data to a specific target. Since I have a Hive dataflow inside these groups, I had to create the Avro schema defining the structure for each table. I was wondering if I could replace this dataflow with a single one that evaluates the code in the CSV and "chooses" the correct Avro schema to use. I did some research but couldn't make further progress. If there's a way to do it, I could simply replace those 23 groups with a single dynamic dataflow.
Hopefully you can help me with this scenario. Thanks in advance!
Sincerely,
Cesar Rodrigues
Created 07-25-2018 06:17 PM
There are a few mechanisms and recent improvements that should allow you to create a more scalable flow. If my comments below don't apply or are a little off, please feel free to elaborate on any/all of them so I can understand your use case better.
It appears the data has a code as a column, so your incoming pipe-delimited data arrives in one of three ways: one row at a time, as a batch where every row has the same "table code", or as a batch with mixed "table code" values. If each flow file has a single row, you may want ExtractText (to get the "table code" into an attribute) followed by LookupAttribute to add the "table name" as an attribute based on the "table code" value. The same pattern should work (perhaps with a different regex/config for ExtractText) if each flow file contains multiple rows that share one "table code". If a flow file contains rows with different "table code" values, use PartitionRecord to split it into outgoing flow files based on the "table code" value. That puts you in the second case (multiple rows with the same table code), except the attribute will already be available (see the PartitionRecord documentation for details). Hopefully (depending on your flow) this lets you avoid 23 process groups and instead have a single flow that sets attributes and uses NiFi Expression Language in the processors' configs.
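To make the effect of partitioning concrete, here is a rough Python sketch of what splitting a mixed batch by "table code" amounts to. It is only an illustration of the idea, not how PartitionRecord is implemented; the function name, the sample rows, and the assumption that the code sits in the third column are all made up for the example.

```python
import csv
import io
from collections import defaultdict


def partition_by_table_code(payload, code_index=2):
    """Group pipe-delimited rows by the value in the table-code column.

    code_index=2 (third column) is an assumption for this sketch.
    """
    partitions = defaultdict(list)
    reader = csv.reader(io.StringIO(payload), delimiter="|")
    for row in reader:
        if not row:  # csv.reader yields [] for blank lines; skip them
            continue
        partitions[row[code_index]].append(row)
    return dict(partitions)


# Hypothetical batch with two different table codes (35 and 36).
sample = "123|456|35|a\n123|457|36|b\n123|458|35|c\n"
groups = partition_by_table_code(sample)

for code, rows in groups.items():
    # Each group corresponds to one outgoing flow file whose rows
    # all share the same table code.
    print(code, len(rows))
```

Each key in `groups` plays the role of the attribute that the partitioned flow files would carry, so downstream steps can be configured per file rather than per table.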
If you are replicating this data across 4 different targets, and each flow file has (or could be enriched with) enough data to "self-describe" its destination, then you might consider a Remote Process Group (RPG) on any of your flow branches that end up at the same target processor(s). That way you'd have one Input Port for each of the 4 targets, and any branch sending to target 1 would have an RPG pointing at the Input Port for target 1. If you can collapse your 23 process groups (PGs) into 1 as described above, then you may not even need that; you could still have the 4 separate PGs fed by the single flow.
Created 07-25-2018 06:47 PM
Hi Matt. First of all, thank you so much for the explanation.
My scenario currently falls into the 3rd one you described: I have multiple table codes coming in a single flowfile. Could you please elaborate on how to use the PartitionRecord processor? I tried it using the CSVReader and CSVRecordSetWriter controller services, but they ask for an Avro schema as well. The table structures I'm working with right now have only the first 3 fields in common (the last of those being the table code). The rest vary, so I got a little confused about how to set this Avro schema.
Created 07-26-2018 02:36 AM
If all the CSV columns are strings and you have a header line, you could set up a CSVReader that has "Use String Fields From Header" as the access strategy. If there are non-String columns you could use InferAvroSchema, which will try to figure out the fields' types and generate an Avro schema in the "inferred.avro.schema" attribute; you can then use that in a CSV Reader/Writer via "Use Schema Text" with "${inferred.avro.schema}" as the value of "Schema Text". If all else fails, you could always create an AvroSchemaRegistry and add each of the 23 table schemas manually. You can then refer to them by name (if the name is the table code, you'll save a step) by using "Use Schema Name" with ${table.code} as the value.
The general idea is to enrich/annotate each FlowFile and use the same controller services, processors, and other components by dynamically configuring them with the same properties but using Expression Language to specify the per-file information.
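As a rough illustration of the "look the schema up by name" idea, the Python sketch below keeps a small dictionary of Avro-style schemas keyed by table code and uses the code to decide how a row is interpreted. This only mimics the concept; it is not NiFi's AvroSchemaRegistry API, and the schema contents, field names, and the `to_record` helper are hypothetical.

```python
import json

# Hypothetical registry: schema name (here, the table code) -> Avro schema text.
SCHEMAS = {
    "35": json.dumps({
        "type": "record",
        "name": "table_35",
        "fields": [
            {"name": "transaction", "type": "string"},
            {"name": "sequence", "type": "string"},
            {"name": "tableCode", "type": "string"},
            {"name": "amount", "type": "string"},
        ],
    }),
    "36": json.dumps({
        "type": "record",
        "name": "table_36",
        "fields": [
            {"name": "transaction", "type": "string"},
            {"name": "sequence", "type": "string"},
            {"name": "tableCode", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"},
        ],
    }),
}


def to_record(line, table_code):
    """Map one pipe-delimited line onto the fields of the schema named by table_code."""
    schema = json.loads(SCHEMAS[table_code])
    names = [field["name"] for field in schema["fields"]]
    values = line.rstrip("\n").split("|")
    if len(values) != len(names):
        raise ValueError(
            f"expected {len(names)} fields for table {table_code}, got {len(values)}"
        )
    return dict(zip(names, values))


record = to_record("123|456|35|9.99", "35")
print(record)
```

The same function handles every table because the per-table detail lives in the registry, which is the point of configuring one reader/writer with a per-file schema name.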
Created 07-26-2018 02:38 AM
For PartitionRecord, you'll want a "RecordPath" that points to the field in the schema that holds the table code value. Assuming it's called "tableCode", the RecordPath would be
/tableCode
Created 07-27-2018 03:33 PM
Matt, thanks a lot for all your help. I was able to refactor my dataflow, reducing the number of groups and keeping everything simple in a single dynamic flow.
Just to elaborate a little bit better, here's what I did.
The data comes in as CSV separated by pipes, e.g.:
(transaction #, sequence #, table code)
123|456|35|
123|456|36|
123|456|100|
First I split the flowfile into multiple ones using SplitText, then used the ExtractText processor to grab the 3rd field (table code), then LookupAttribute to set the user-defined property schema.name (used by the AvroSchemaRegistry controller service), and finally pushed the data to Kafka and Hive using the appropriate processors.
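For anyone who wants to see the steps spelled out, here is a small Python sketch of the same sequence: split into lines, pull the 3rd field with a regex, and map it to a schema name. It only imitates the logic outside NiFi; the regex, the attribute name `table.code`, and the code-to-name mapping are assumptions for illustration, not the actual processor configuration.

```python
import re

# Captures the 3rd pipe-delimited field: skip two "field|" groups, then capture.
TABLE_CODE = re.compile(r"^(?:[^|]*\|){2}([^|]*)")

# Hypothetical lookup table: table code -> schema name.
SCHEMA_NAMES = {"35": "orders", "36": "order_items", "100": "customers"}


def annotate(payload):
    """Split the payload into lines and attach attributes to each one."""
    flow_files = []
    for line in payload.splitlines():  # stands in for SplitText
        if not line.strip():
            continue
        match = TABLE_CODE.match(line)  # stands in for ExtractText
        code = match.group(1) if match else None
        flow_files.append({
            "content": line,
            "table.code": code,
            # Stands in for LookupAttribute; None when the code is unknown.
            "schema.name": SCHEMA_NAMES.get(code),
        })
    return flow_files


flow = annotate("123|456|35|\n123|456|36|\n123|456|100|\n")
for flow_file in flow:
    print(flow_file["table.code"], flow_file["schema.name"])
```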
Thanks a lot!