Created 09-27-2016 01:41 PM
I've developed a NiFi flow prototype for data ingestion into HDFS. Now I would like to improve the overall performance, but it seems I cannot really move forward.
The flow takes CSV files as input (each row has 80 fields), splits them at row level, applies some transformations to the fields (using 4 custom processors executed sequentially), buffers the new rows into CSV files, and writes them to HDFS. I've developed the processors so that the content of each flow file is accessed only once: when an individual record is read, its fields are moved to flowfile attributes. Tests have been performed on an Amazon EC2 m4.4xlarge instance (16-core CPU, 64 GB RAM).
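The per-record approach in the custom processors follows this general pattern (a simplified Python sketch of the idea, not the actual Java processor code; the field names and the example transformation are made up):

```python
# Simplified sketch of the per-record logic: each CSV row is read once,
# its fields are promoted to an attribute map, and the transformations
# (four custom processors in the real flow) operate only on attributes.

def row_to_attributes(row, header):
    """Read the content once and move every field to an attribute map."""
    return {name: value for name, value in zip(header, row.split(","))}

def transform(attributes):
    """Stand-in for the custom processors applied in sequence."""
    attributes = dict(attributes)
    attributes["field_1"] = attributes["field_1"].upper()  # example transform
    return attributes

header = ["field_%d" % i for i in range(1, 81)]       # 80 fields per row
row = ",".join("v%d" % i for i in range(1, 81))
attrs = transform(row_to_attributes(row, header))
print(attrs["field_1"])                               # -> V1
```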
This is what I tried so far:
From the monitoring I've performed, it looks like the disks are not the bottleneck (they are basically idle most of the time, showing that the computation is actually being performed in memory) and the average CPU load is below 60%. The most I can get is 215k rows/minute, which is about 3.5k rows/second. In terms of size, that is just 4.7 MB/s. I am aiming for something significantly higher. Just as a comparison, I created a flow that reads a file, splits it into rows, merges them back together in blocks, and writes them to disk. There I get 12k rows/second, or 17 MB/s. That doesn't look surprisingly fast either, which makes me think I am probably doing something wrong. Does anyone have suggestions on how to improve performance? How much would I benefit from running NiFi on a cluster instead of growing the instance specs? Thank you all
Created 10-18-2016 10:01 AM
Hi everybody!
I would like to share how my scenario has developed, in case someone has similar issues. As a brief recap, my task was to design a rather simple flow to process CSV data, apply some transformations to some of its fields, and output it to Hive. Like everybody, I had some performance requirements, but in general you want to create a flow that is as fast as possible.
My original design was to deal with the CSV file at record level. Files would be split down to single lines, the fields ingested into NiFi as flowfile attributes, processed by custom processors, then finally buffered back together and sent to HDFS. The main reason I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (as long as the number of flowfiles in the queues stays below a configurable threshold). This seemed to me the fastest way. Some takeaways from the learning process of a relative NiFi newbie:
Thank you all for your support, hope the effort I put into learning NiFi will help someone else!
Created 09-27-2016 03:01 PM
Hello, how many lines/rows are in each incoming CSV file? A common pattern here is to do a two-phase split, where the first phase splits into, say, 5000-line bundles and the second phase splits into single lines. Then, using back pressure, you can avoid ever creating too many flowfiles at once that aren't being operated on and are simply causing excessive GC pressure.
On a rather modest system you should very easily see performance (in conservative terms) of 30+ MB/s, i.e. tens or hundreds of thousands of rows per second for a flow like the one described. The bottleneck points in the flow should be fairly easy to spot through the UI. Could you attach a screenshot of the running flow? One thing that is not easily spotted is when GC pressure is causing the issue. If you go to the summary page and click 'system diagnostics' you can view garbage collection info. What does it show?
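The two-phase split can be sketched as follows (a plain-Python illustration of the idea, not NiFi's actual SplitText implementation; the 5000-line bundle size matches the suggestion above):

```python
# Two-phase split: phase 1 cuts the file into bundles of at most 5000
# lines, phase 2 cuts each bundle into single lines. In NiFi, back
# pressure on the queue between the two phases keeps the number of
# in-flight flowfiles bounded instead of materializing them all at once.

BUNDLE_SIZE = 5000

def split_into_bundles(lines, bundle_size=BUNDLE_SIZE):
    """Phase 1: yield bundles of at most bundle_size lines."""
    for i in range(0, len(lines), bundle_size):
        yield lines[i:i + bundle_size]

def split_into_rows(bundle):
    """Phase 2: yield single lines from one bundle."""
    for line in bundle:
        yield line

lines = ["row-%d" % i for i in range(12_000)]
bundles = list(split_into_bundles(lines))
print(len(bundles))                                         # 3 bundles
print(sum(1 for b in bundles for _ in split_into_rows(b)))  # 12000 rows
```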
Created on 09-28-2016 09:12 AM - edited 08-18-2019 03:37 AM
The input file is 2 GB and has 1.4M rows. I do perform a two-phase split, with 5k and 1-line steps. I have configured back pressure, and my swap threshold is 40k. However, even without setting it, would that number of flowfiles be a problem? I've attached the flow screenshots and the GC info:
I can see that the slowest processors are the custom ones. I expected that, since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones, and assigning several threads to them does not seem to help. However, the point of the last part of my post was that even bypassing them, and just splitting the rows and merging them back together, I get 12k rows/second. Should I move to a higher level and consider batches of rows as work units? Thank you for your time.
Created 09-27-2016 03:15 PM
1. Can you split the load across two smaller nodes instead?
2. Can you split the input csv so that multiple processors read it at once?
3. Do you have any throttling in any of the processors or queues in the system? How fast is the HDFS writer? How large is your HDFS cluster?
4. Can you try it with output to a non-HDFS file directory?
5. Can you look at the NiFi Data Provenance for each step and see where it is slow? See the ingest times.
6. For PutHDFS did you set IO Buffer Size, Block Size, Compression Codec?
7. What version of NiFi are you using? The HDF 2 version is a bit faster than older ones.
8. Which JDK version?
9. Can you post the contents of your bootstrap.conf?
Created 09-28-2016 09:39 AM
Thank you all, really appreciated
Created 09-28-2016 01:13 PM
Cool. Thanks for all the details. Yes let's definitely avoid going the cluster route right now. Once we see reasonable node performance then we can deal with scale out.
Some quick observations
Questions:
I'll set up a vanilla flow that brings in a file like you mention, splits it, and merges it, all on a basic laptop setup, and let you know the results I see.
Created 09-28-2016 01:49 PM
I was not aware of NiFi's batching capabilities, I'll have a look at that.
$ time sh -c "dd if=/dev/zero of=testfile bs=100k count=1k && sync"
1024+0 records in
1024+0 records out
104857600 bytes (105 MB, 100 MiB) copied, 0.0617739 s, 1.7 GB/s

real    0m0.600s
user    0m0.004s
sys     0m0.060s
Thank you again for your time
Created 09-28-2016 03:15 PM
No problem at all on time. Happy to help. This should be faster so let's figure out what is happening. Appreciate the details you're providing.
I've recreated a very similar flow. I am seeing basically 10,000-20,000 events per second (depending on tuning). In a very basic, default-everything, single-threaded flow I am getting an end-to-end 20,000 events/sec, equating to about 20 MB/sec. This is on my MacBook. The amount of disk usage needed to make this happen, given all the processors I have in the flow, equates to about 60 MB/s read with 50 MB/s write. That is all steady state and healthy. But it does seem like it should be faster; the disk isn't tapped out, nor is the CPU, and GC looks great. So, adding threads... performance actually seemed to drop a bit in this case, and when I pushed it with a variety of scenarios it did show OutOfMemoryErrors. So I will be looking into this more. I've still got a 512 MB heap, so first I'll bump that up a bit, which is reasonable given what I'm trying to do now.
Regarding your CopyProcessor keep in mind the UpdateAttribute processor does what you describe already and supports batching nicely.
Regarding the logic of when to combine them into one or not yeah I totally agree with your thinking. Just wanted to put that out there for consideration. If you've already thought through that then I'm all with you.
Will provide more thoughts as I/we get a better handle on the bottlenecks and options to move the needle.
Created 09-29-2016 08:52 AM
I really do not know what to think. I tried batch processing, and still got no major improvement, even after replacing the custom processor with UpdateAttribute (I also noticed the advanced section, thank you), which is faster. Given the disk speed your flow required to reach 20k, I think we can exclude disk speed from the list of possible causes. I have one doubt about the garbage collector: is the time reported the total time spent in garbage collection? How do you evaluate the JVM's behavior with respect to these values?
Created 09-29-2016 01:13 PM
I was reasoning about the simple split-merge flow and your results (10-20k) versus mine (12k): if you have an SSD in your laptop, wouldn't the results be similar anyway, given that each processor has one thread assigned and both write to a fast disk? Any difference would then come down only to processor speed. I tried implementing the same flow but assigning more threads to the split processor: this basically drowns the merge processor, because hundreds of thousands of flowfiles are created. This suggests that I do have the speed to process the data the way I want, but I cannot configure the queues (and NiFi in general) to support it. What guidelines should I follow regarding the total number of flowfiles in the queues (and in the complete flow)? Correct me if I am wrong, but doesn't limiting the number of flowfiles waiting in a queue mean I could waste the speed of a fast processor pulling from it? Sorry for the number of questions.
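The queue dynamics in question can be seen in a toy producer/consumer model (a hedged sketch only, not how NiFi schedules threads internally; the 4x speed ratio and threshold are made-up numbers): back pressure throttles a fast upstream processor to the downstream pace instead of letting the queue grow without bound.

```python
# Toy model of back pressure: the producer (split processor) may only
# enqueue while the queue is below its threshold, so a fast producer is
# throttled to the consumer's (merge processor's) pace rather than
# flooding the queue with hundreds of thousands of flowfiles.

from collections import deque

THRESHOLD = 10_000          # back-pressure object threshold on the queue

queue = deque()
produced = consumed = 0
peak = 0

for step in range(100_000):
    # producer runs 4x faster than the consumer, like a multi-threaded split
    for _ in range(4):
        if len(queue) < THRESHOLD:      # back pressure: stop when full
            queue.append("flowfile")
            produced += 1
    if queue:
        queue.popleft()
        consumed += 1
    peak = max(peak, len(queue))

print(peak <= THRESHOLD)    # the queue never grows past the threshold
```

The trade-off raised above is real: the threshold caps memory use, but if it is set too low relative to the downstream processor's batch size, a fast consumer can occasionally find the queue drained.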