Member since
09-27-2016
22
Posts
11
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
21486 | 10-18-2016 10:01 AM |
10-18-2016
10:01 AM
2 Kudos
Hi everybody! I would like to share how my scenario has developed in case someone will have similar issues.
As a brief recap, my task was to design a rather simple flow to process csv data, apply some transformations to some of its fields, output them to Hive. As everybody I had some performance requirements, but in general you want to create a flow as fast as possible.
My original design was to deal with the csv file at record-level. Files would have been split up to single lines, fields ingested in NiFi as flowfile attributes, processed by custom processors, then finally buffered together and sent to HDFS. The main reason why I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (given that flowfiles in the queues are below a configurable threshold). This seemed to me the fastest way.
Some takeaways from the learning process of a relatively newbie NiFi user: The basic work unit that you choose at design time has a huge impact in terms of latency/throughput. Seems like a obvious thing (was not clear to me it could have this impact on the performances), but if you want high throughput you should consider batching things together. On the other hand, small work unit get processed faster and give to you better latency. I thought that having single csv rows would have been a compromise between the two, but I was wrong. Really wrong. Batching rows together, and still processing them in memory, I got almost a 3x performance gain, but still was not enough given that the flow processed only 10MB/s.
Splitting and merging is very costly. As @mpayne wrote, things probably are going to improve, but if you are not forced to split the input, you will get a significant performance gain.
Flowfile attribute processing should be limited to few aspects, and not adopted as a way to ingest data into NiFi for further processing. After some performance gains with batching, I tried to increase the speed of the flow allocating multiple threads to processors. I was not able to achieve any improvement. This generated instead a really huge GC problem, which I reported in this question. After some time studying G1, and with some help from S.O. folks, I discovered that basically the issue could not be solved (or at least easily), since I had a huge live data set in memory that could not be collected by the garbage collector and that led to continuous full GCs. Thanks to @jwitt I started developing a "classic" flow which simply read flowfiles content instead of attributes, in which we batch rows together as work unit (maybe I overthougth the flow design a little bit at the beginning 😄 ).
With this new design I managed to get almost 55MB/s of throughput with a single machine. I don't think that this is bad, since csv records have 80 field each, and that the code has not been profiled. Adding more processing to the data I probably will see this value lower, but I think that I am in a sweet spot to eventually get a fast NiFi flow.
Thank you all for your support, hope the effort I put into learning NiFi will help someone else!
... View more
10-11-2016
02:59 PM
Thanks for sharing your knowledge, I will try your tips. This specific GC issue is happening only when I assign multiple threads to the processors and try to speed up the flow, that otherwise runs at roughly 10MB/s in single thread.
I originally designed the flow to use flowfile attributes cause I was tempted to make the computation happen in memory. I thought that it would have been faster with respect to reading the flowfile content in each processor, and consequently parsing it to get specific fields. Do you suggest trying to implement a version that works, let's say, "on disk" on flowfile content instead of attributes?
... View more
10-04-2016
10:12 AM
1 Kudo
UPDATE
I've played a little bit with GC1 using Oracle documentation. I still do not have a definitive solution. I settled to the default configuration, modifying the region size accordingly to the heap (54GB, 30MB region size). I tried to better understand the problem and obtained a detailed picture of the heap using GCViewer. That's the associated heap chart after three consecutive full GCs:
A little bit of explanation for the output if you have not played with GCViewer:
It looks to me that the issue is in that the tenured heap: while the young heap is constantly cleaned in the GC young cycles (the grey line oscillating at the top of the picture), the tenured heap is used but never cleaned up.
Some questions:
Could it be a viable option to triggering more mixed GCs? How do I obtain that using the configuration parameters?
Should I switch to the old CMS GC even if it proved itself to be not that good on huge heaps?
Any suggestion is really appreciated. Thank you
_________________________________________________________________________________________________
Hello everybody,
I am developing a flow which requires good performances. It is running on a single instance with 16 cores and 64GB memory. I've configured NiFi to use 54GB of them with the G1 garbage collector. I am currently facing this memory issue: with a fresh NiFi start, all processors running but no input, it uses 1GB. I get some input in (1.5GB), the flow starts, consumes all the data and ends the computation without triggering any full GC. Tracking the memory using the system diagnostic page, I can see that it is constantly eating memory, at the point that only 2-3 free GB are left when no input is left in the flow. No space is released after that. I would expect that that's the most memory it needs to consume that input data, but if I try to get the same input size back in, it basically does not release any memory, in fact it continues eating it until full GCs are triggered. It takes really too much time to complete them, that's a screenshot of the system diagnostic page after the first one is triggered:
Is this a normal behavior for the JVM? Is there a way I can avoid triggering full GCs? I've run over the custom processors code, trying to limit object creations as much as I could (not a java expert, tried to google around a little bit in particular regarding Strings). A great part of the computation is performed accessing flowfile attributes (except for one processor that loads the input as flowfiles attributes, and one that writes them back to the flowfile content). Maybe this could be a programming issue, but given my limited experience I cannot tell, I would be glad to read suggestions and discover this is indeed the case. I've also tried to reduce the assigned heap down to 16GB. Full GCs are quicker, but more frequent as I expected.
Is there a way to completely avoid full GCs?
Thank you all
... View more
Labels:
- Labels:
-
Apache NiFi
10-03-2016
05:12 PM
2 Kudos
UPDATE I am moving to the new flowFile design as suggested in which the work unit is a set of lines instead of single ones. Since then it looks to me that there are significant improvements given that each block is working at 10 Mb/s in single thread. The current problem is that after some processing time I start to trigger full GCs. I tried reviewing the code and substituting concatenations with StringBuilder.append(). Things seem to improve but I still eventually trigger full GCs. Any suggestion on how to avoid them? Currently I've been using NiFi with 32GB of configured memory and G1 as GC.
... View more
09-30-2016
07:38 AM
I figured there was something not working at his best, but I could not think I was the first one getting into this kind of issue. However, thank YOU all for the help and support provided, really appreciated.
... View more
09-29-2016
01:13 PM
I was reasoning about the simple split-merge flow and your results (10-20k) vs mine (12k): if you have an ssd on your laptop, would they be similar anyway given that each processor has assigned one thread and they both write on a fast disk? Any difference would be decided only by the processor speed.
I've tried to implement the same flow but assigning more threads to the split processor: this basically makes the merge processor drown because hundreds of thousands flowfile are created. This maybe suggests that I do have the speed to process the data the way I want, but I cannot setup queues and NiFi in general to support that.
Which guidelines should I try to follow regarding the number of total flowfiles in the queues (and in the complete flow actually)? Correct me if I am wrong, but limiting the number of flowfiles waiting in a queue does not imply that I could waste the speed of a fast processor trying to gather input from it? Sorry for the number of questions.
... View more
09-29-2016
08:52 AM
I really do not know what to think. I tried batch processing, still no major improvement also replacing the custom processor with UpdateAttribute (also noticed the advanced section, thank you) which is faster. Then, given disk speed that your flow required to reach 20k I think we can exclude the disk speed from the list of possible causes.
I have one doubt about the garbage collector. Is the time reported the total time spent on garbage collector? How do you evaluate the JVM behavior with respect to these values?
... View more
09-28-2016
01:49 PM
I was not aware of NiFi's batching capabilities, I'll have a look at that. The machine is a 16 cores CPU, 64 GB RAM ubuntu 16.04 running on AWS. It has separate attached ebs SSDs for the flowfile repository and content repository. To evaluate the performance of the disks I've executed this command as suggested in several forums: time sh -c "dd if=/dev/zero of=testfile bs=100k count=1k && sync"
1024+0 records in
1024+0 records out
104857600 bytes (105 MB, 100 MiB) copied, 0.0617739 s, 1.7 GB/s
real 0m0.600s
user 0m0.004s
sys 0m0.060s
Yes, I could definitely combine even all of them even though I would prefer having sevaral processors performing simple but specific transformations of the data instead of mixing all. From what I understood, the performances of the built-in processors alone would not be still ideal right? I've never tried a debugger/profiler at runtime (basically just started using NiFi). Do you have any useful resource to suggest? It simply copies some attributes into different ones that could be later transformed Thank you again for your time
... View more
09-28-2016
09:39 AM
1) I've not configured NiFi on a cluster yet. My idea was to evaluate the performance of a single machine and then move to a cluster. Given the situation I will definitely try to do this. 2) That's something I can do but since I've not used huge csv files (at most 2GB, 1,4M rows), after some seconds of delay the first phase of the split has emitted the first flowfiles and the flow actually starts. 3-4-6) No throttling is configured on the system. I tried not to configure backpressure on the links, but it was better with it. I am leaving out the HDFS output phase since that seems not to be the issue (its queue was basically always empty) 5) The custom processors are clearly the slowest one. As I pointed out in the other anwer, I expected that since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. What really concerns me, and let me think that maybe there's something else I cannot recognize is that even removing them and just splitting rows and batching them together in order to save them on the local disk, I get 12k rows/sec. As also @jwitt wrote, this number should be way higher. 7) NiFi 0.7 😎 Oracle JDK 8 9) bootstrap.txt Thank you all, really appreciated
... View more
09-28-2016
09:12 AM
The input file is 2GB and has 1.4M rows. I do perform a two phase split with 5k and 1 steps. I ha configured backpressure and my swap threshold is 40k. However, even without setting it, would that number of flowfiles be a problem?
I've attached the flow screenshots and the GC info: I can see that the slowest processors are the custom ones. I expected that since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. Assigning several threads to them seems not to work. However, the point of the last part of the post was that even bypassing them, and just splitting the rows and merging them together I get 12k rows/seconds. Should I move to a higher level and consider batches of rows as work units?
Thank you for your time.
... View more
- « Previous
-
- 1
- 2
- Next »