Created 09-27-2016 01:41 PM
I've developed a NiFi flow prototype for data ingestion in HDFS. Now I would like to improve the overall performances but it seems I cannot really move forward.
The flow takes in input csv files (each row has 80 fields), split them at row level, applies some transformations to the fields (using 4 custom processors executed sequentially), buffers the new rows into csv files, outputs them into HDFS. I've developed the processors in such a way the content of the flow file is accessed only once when each individual record is read and its fields are moved to flowfile attributes. Tests have been performed on a amazon EC2 m4.4xlarge instance (16 cores CPU, 64 GB RAM).
This is what I tried so far:
From the monitoring I've performed it looks like disks are not the bottleneck (they are basically idle a great part of the time, showing the computation is actually being performed in-memory) and the average CPU load is below 60%. The most I can get is 215k rows/minute, which is 3,5k rows/second. In terms of size, it's just 4,7 MB/s. I am aiming to something definitely greater than this. Just as a comparison, I created a flow that reads a file, splits it in rows, merges them together in blocks and outputs on disk. Here I get 12k rows/second, or 17 MB/s. Doesn't look surprisingly fast too and let me think that probably I am doing something wrong. Does anyone has suggestions about how to improve the performances? How much will I benefit from running NiFi on cluster instead of growing with the instance specs? Thank you all
Created 10-18-2016 10:01 AM
Hi everybody!
I would like to share how my scenario has developed in case someone will have similar issues.As a brief recap, my task was to design a rather simple flow to process csv data, apply some transformations to some of its fields, output them to Hive. As everybody I had some performance requirements, but in general you want to create a flow as fast as possible.
My original design was to deal with the csv file at record-level. Files would have been split up to single lines, fields ingested in NiFi as flowfile attributes, processed by custom processors, then finally buffered together and sent to HDFS. The main reason why I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (given that flowfiles in the queues are below a configurable threshold). This seemed to me the fastest way. Some takeaways from the learning process of a relatively newbie NiFi user:
Thank you all for your support, hope the effort I put into learning NiFi will help someone else!
Created 09-29-2016 03:26 PM
So without doing anything fancy configuration wise and having a very basic template like pure-split-merge.xml (it assumes compressed input) i get around 13,333 events/sec on a very stable basis. The disk is moving but fine. CPU is pretty busy but fine. GC is busy but fine and no full GCs.
So, at this point it looks like there are some opportunities to improve how we schedule processors to be both more aggressive and less noisy (when there is no work to do). So a few of us are looking into that. This goes to your question of wasting speed. We see some cases where our scheduler itself could be wasting speed opportunities.
Now, in the mean time a definitely fast option is to avoid the need to split data in the first place. Simply have the processors which were extracting attributes and then later altering content be composed together and operate on the dataset events. That is less elegant and reusable admittedly so I'm not proposing that is the end solution just stating that this approach works well.
Anyway, we'll keep this conversation going. This is a perfect case to evaluate performance as it exercises a few important elements and can be a common pattern.
More to follow as we learn more. This will almost certainly end up in a blog/article 🙂
Created 09-28-2016 06:47 PM
Riccardo, I'm sorry you were having this problem, but I just wanted to say thank you for writing such a complete and detailed question. Providing the list of things you have already tried and specific expectations makes answering it much easier for everyone involved. It definitely cuts down on the mental gymnastics of trying to estimate the experience level and comprehension of a user we haven't communicated with before.
Created 09-29-2016 05:54 PM
@Riccardo Iacomini, Using the 'pure-split-merge.xml' template that @jwitt provided, I am seeing numbers that I think we can improve. It's easy to see when running that template for a while that the bottleneck in the flow is MergeContent. Looking at what it is doing, and poking around a bit with a profiler, it looks like it is pretty inefficient in its pulling of FlowFiles from the queue. It was done this way for good reason, but I think with a minor (backward compatible enhancement to the API we can actually improve it quite a bit. I will look into it some more and let you know what I come up with.Thanks for bringing this to our attention! This is a pretty common use case and one that we certainly want to do be able to handle extremely well.
Created 09-30-2016 07:38 AM
I figured there was something not working at his best, but I could not think I was the first one getting into this kind of issue. However, thank YOU all for the help and support provided, really appreciated.
Created 10-03-2016 05:12 PM
UPDATE
I am moving to the new flowFile design as suggested in which the work unit is a set of lines instead of single ones. Since then it looks to me that there are significant improvements given that each block is working at 10 Mb/s in single thread.
The current problem is that after some processing time I start to trigger full GCs. I tried reviewing the code and substituting concatenations with StringBuilder.append(). Things seem to improve but I still eventually trigger full GCs. Any suggestion on how to avoid them? Currently I've been using NiFi with 32GB of configured memory and G1 as GC.
Created 10-18-2016 10:01 AM
Hi everybody!
I would like to share how my scenario has developed in case someone will have similar issues.As a brief recap, my task was to design a rather simple flow to process csv data, apply some transformations to some of its fields, output them to Hive. As everybody I had some performance requirements, but in general you want to create a flow as fast as possible.
My original design was to deal with the csv file at record-level. Files would have been split up to single lines, fields ingested in NiFi as flowfile attributes, processed by custom processors, then finally buffered together and sent to HDFS. The main reason why I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (given that flowfiles in the queues are below a configurable threshold). This seemed to me the fastest way. Some takeaways from the learning process of a relatively newbie NiFi user:
Thank you all for your support, hope the effort I put into learning NiFi will help someone else!
Created 10-18-2016 11:21 AM
@Riccardo Iacomini great post here and I do think it will be quite helpful to others. As a result of this thread there are a couple really important and helpful JIRAs being worked on as well. SplitText performance will be substantially improved and MergeContent will as well. But, the design you've come to will still be the highest sustained performing approach. As you noted FlowFile attributes need to be used wisely. They're a powerful tool but they should be generally focused on things like tagging/routing rather than as a sort of in memory holder between deserialization and serialization.
Anyway great post and follow-through to help others!
Created 08-02-2019 02:25 PM
@Riccardo Iacomini Thank you for the great post! This is very helpful. Here I am wondering how you batch things together like having many csv rows instead of one csv row. Because if we want to batch csv row into multiple rows, we use MergeContent processor, but you also mention that MergeContent is costly. So how batch processing will work on Nifi??