Created 01-16-2018 04:11 PM
Hi all,
I am in the process of migrating our current data pipeline into Hadoop, moving from traditional tools (SSIS, Python, etc.) to NiFi. We have several hundred tables that run daily. I like the approach Kylo is taking, and while it doesn't satisfy my needs 100%, I have borrowed a few ideas from their general data templates. I am currently at a stage where I am not entirely sure how to: (1) best optimize this process to run concurrently for ~50 tables at a time, and (2) get MergeContent to work correctly for how I am using it. I am still fairly new to NiFi, so I am looking for someone to help me with best practices, if possible. Quick background on how the entire flow works:
Step 1 -- I have one process group for each table we are bringing in. It's organized by our servers, database, then table. I was not able to find a great way to create a reusable flow for these, as the controller service for the DB couldn't be dynamically set. Nonetheless, I have a bit of SQL executing to fetch the delta from the source. That then feeds into an InferAvroSchema processor so I can grab the schema and have it as an attribute. Next the flow goes into an UpdateAttribute processor where I have it pick up some table-specific attributes I have set, such as schema_name, table_name, hdfs_parent_location, etc. All of these attributes are there to make the flow unique, and I also use them to build the Hive "create if not exists" and "msck repair" scripts at the end of the process.
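As a rough illustration, the UpdateAttribute step for one table might carry properties along these lines (the attribute names come from the description above; the example values and the pii_columns attribute name are hypothetical):

UpdateAttribute (example values):
  schema_name          = sales_dw
  table_name           = dim_customer
  hdfs_parent_location = /data/raw/sales_dw/dim_customer
  pii_columns          = email,ssn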
Step 2 -- Once that bit of data is set for each source, they all flow into one reusable template through Input and Output ports. This is where I am a bit concerned about whether I am doing things correctly. The reusable flow consists of the following:
- AvroToJson
- Custom Groovy Script to encrypt all the incoming PII (being read from an attribute the user would set)
- Split JSON
- JSONtoAvro: Sending the flow back to Avro
- MergeContent: Merging everything with Bin-Packing, Avro format, using ${avro_schema}_${tablename} as the Correlation Attribute. I have it set now to try to merge the files to around 64MB, using 5 bins.
- PutHDFS: Dynamically creates the fully qualified location from the attributes and partitions data by "DAY".
- ReplaceText: I build the create statement and msck repair table statements to add any partitions (a rough sketch of this follows the list).
- PutHQL: Run statement
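To give a feel for the ReplaceText step, the replacement value might look roughly like the following. The column list, types, and storage details here are hypothetical; in practice they would be generated from the inferred schema, and the ${...} references are NiFi Expression Language pulling from the attributes set in Step 1:

CREATE EXTERNAL TABLE IF NOT EXISTS ${schema_name}.${table_name} (
  -- hypothetical columns; really generated from the inferred Avro schema
  id INT,
  name STRING,
  value DOUBLE
)
PARTITIONED BY (`day` STRING)
STORED AS AVRO
LOCATION '${hdfs_parent_location}/${table_name}';

MSCK REPAIR TABLE ${schema_name}.${table_name};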
Here are my questions:
1. I am not sure the MergeContent processor is working correctly 100% of the time. I have found that if I use larger tables (1-2MM records) at the same time, it generally fails at the MergeContent step. It seems to have a hard time using the correlation attribute that I have set, and I have a bit of trouble getting the file sizes to be around that 64MB mark. For the correlation attribute issue, I find that the processor routes records to failure (I have failure routed back to itself so I can check the queue) and they generally sit in the queue. I can see which table each one is coming from, but when I check the failure queue, all of the attributes which were previously set are now gone. I have checked them step by step and they don't get erased until the MergeContent runs, so that is a bit strange. For file size, my assumption is the bins are getting full, but I have only two tables running during my test and the MergeContent processor is set to 64MB for min group size and 5 minutes for the wait time. So please tell me if I am missing something (I am sure I am). Also please let me know if you think the correlation attribute problem I am having is typical for how I am using the processor or if you think it should work. In production, it will be much more than 2 tables running at one time... probably 50 or so.
2. From what I have listed above, does anyone think the flow for each table should be its own end-to-end flow, or am I on the right track with having this single reusable template that all flows feed into? I would assume the thread count would be much lower for the reusable approach, but I do worry a bit about moving that much data through a single group.
Any help in direction would be much appreciated!!
Thanks,
Chris
Created 01-16-2018 06:10 PM
It may be helpful to share your MergeContent processor's configuration here.
1. How many bins is the processor configured to use?
2. It sounds like each incoming FlowFile may have a considerable attribute map size. All the attributes of the FlowFiles being merged are held in heap memory until the merge is complete, so you may be having heap issues. Have you seen any Out of Memory errors in the nifi-app.log?
3. What is the correlation attribute you are using to bin like FlowFiles?
4. How large is each FlowFile being merged? If they are very small (meaning it would take more than 20,000 of them to reach a 64 MB merged file), you may want to use multiple MergeContent processors in series to reduce the heap usage (a rough sketch of that follows).
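A minimal sketch of that series arrangement, with illustrative thresholds (not values from this thread), might be:

MergeContent #1 (bundle the tiny FlowFiles):
  Merge Strategy            = Bin-Packing Algorithm
  Merge Format              = Avro
  Minimum Number of Entries = 10000
  Maximum Number of Entries = 10000
  Max Bin Age               = 5 min

MergeContent #2 (build the target-size files):
  Merge Strategy     = Bin-Packing Algorithm
  Merge Format       = Avro
  Minimum Group Size = 64 MB
  Max Bin Age        = 5 min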
Useful links:
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.ht...
https://community.hortonworks.com/questions/87178/merge-fileflow-files-based-on-time-rather-than-siz...
I have not personally seen FlowFiles routed to failure losing their attributes. That seems very odd to me. The merged FlowFile, depending on configuration, may have different attributes however.
I am assuming that your "avro_schema" attribute may be fairly large. It may be better to use something smaller for your correlation attribute value in the MergeContent processor. You could use the ExtractAvroMetadata processor before the MergeContent processor; it will give you a "schema.fingerprint" attribute you could use instead to accomplish the same thing.
Are you putting "${avro_schema}_${tablename}" in the MergeContent processor's Correlation Attribute Name property value? What this property does is resolve the provided EL above to its actual values and then check the incoming FlowFiles for an attribute whose name matches that resolved value. If found, FlowFiles where the value of that resolved attribute matches are placed in the same bin. For example, if avro_schema resolved to "foo" and tablename to "bar", MergeContent would look for an attribute literally named "foo_bar" on each FlowFile. Just want to make sure you are using this property correctly. All FlowFiles that do not have that attribute are allocated to a single bin.
You also need to make sure your MergeContent processor is configured with enough bins (number of needed bins + 1) to accommodate all the possible unique correlation attribute values. If you do not have enough bins, MergeContent will force the merging of the oldest bin in order to free a bin so it can continue allocating additional FlowFiles.
Thank you,
Matt
Created 01-16-2018 08:14 PM
Hey Matt,
Thanks for the quick and detailed response. Below are some answers for you:
1. Currently I have two tables I am testing with, and I have the bin count set to 5.
2. I did see some memory issues in the past and have upped the JVM heap to ~10GB. That seemed to work just fine.
3. I was using the following correlation attribute: "${avro_schema}_${tablename}". I like your suggestion about using the fingerprint from the Avro metadata and have changed the correlation attribute to: ${schema.fingerprint}. One question about that... is the processor smart enough to give a unique attribute for two different tables that might contain the same column names? For example, if I have two dimension tables that have something like ID, Name, Value, am I going to get the same fingerprint for those two tables because their schema is the same? If yes, should I concat another attribute onto it, like I was doing in the beginning?
4. The FlowFiles being merged were tiny, and I have implemented your suggestion to have two MergeContent processors. The first one groups in bundles of around 20K files and the second one further groups into chunks of around 64MB. That worked great. Thanks for the solution here.
On your note about the following....
"""
Are you putting "${avro_schema}_${tablename}" in the MergeContent processor's Correlation Attribute Name property value? What this property does is resolve the provided EL above to its actual values and then check the incoming FlowFiles for an attribute whose name matches that resolved value.
"""
My assumption with underscoring both attributes was to create a very unique attribute for the processor to merge on. So if I had a schema named "foo" and a table named "bar", then my assumption was it would look for BOTH attributes and then create a "foo_bar" attribute. From the sound of it, I think you are saying the MergeContent processor is looking for a single attribute named "foo_bar"... which does make sense to me. That might be why the flow files were failing, as the pre-merged attribute was not present.
Am I on the right track with the above statement?
Also, do you have a suggestion on whether a reusable flow template would be better to route these hundred tables through, or whether it would make sense to have 100 unique "end to end" flows (one for each table)?
Thanks again... I really appreciate your insights.
Chris
Created on 01-16-2018 09:54 PM - edited 08-18-2019 12:17 AM
Are you entering "${schema.fingerprint}" or "schema.fingerprint" in the "Correlation Attribute Name" property field of the MergeContent processor?
If you are looking to bin files where the value assigned to the "schema.fingerprint" attribute matches, you will want to enter only "schema.fingerprint" in that property.
If you want your correlation attribute name to be more unique, you can use an UpdateAttribute processor before the MergeContent to create something more unique based on both ${schema.fingerprint} and ${tablename}.
For example, an UpdateAttribute processor that builds the combined attribute, followed by a MergeContent processor configured to use it.
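A minimal sketch of that arrangement, using a made-up "merge.correlation" attribute name and the sizes discussed earlier in the thread (adjust to your own flow):

UpdateAttribute (adds the combined correlation attribute):
  merge.correlation = ${schema.fingerprint}_${tablename}

MergeContent (bins on that attribute by name, no EL):
  Merge Strategy             = Bin-Packing Algorithm
  Merge Format               = Avro
  Correlation Attribute Name = merge.correlation
  Minimum Group Size         = 64 MB
  Maximum Number of Bins     = 5
  Max Bin Age                = 5 min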
Your understanding of my original explanation was correct. That could also explain the missing attributes, since all FlowFiles without the attribute would end up in the same bin and only matching attributes would be retained on the merged FlowFiles.
As far as one flow versus many duplicate flows goes: it makes sense to have only a single flow here, but having many identical flows is also OK. The only thing to consider is that NiFi processors obtain threads from a thread pool configured in NiFi. The more processors you have, the more processors there are requesting to use a thread from that thread pool. Plus, more dataflows just means more processors to manage.
The "Max Timer Driven thread count" is set within "Controller settings" found within the hamburger menu icon in the upper right corner of the NiFi UI. You will also find a setting for "Max Event Driven Thread count there",but do not change the value there. There is nothing you will add to the canvas that will use Event Driven Threads unless you specifically configure the processor to use them. This is a deprecated feature that only still exist to avoid breaking backwards compatibility with older flows. Timer Driven thread are much more efficient and will out perform Event Driven threads anyway. The default for Timer Driven threads is only 10. a good staring place here is 2 - 4 times the number of cores you have on a single NiFi host.
Assume a 4-node NiFi cluster (32 cores per node). You would set the Max Timer Driven Thread Count to 64 - 128. The setting applies per node, so if you set it to 32, that would mean there are 32 threads available per host, for a cluster total of 128.
Monitor top on your systems as you run your dataflow and adjust as you see fit from there.
Thanks,
Matt
Please take a moment to click "Accept" if you feel I have addressed your questions.
Created 01-16-2018 10:04 PM
I was putting "${schema.fingerprint}" in the MergeContent processor. That is likely the issue I am facing here. Thanks for the feedback. Also, thank you for the explanation about the thread pool and cluster info. I will take that into consideration as I move forward. Appreciate the tips!
Chris