NiFi: unable to improve performance

Contributor

I've developed a NiFi flow prototype for data ingestion into HDFS. Now I would like to improve the overall performance, but it seems I cannot really move forward.

The flow takes CSV files as input (each row has 80 fields), splits them at row level, applies some transformations to the fields (using 4 custom processors executed sequentially), buffers the new rows into CSV files, and outputs them to HDFS. I've developed the processors in such a way that the content of the flowfile is accessed only once: when each individual record is read, its fields are moved to flowfile attributes. Tests have been performed on an Amazon EC2 m4.4xlarge instance (16-core CPU, 64 GB RAM).
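For context, this is a minimal sketch of what I mean by such a processor (class, relationship and attribute names are illustrative only, not my actual code): the content is read exactly once and every CSV field is promoted to a flowfile attribute.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.stream.io.StreamUtils;

public class RowToAttributes extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Read the single-row content exactly once
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
        final String row = new String(buffer, StandardCharsets.UTF_8);

        // Promote each of the ~80 CSV fields to a flowfile attribute (field.0, field.1, ...)
        final Map<String, String> attributes = new HashMap<>();
        final String[] fields = row.split(",", -1);
        for (int i = 0; i < fields.length; i++) {
            attributes.put("field." + i, fields[i]);
        }
        flowFile = session.putAllAttributes(flowFile, attributes);
        session.transfer(flowFile, REL_SUCCESS);
    }
}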

This is what I tried so far:

  • Moved the flowfile repository and the content repository to different SSD drives
  • Moved the provenance repository to memory (NiFi could not keep up with the event rate)
  • Configured the system according to the configuration best practices
  • Tried assigning multiple threads to each of the processors in order to reach different numbers of total threads
  • Tried increasing nifi.queue.swap.threshold and setting backpressure so the swap limit is never reached (see the settings sketch right after this list)
  • Tried different JVM memory settings, from 8 GB up to 32 GB (in combination with G1GC)
  • Tried increasing the instance specifications; nothing changes
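For reference, the relevant settings look roughly like this (the repository paths are placeholders and the values are simply the ones I tested, not recommendations):

# nifi.properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
nifi.queue.swap.threshold=40000
nifi.flowfile.repository.directory=/mnt/ssd1/flowfile_repository
nifi.content.repository.directory.default=/mnt/ssd2/content_repository

# bootstrap.conf
java.arg.2=-Xms8g
java.arg.3=-Xmx8g
java.arg.13=-XX:+UseG1GC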

From the monitoring I've performed it looks like disks are not the bottleneck (they are basically idle a great part of the time, showing the computation is actually being performed in memory) and the average CPU load is below 60%. The most I can get is 215k rows/minute, which is about 3.5k rows/second. In terms of size, it's just 4.7 MB/s. I am aiming at something definitely greater than this. Just as a comparison, I created a flow that reads a file, splits it into rows, merges them back together in blocks, and outputs to disk. Here I get 12k rows/second, or 17 MB/s. That doesn't look surprisingly fast either, and it makes me think I am probably doing something wrong. Does anyone have suggestions on how to improve the performance? How much would I benefit from running NiFi on a cluster instead of growing the instance specs? Thank you all

1 ACCEPTED SOLUTION

Contributor

Hi everybody!

I would like to share how my scenario has developed, in case someone has similar issues.

As a brief recap, my task was to design a rather simple flow to process CSV data, apply some transformations to some of its fields, and output it to Hive. Like everybody, I had some performance requirements, but in general you want to create a flow that is as fast as possible.

My original design was to deal with the CSV file at record level. Files would be split down to single lines, the fields ingested into NiFi as flowfile attributes, processed by custom processors, then finally buffered back together and sent to HDFS. The main reason I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (provided the number of flowfiles in the queues stays below a configurable threshold). This seemed to me the fastest way. Some takeaways from the learning process of a relative NiFi newbie:

  • The basic work unit that you choose at design time has a huge impact in terms of latency/throughput. It seems like an obvious thing (it was not clear to me it could have this much impact on performance), but if you want high throughput you should consider batching things together. On the other hand, small work units get processed faster and give you better latency. I thought that single CSV rows would be a compromise between the two, but I was wrong. Really wrong. Batching rows together, and still processing them in memory, I got almost a 3x performance gain, but that still was not enough, given that the flow processed only 10 MB/s.
  • Splitting and merging is very costly. As @mpayne wrote, things will probably improve, but if you are not forced to split the input, you will get a significant performance gain.
  • Flowfile attribute processing should be limited to a few aspects, and not adopted as a way to ingest data into NiFi for further processing. After some performance gains with batching, I tried to increase the speed of the flow by allocating multiple threads to processors. I was not able to achieve any improvement. Instead, this generated a really huge GC problem, which I reported in this question. After some time studying G1, and with some help from S.O. folks, I discovered that the issue basically could not be solved (or at least not easily), since I had a huge live data set in memory that could not be collected by the garbage collector, which led to continuous full GCs. Thanks to @jwitt I started developing a "classic" flow which simply reads flowfile content instead of attributes and batches rows together as the work unit (maybe I overthought the flow design a little bit at the beginning 😄). A rough sketch of this design follows after this list.
  • With this new design I managed to get almost 55 MB/s of throughput on a single machine. I don't think that is bad, since the CSV records have 80 fields each and the code has not been profiled. Adding more processing to the data will probably lower this value, but I think I am in a sweet spot from which to eventually get a fast NiFi flow.
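To give an idea of the difference, here is a rough sketch of the content-based approach (simplified and illustrative, not the actual production code): one streaming pass over a whole batch of CSV rows, with no per-row flowfiles and no fields promoted to attributes.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

@SupportsBatching
public class TransformCsvBatch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Rewrite the whole batch of rows in a single streaming pass over the content
        flowFile = session.write(flowFile, (in, out) -> {
            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                final String[] fields = line.split(",", -1);
                // per-field transformations would go here
                writer.write(String.join(",", fields));
                writer.newLine();
            }
            writer.flush();
        });
        session.transfer(flowFile, REL_SUCCESS);
    }
}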

Thank you all for your support, hope the effort I put into learning NiFi will help someone else!


17 REPLIES


Hello, how many lines/rows are in each incoming CSV file? A common pattern here is to do a two-phase split, where the first phase splits into, say, 5000-line bundles and the second phase splits into single lines. Then, using back pressure, you can avoid ever creating too many flowfiles at once that aren't being operated on and are simply causing excessive GC pressure.

On a rather modest system you should very easily see performance (in conservative terms) of 30+ MB/s, for tens or hundreds of thousands of rows per second in a flow like the one described. The bottleneck points in the flow should be fairly easily spotted through the UI. Could you attach a screenshot of the running flow? One thing that is not easily spotted is when GC pressure is causing the issue. If you click on the summary page and go to 'system diagnostics' you can view garbage collection info. What does it show?

Contributor

The input file is 2 GB and has 1.4M rows. I do perform a two-phase split with 5k and 1 line steps. I had configured backpressure, and my swap threshold is 40k. However, even without setting it, would that number of flowfiles be a problem? I've attached the flow screenshots and the GC info:

(attached screenshots: 8075-selection-123.png, 8076-selection-126.png, 8077-selection-127.png, 8078-selection-129.png, 8079-selection-130.png)

I can see that the slowest processors are the custom ones. I expected that, since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. Assigning several threads to them does not seem to work. However, the point of the last part of the post was that even bypassing them, and just splitting the rows and merging them back together, I get 12k rows/second. Should I move to a higher level and consider batches of rows as the work unit? Thank you for your time.

Master Guru

1. You can try two smaller nodes and split the load. Can you split the load across two smaller nodes?

2. Can you split the input csv so that multiple processors read it at once?

3. Do you have any throttling in any of the processors or queues in the system? How fast is the HDFS writer? How large is your HDFS cluster?

4. Can you try it with output to a non-HDFS file directory?

5. Can you look at the NiFi Data Provenance for each step and see where it is slow? See the ingest times.

6. For PutHDFS did you set IO Buffer Size, Block Size, Compression Codec?

7. What version of NiFi are you using? The HDF 2 version is a bit faster than older ones.

8. Which JDK version?

9. Can you post the contents of your bootstrap.conf?

Contributor
  • 1) I've not configured NiFi on a cluster yet. My idea was to evaluate the performance of a single machine and then move to a cluster. Given the situation, I will definitely try to do this.
  • 2) That's something I can do, but since I've not used huge CSV files (at most 2 GB, 1.4M rows), after a few seconds of delay the first phase of the split has already emitted the first flowfiles and the flow actually starts.
  • 3-4-6) No throttling is configured on the system. I tried not configuring backpressure on the links, but it was better with it. I am leaving out the HDFS output phase since that does not seem to be the issue (its queue was basically always empty).
  • 5) The custom processors are clearly the slowest ones. As I pointed out in the other answer, I expected that, since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. What really concerns me, and makes me think there may be something else I cannot recognize, is that even removing them and just splitting rows and batching them back together in order to save them to local disk, I get 12k rows/sec. As @jwitt also wrote, this number should be way higher.
  • 7) NiFi 0.7
  • 8) Oracle JDK 8
  • 9) bootstrap.txt

Thank you all, really appreciated


Cool. Thanks for all the details. Yes let's definitely avoid going the cluster route right now. Once we see reasonable node performance then we can deal with scale out.

Some quick observations:

  • Your garbage collection results look great.
  • The custom procs are indeed rather slow. Though I'm not overly impressed with the numbers I see on the standard procs either. The second SplitText took 18 seconds to split 300 MB worth of lines. Not ideal.
  • You definitely should take advantage of NiFi's automatic session batching capability. Check out the SupportsBatching annotation; you can find several examples of its use (a minimal illustration follows right after this list). By having a processor support that, and having 'run duration' higher than 0 in the UI, NiFi can automatically combine several commits into one, and this can yield far higher throughput at the expense of latency (on the order of milliseconds).
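As a small, purely illustrative example (the processor name here is made up), the annotation is the only code change; the rest is the 'Run Duration' slider in the processor's scheduling settings:

import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.processor.AbstractProcessor;

// Declares that this processor tolerates the framework combining several session
// commits into one when 'Run Duration' is set above 0 ms in the UI.
@SupportsBatching
public abstract class MyBatchedFieldProcessor extends AbstractProcessor {
    // onTrigger(...) stays exactly as before; no other code changes are needed.
}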

Questions:

  • What is the underlying storage device that NiFi is using? Type of disk (local disk, HDD or SSD)? Type of partitioning (are all the repos on a single disk)?
  • Have you considered restructuring the composition of those custom processors? Could/should a couple reasonably be combined into a single step?
  • ProcessNullFields performance appears really poor. Have you done any testing/evaluation to see what that processor is spending the bulk of its time on? Attaching a debugger/profiler at runtime could be really enlightening (one way to do that is sketched right after this list).
  • CopyProcessor also appears heavy in terms of time. What does that one do?
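For reference, one low-effort way to let a debugger or profiler attach to the running NiFi JVM is the stock, normally commented-out, debug line in bootstrap.conf; this is just a sketch, adjust the port as needed:

# bootstrap.conf
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000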

I'll set up a vanilla flow that brings in a file like you mention, splits it, and merges it, all on a basic laptop setup, and let you know the results I see.

Contributor

I was not aware of NiFi's batching capabilities, I'll have a look at that.

  • The machine is a 16-core CPU, 64 GB RAM Ubuntu 16.04 instance running on AWS. It has separate attached EBS SSDs for the flowfile repository and the content repository. To evaluate the performance of the disks I executed this command, as suggested in several forums:
time sh -c "dd if=/dev/zero of=testfile bs=100k count=1k && sync"
1024+0 records in
1024+0 records out
104857600 bytes (105 MB, 100 MiB) copied, 0.0617739 s, 1.7 GB/s

real	0m0.600s
user	0m0.004s
sys	0m0.060s
  • Yes, I could definitely combine all of them, even though I would prefer having several processors performing simple but specific transformations of the data instead of mixing everything together. From what I understood, the performance of the built-in processors alone would still not be ideal, right?
  • I've never tried a debugger/profiler at runtime (I basically just started using NiFi). Do you have any useful resources to suggest?
  • It simply copies some attributes into different ones that can later be transformed

Thank you again for your time


No problem at all on time. Happy to help. This should be faster so let's figure out what is happening. Appreciate the details you're providing.

I've recreated a very similar flow. I am seeing basically 10,000-20,000 events per second (depending on tuning). In a very basic, default-everything, single-threaded flow I am getting an end-to-end 20,000 events/sec, equating to about 20 MB/sec. This is on my MacBook. The amount of disk usage needed to make this happen, given all the processors I have in the flow, equates to about 60 MB/s read with 50 MB/s write. That is all steady state and healthy. But it does seem like it should be faster. Disk isn't tapped out, nor is CPU, and GC looks great. So, adding threads... performance actually seemed to drop a bit in this case, and when I pushed it with a variety of scenarios it then did show these OOMEs. So, I will be looking into this more. I've still got a 512 MB heap, so first I'll bump that a bit, which is reasonable given what I'm trying to do now.

Regarding your CopyProcessor keep in mind the UpdateAttribute processor does what you describe already and supports batching nicely.

Regarding the logic of when to combine them into one or not yeah I totally agree with your thinking. Just wanted to put that out there for consideration. If you've already thought through that then I'm all with you.

Will provide more thoughts as I/we get a better handle on the bottlenecks and options to move the needle.

Contributor

I really do not know what to think. I tried batch processing: still no major improvement, even after replacing the custom processor with UpdateAttribute (I also noticed the Advanced section, thank you), which is faster. Then, given the disk speed your flow required to reach 20k, I think we can exclude disk speed from the list of possible causes. I have one doubt about the garbage collector: is the time reported the total time spent on garbage collection? How do you evaluate the JVM behavior with respect to these values?

Contributor

I was reasoning about the simple split-merge flow and your results (10-20k) vs mine (12k): if you have an SSD on your laptop, wouldn't they be similar anyway, given that each processor has one thread assigned and they both write to a fast disk? Any difference would then be decided only by the processor speed. I've tried to implement the same flow but assigning more threads to the split processor: this basically drowns the merge processor, because hundreds of thousands of flowfiles are created. This maybe suggests that I do have the speed to process the data the way I want, but I cannot set up the queues, and NiFi in general, to support it. Which guidelines should I try to follow regarding the total number of flowfiles in the queues (and in the complete flow, actually)? Correct me if I am wrong, but doesn't limiting the number of flowfiles waiting in a queue imply that I could waste the speed of a fast processor trying to gather input from it? Sorry for the number of questions.