NiFi: unable to improve performance

I've developed a NiFi flow prototype for data ingestion into HDFS. Now I would like to improve the overall performance, but it seems I cannot really move forward.

The flow takes CSV files as input (each row has 80 fields), splits them at row level, applies some transformations to the fields (using 4 custom processors executed sequentially), buffers the new rows into CSV files, and outputs them to HDFS. I've developed the processors so that the content of the flowfile is accessed only once, when each individual record is read and its fields are moved to flowfile attributes. Tests were performed on an Amazon EC2 m4.4xlarge instance (16 CPU cores, 64 GB RAM).

This is what I tried so far:

  • Moved the flowfile repository and the content repository to different SSD drives
  • Moved the provenance repository into memory (NiFi could not keep up with the event rate)
  • Configured the system according to the configuration best practices
  • Assigned multiple threads to each of the processors in order to reach different numbers of total threads
  • Increased nifi.queue.swap.threshold and set backpressure so that the swap limit is never reached
  • Tried different JVM memory settings from 8 up to 32 GB (in combination with G1GC)
  • Increased the instance specifications; nothing changed

From the monitoring I've performed, the disks do not look like the bottleneck (they are basically idle most of the time, which suggests the computation is actually being performed in memory) and the average CPU load is below 60%. The most I can get is 215k rows/minute, which is about 3.5k rows/second. In terms of size, that's just 4.7 MB/s. I am aiming for something definitely greater than this. Just as a comparison, I created a flow that reads a file, splits it into rows, merges them together in blocks and outputs to disk. Here I get 12k rows/second, or 17 MB/s. That doesn't look surprisingly fast either, and it makes me think that I am probably doing something wrong. Does anyone have suggestions about how to improve the performance? How much would I benefit from running NiFi on a cluster instead of growing the instance specs? Thank you all
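For readers who want to check the figures above, here is a small sketch of the arithmetic. The class and method names are made up for illustration, and 1 MB is taken as 1,000,000 bytes, which is an assumption since the post does not say which convention it uses. It suggests an average row of roughly 1.3 KB in the full flow and roughly 1.4 KB in the split/merge comparison flow, so the two measurements are at least consistent with each other.

```java
public class ThroughputMath {

    /** Converts a rows-per-minute figure to rows per second. */
    public static double rowsPerSecond(double rowsPerMinute) {
        return rowsPerMinute / 60.0;
    }

    /** Average bytes per row, assuming 1 MB = 1,000,000 bytes. */
    public static double bytesPerRow(double megabytesPerSecond, double rowsPerSecond) {
        return megabytesPerSecond * 1_000_000.0 / rowsPerSecond;
    }

    public static void main(String[] args) {
        // Full flow: 215k rows/minute at 4.7 MB/s (figures from the post).
        double fullFlowRows = rowsPerSecond(215_000);
        System.out.printf("full flow: %.0f rows/s, %.0f bytes/row%n",
                fullFlowRows, bytesPerRow(4.7, fullFlowRows));

        // Comparison flow: 12k rows/second at 17 MB/s (figures from the post).
        System.out.printf("split/merge only: %.0f bytes/row%n",
                bytesPerRow(17.0, 12_000));
    }
}
```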

So without doing anything fancy configuration-wise, and using a very basic template like pure-split-merge.xml (it assumes compressed input), I get around 13,333 events/sec on a very stable basis. The disk is moving but fine. CPU is pretty busy but fine. GC is busy but fine and no full GCs.

So, at this point it looks like there are some opportunities to improve how we schedule processors to be both more aggressive and less noisy (when there is no work to do). So a few of us are looking into that. This goes to your question of wasting speed. We see some cases where our scheduler itself could be wasting speed opportunities.

Now, in the meantime, a definitely fast option is to avoid the need to split the data in the first place. Simply have the processors which were extracting attributes and then later altering content be composed together and operate on the events of the dataset as a whole. That is admittedly less elegant and reusable, so I'm not proposing it as the end solution, just stating that this approach works well.
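To make the idea concrete, here is a minimal sketch of processing a whole block of rows in one streaming pass, with the per-row transformations composed into a single function. It is plain Java with no NiFi dependency; the class name, the method names and the naive comma split (which assumes no quoted commas) are all assumptions made for illustration, not code from the flow discussed here. In a custom processor the same loop would presumably sit inside a stream callback that reads the flowfile content and writes the new content.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class BatchRowTransform {

    /** Reads CSV rows from in, applies rowFn to each row, writes the result to out. */
    public static void transform(InputStream in, OutputStream out,
                                 Function<String[], String[]> rowFn) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Naive split: assumes fields never contain quoted commas.
            String[] fields = line.split(",", -1);
            writer.write(String.join(",", rowFn.apply(fields)));
            writer.write('\n');
        }
        writer.flush();
    }

    /** Convenience wrapper for small in-memory inputs (hypothetical helper). */
    public static String transformString(String csv, Function<String[], String[]> rowFn) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            transform(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)), out, rowFn);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two illustrative transformations, composed so each row is visited once.
        Function<String[], String[]> trimAll = f -> {
            for (int i = 0; i < f.length; i++) {
                f[i] = f[i].trim();
            }
            return f;
        };
        Function<String[], String[]> upperFirst = f -> {
            f[0] = f[0].toUpperCase();
            return f;
        };
        System.out.print(transformString(" a , b\nc,d \n", trimAll.andThen(upperFirst)));
    }
}
```

The point of the composition is that no per-row flowfile and no per-field attribute is ever created; rows only exist as short-lived strings while the stream is being copied.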

Anyway, we'll keep this conversation going. This is a perfect case to evaluate performance as it exercises a few important elements and can be a common pattern.

More to follow as we learn more. This will almost certainly end up in a blog/article 🙂

Riccardo, I'm sorry you were having this problem, but I just wanted to say thank you for writing such a complete and detailed question. Providing the list of things you have already tried and specific expectations makes answering it much easier for everyone involved. It definitely cuts down on the mental gymnastics of trying to estimate the experience level and comprehension of a user we haven't communicated with before.

@Riccardo Iacomini, using the 'pure-split-merge.xml' template that @jwitt provided, I am seeing numbers that I think we can improve. After running that template for a while, it's easy to see that the bottleneck in the flow is MergeContent. Looking at what it is doing, and poking around a bit with a profiler, it looks like it is pretty inefficient in how it pulls FlowFiles from the queue. It was done this way for good reason, but I think with a minor (backward-compatible) enhancement to the API we can actually improve it quite a bit. I will look into it some more and let you know what I come up with. Thanks for bringing this to our attention! This is a pretty common use case and one that we certainly want to be able to handle extremely well.

I figured something was not working at its best, but I did not think I would be the first one to run into this kind of issue. Anyway, thank YOU all for the help and support provided, really appreciated.

UPDATE

I am moving to the new flowfile design as suggested, in which the work unit is a set of lines instead of single ones. Since then it looks to me like there are significant improvements, given that each block is processed at 10 MB/s in a single thread.

The current problem is that after some processing time I start to trigger full GCs. I tried reviewing the code and replacing string concatenations with StringBuilder.append(). Things seem to improve, but I still eventually trigger full GCs. Any suggestions on how to avoid them? Currently I'm running NiFi with 32 GB of configured memory and G1 as the GC.
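For context, this is roughly what the StringBuilder change looks like: a minimal sketch with hypothetical names, not the actual processor code. One buffer is owned by the caller, rows are appended into it field by field, and the buffer is cleared and reused for the next block. This only reduces short-lived garbage from concatenation; it is not established here that it addresses the full GCs.

```java
public class RowAppender {

    /** Appends one CSV row (fields joined by commas, newline-terminated) to batch. */
    public static void appendRow(StringBuilder batch, String[] fields) {
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                batch.append(',');
            }
            batch.append(fields[i]);
        }
        batch.append('\n');
    }

    public static void main(String[] args) {
        // One buffer per block of rows; the initial capacity is an arbitrary guess.
        StringBuilder batch = new StringBuilder(64 * 1024);
        String[][] rows = { {"1", "alpha"}, {"2", "beta"} };
        for (String[] row : rows) {
            appendRow(batch, row);
        }
        System.out.print(batch);

        // Clear the buffer so it can be reused for the next block
        // instead of allocating a new one.
        batch.setLength(0);
    }
}
```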

Hi everybody!

I would like to share how my scenario has developed in case someone will have similar issues.

As a brief recap, my task was to design a rather simple flow to process csv data, apply some transformations to some of its fields, and output them to Hive. Like everybody, I had some performance requirements, but in general you want to create a flow that is as fast as possible.

My original design was to deal with the csv file at record-level. Files would have been split up to single lines, fields ingested in NiFi as flowfile attributes, processed by custom processors, then finally buffered together and sent to HDFS. The main reason why I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (given that flowfiles in the queues are below a configurable threshold). This seemed to me the fastest way. Some takeaways from the learning process of a relatively newbie NiFi user:

  • The basic work unit that you choose at design time has a huge impact in terms of latency/throughput. It seems like an obvious thing (though it was not clear to me that it could have this much impact on performance), but if you want high throughput you should consider batching things together. On the other hand, small work units get processed faster and give you better latency. I thought that having single csv rows would be a compromise between the two, but I was wrong. Really wrong. By batching rows together, and still processing them in memory, I got almost a 3x performance gain, but that was still not enough, given that the flow processed only 10 MB/s.
  • Splitting and merging is very costly. As @mpayne wrote, things probably are going to improve, but if you are not forced to split the input, you will get a significant performance gain.
  • Flowfile attribute processing should be limited to a few aspects, and not adopted as a way to ingest data into NiFi for further processing. After some performance gains with batching, I tried to increase the speed of the flow by allocating multiple threads to processors. I was not able to achieve any improvement. Instead, this generated a really huge GC problem, which I reported in this question. After some time studying G1, and with some help from S.O. folks, I discovered that basically the issue could not be solved (or at least not easily), since I had a huge live data set in memory that could not be collected by the garbage collector, and that led to continuous full GCs. Thanks to @jwitt I started developing a "classic" flow which simply reads flowfile content instead of attributes, and in which rows are batched together as the work unit (maybe I overthought the flow design a little bit at the beginning 😄 ).
  • With this new design I managed to get almost 55 MB/s of throughput on a single machine. I don't think that this is bad, since the csv records have 80 fields each and the code has not been profiled. As I add more processing to the data I will probably see this value drop, but I think that I am in a sweet spot to eventually get a fast NiFi flow.
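As an illustration of the "batch rows together as the work unit" idea from the list above, here is a minimal sketch that groups rows into blocks of a fixed number of lines, so each block (rather than each row) becomes one unit of work. The class name, the method name and the block size are hypothetical, and the sketch does not show how the blocks were actually produced in my flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LineBatcher {

    /** Groups rows into newline-terminated blocks of at most rowsPerBatch rows each. */
    public static List<String> batch(List<String> rows, int rowsPerBatch) {
        if (rowsPerBatch <= 0) {
            throw new IllegalArgumentException("rowsPerBatch must be positive");
        }
        List<String> batches = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int count = 0;
        for (String row : rows) {
            current.append(row).append('\n');
            if (++count == rowsPerBatch) {
                // Block is full: emit it and start a new one.
                batches.add(current.toString());
                current.setLength(0);
                count = 0;
            }
        }
        if (count > 0) {
            // Emit the final, partially filled block.
            batches.add(current.toString());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a,1", "b,2", "c,3");
        for (String block : batch(rows, 2)) {
            System.out.println("--- block ---");
            System.out.print(block);
        }
    }
}
```

A larger block size favours throughput, a smaller one favours latency, which is the trade-off described in the first bullet.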

Thank you all for your support, hope the effort I put into learning NiFi will help someone else!

@Riccardo Iacomini great post here and I do think it will be quite helpful to others. As a result of this thread there are a couple really important and helpful JIRAs being worked on as well. SplitText performance will be substantially improved and MergeContent will as well. But, the design you've come to will still be the highest sustained performing approach. As you noted FlowFile attributes need to be used wisely. They're a powerful tool but they should be generally focused on things like tagging/routing rather than as a sort of in memory holder between deserialization and serialization.

Anyway great post and follow-through to help others!

@Riccardo Iacomini Thank you for the great post! This is very helpful. I am wondering how you batch things together, that is, how you work with many csv rows at a time instead of a single row. If we want to batch single csv rows into blocks of multiple rows, we would use the MergeContent processor, but you also mention that MergeContent is costly. So how does batch processing work in NiFi?