Member since: 09-27-2016
22 Posts
11 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
| 12634 | 10-18-2016 10:01 AM |
09-26-2017
08:33 AM
Yes, it has been solved. But looking at the other comments, it seems that not all the ORC-related problems have been addressed and resolved.
09-26-2017
08:32 AM
Thanks for the info. Mine actually got solved with the upgrade; it might be due to a different reason.
09-04-2017
09:48 AM
Thank you, but I've already tried that. It seems I've run into this known issue. I will upgrade to 2.6.2 and share the result.
09-01-2017
08:50 AM
Hello, I am trying to evaluate Hive LLAP on a Hortonworks HDP 2.6 cluster. Unfortunately, I get a java.lang.RuntimeException: ORC split generation failed when trying to execute queries:
ERROR : Status: Failed
ERROR : Vertex failed, vertexName=Map 1, vertexId=vertex_1504166274656_0006_3_00, diagnostics=[Vertex vertex_1504166274656_0006_3_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: gprs_records initializer failed, vertex=vertex_1504166274656_0006_3_00 [Map 1], java.lang.RuntimeException: ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1615)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1701)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:446)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:569)
at org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:196)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 5
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1609)
... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.orc.OrcFile$WriterVersion.from(OrcFile.java:145)
at org.apache.orc.impl.OrcTail.getWriterVersion(OrcTail.java:73)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:383)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:63)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:89)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:1419)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.callInternal(OrcInputFormat.java:1305)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.access$2600(OrcInputFormat.java:1104)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1285)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1282)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1104)
... 4 more
]
ERROR : Vertex killed, vertexName=Reducer 2, vertexId=vertex_1504166274656_0006_3_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1504166274656_0006_3_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]
ERROR : DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1
INFO : org.apache.tez.common.counters.DAGCounter:
INFO : AM_CPU_MILLISECONDS: 840
INFO : AM_GC_TIME_MILLIS: 23
ERROR : FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1504166274656_0006_3_00, diagnostics=[Vertex vertex_1504166274656_0006_3_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: gprs_records initializer failed, vertex=vertex_1504166274656_0006_3_00 [Map 1], java.lang.RuntimeException: ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1615)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1701)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:446)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:569)
at org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:196)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 5
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1609)
... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.orc.OrcFile$WriterVersion.from(OrcFile.java:145)
at org.apache.orc.impl.OrcTail.getWriterVersion(OrcTail.java:73)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:383)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:63)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:89)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:1419)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.callInternal(OrcInputFormat.java:1305)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.access$2600(OrcInputFormat.java:1104)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1285)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1282)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1104)
... 4 more
]Vertex killed, vertexName=Reducer 2, vertexId=vertex_1504166274656_0006_3_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1504166274656_0006_3_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1
INFO : Resetting the caller context to HIVE_SSN_ID:2415846c-fb92-480f-b869-240a0b0f30ed
INFO : Completed executing command(queryId=hive_20170831085623_51baf1df-5823-459e-80cf-76fa1f81789f); Time taken: 0.342 seconds
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1504166274656_0006_3_00, diagnostics=[Vertex vertex_1504166274656_0006_3_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: gprs_records initializer failed, vertex=vertex_1504166274656_0006_3_00 [Map 1], java.lang.RuntimeException: ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1615)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1701)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:446)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:569)
at org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:196)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 5
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1609)
... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.orc.OrcFile$WriterVersion.from(OrcFile.java:145)
at org.apache.orc.impl.OrcTail.getWriterVersion(OrcTail.java:73)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:383)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:63)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:89)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:1419)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.callInternal(OrcInputFormat.java:1305)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.access$2600(OrcInputFormat.java:1104)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1285)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator$1.run(OrcInputFormat.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1282)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:1104)
... 4 more
]Vertex killed, vertexName=Reducer 2, vertexId=vertex_1504166274656_0006_3_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1504166274656_0006_3_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1 (state=08S01,code=2)
Here's some information to complete the picture: I've enabled LLAP using the Ambari interface, installed HiveServer2 Interactive, and restarted the services. I am using Beeline version 1.2.1000.2.6.0 to connect to it. The table being queried is in ORC format; the ORC files are generated by an ETL pipeline and written by Java code using ORC Core. I have no problem querying it without LLAP through HiveServer2. Any advice is really appreciated. Thank you all!
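For what it's worth, the failure happens in OrcFile.WriterVersion.from, so a first check could be which writer version the ETL-produced files declare compared to what the cluster's ORC reader knows about. A minimal diagnostic sketch using the ORC Core reader API (the path argument is illustrative, not from the actual setup):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class OrcVersionCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point this at one of the ORC files produced by the ETL pipeline.
        Reader reader = OrcFile.createReader(new Path(args[0]), OrcFile.readerOptions(conf));
        // Print the file format version and the version of the writer that produced it.
        System.out.println("File version:   " + reader.getFileVersion());
        System.out.println("Writer version: " + reader.getWriterVersion());
        System.out.println("Rows:           " + reader.getNumberOfRows());
    }
}

Comparing the reported writer version against what the HDP 2.6 ORC jars support should show whether this is a writer/reader version mismatch.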
Labels:
- Apache Hive
05-03-2017
01:33 PM
Sorry to bother you again, two additional questions:
From my understanding, the File(), Execute(), Script() ... resources are meant to execute commands across the cluster, right?
I imagine the configuration loaded using the get_config() method in the params and status_params scripts is backed by some configuration file on the cluster. If I am right, where is it located?
Thank you for the help @bhagan
04-28-2017
02:14 PM
Hello everybody, I am trying to add a custom service to Ambari using the documentation as my main resource. The service is made up of two master components implemented as Java processes. I can add the service to the stack and install it via the Ambari web interface using the dummy Python scripts provided as an example. There are some aspects not covered there that I would like to know more about: Is there any additional resource on how to implement the actual methods to install, start, and stop the components of the service? How can I distribute the program files across the cluster?
Thank you all for the help.
Labels:
- Apache Ambari
04-05-2017
08:42 AM
Thank you, I've read that documentation page, but maybe I misunderstood something. Explaining what I would actually like to achieve may make more sense:
I would like to create two separate folders on HDFS, one with the All_SSD storage policy and the other with the Hot storage policy. Files placed in these folders will be moved internally to SSDs or HDDs accordingly. When I move a file from one folder to the other, will it actually involve the whole cluster, hence redistributing data, or will each node try to move its share of blocks between its own drives?
From your answer, I assume this move operation actually does nothing on a physical level, and everything at that level will be managed by the mover tool, right? In that case, the question simply applies to the mover: will it try to perform the operations locally, or will it involve all the nodes?
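For reference, a minimal sketch of how the two folders could be assigned those policies programmatically, assuming a Hadoop 2.6+ client where DistributedFileSystem exposes setStoragePolicy (the directory names are illustrative). As discussed, applying the policy only tags the directories; existing blocks are physically moved only when the mover runs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class StoragePolicySetup {
    public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS in the loaded configuration points at the target HDFS cluster.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        // Tag the two directories; files written into them inherit the policy.
        dfs.setStoragePolicy(new Path("/data/hot"), "HOT");
        dfs.setStoragePolicy(new Path("/data/all_ssd"), "ALL_SSD");
    }
}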
04-04-2017
11:56 AM
Hello,
I would like to implement tiered storage in a cluster. Suppose each node has several drives (HDDs and SSDs). If a move command is issued from one tier to another, will each node try to perform the operation "locally", moving its share of blocks between its own drives, or will the data instead be redistributed across the cluster, hence consuming network capacity?
Labels:
- Apache Hadoop
03-28-2017
10:51 AM
Hi @Matt Burgess, thank you for the reply. Sorry for the late answer, but I clearly missed your post. Just in case someone runs into this, I'd like to link a mirror question on Stack Overflow that may be useful.
03-08-2017
04:20 PM
Hi all!
I am trying to develop a custom processor in NiFi which writes ORC files directly into a remote Hadoop cluster. To write them, I am using the ORC Core API. I have tried writing ORC files on the local FS and everything is fine so far (Hive, which is their "final destination", has no problem reading them). The issue is that, while trying to create a Writer object, I get a NoClassDefFoundError on org.apache.hadoop.hdfs.DistributedFileSystem. This is the code used:

Configuration conf = new Configuration();
conf.addResource(new Path(hadoopConfigurationPath + "/core-site.xml"));
conf.addResource(new Path(hadoopConfigurationPath + "/hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
String hdfsUri = conf.get("fs.default.name");
...
try {
    writer = OrcFile.createWriter(
            new Path(hdfsUri + "/" + filename + ".orc"),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}
...

I've copied the hadoop-hdfs jar into the lib directory, and at runtime I looked at the jars loaded in the classpath using the ClassLoader: the jar with the required class is there. Including the jar in the processor dependencies does not solve the issue either. Any suggestion on how to get rid of this error is really appreciated. Thank you all!
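For completeness, one pattern sometimes suggested when Hadoop's FileSystem classes cannot be resolved from inside an isolated (NAR) classloader is to temporarily switch the thread context classloader while creating the writer. This is only a sketch of that general pattern, reusing the variables from the snippet above, and not a confirmed fix for this particular error:

// Sketch of a context-classloader workaround (not a confirmed fix for this error).
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
    // Use the processor's own classloader so classloader-based lookups see the hadoop-hdfs jar.
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    writer = OrcFile.createWriter(
            new Path(hdfsUri + "/" + filename + ".orc"),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
} finally {
    Thread.currentThread().setContextClassLoader(original);
}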
Labels:
- Apache Hadoop
- Apache NiFi
10-18-2016
10:01 AM
2 Kudos
Hi everybody! I would like to share how my scenario has developed, in case someone has similar issues.
As a brief recap, my task was to design a rather simple flow that processes CSV data, applies some transformations to some of its fields, and outputs the result to Hive. As everybody does, I had some performance requirements, but in general you want to create a flow that is as fast as possible.
My original design was to deal with the CSV file at record level. Files would be split up into single lines, the fields ingested into NiFi as flowfile attributes, processed by custom processors, and finally buffered back together and sent to HDFS. The main reason I initially designed it this way was to try to perform the computation entirely in memory, as flowfile attributes are stored in memory (as long as the flowfiles in the queues stay below a configurable threshold). This seemed to me the fastest way.
Some takeaways from the learning process of a relative NiFi newbie: The basic work unit that you choose at design time has a huge impact on latency and throughput. It seems like an obvious thing (it was not clear to me it could have this much impact on performance), but if you want high throughput you should consider batching things together. On the other hand, small work units get processed faster and give you better latency. I thought that single CSV rows would be a compromise between the two, but I was wrong. Really wrong. Batching rows together, and still processing them in memory, I got almost a 3x performance gain, but it was still not enough, given that the flow processed only 10MB/s.
Splitting and merging are very costly. As @mpayne wrote, things will probably improve, but if you are not forced to split the input, you will get a significant performance gain.
Flowfile attribute processing should be limited to a few aspects, and not adopted as a way to ingest data into NiFi for further processing. After some performance gains with batching, I tried to increase the speed of the flow by allocating multiple threads to the processors. I was not able to achieve any improvement. Instead, this generated a really huge GC problem, which I reported in this question. After some time studying G1, and with some help from S.O. folks, I discovered that the issue basically could not be solved (or at least not easily), since I had a huge live data set in memory that could not be collected by the garbage collector, and that led to continuous full GCs. Thanks to @jwitt I started developing a "classic" flow which simply reads flowfile content instead of attributes, and in which we batch rows together as the work unit (maybe I overthought the flow design a little bit at the beginning 😄).
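For anyone curious what this content-based design looks like in processor code, here is a minimal, purely illustrative sketch: each flowfile carries a batch of CSV rows and is transformed by streaming its content, instead of copying fields into attributes. The class name and transformLine are hypothetical stand-ins for the real field transformations:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;

public class TransformCsvBatch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Transformed CSV batches")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Stream the content: read, transform and rewrite each line without creating
        // per-row flowfiles or copying fields into attributes.
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    writer.write(transformLine(line)); // hypothetical field-level transformation
                    writer.newLine();
                }
                writer.flush();
            }
        });
        session.transfer(flowFile, REL_SUCCESS);
    }

    // Hypothetical stand-in for the actual transformations applied to the 80 fields.
    private String transformLine(String line) {
        return line;
    }
}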
With this new design I managed to get almost 55MB/s of throughput on a single machine. I don't think this is bad, since the CSV records have 80 fields each and the code has not been profiled. Adding more processing to the data will probably lower this value, but I think I am in a sweet spot to eventually get a fast NiFi flow.
Thank you all for your support, I hope the effort I put into learning NiFi will help someone else!
10-11-2016
02:59 PM
Thanks for sharing your knowledge, I will try your tips. This specific GC issue only happens when I assign multiple threads to the processors and try to speed up the flow, which otherwise runs at roughly 10MB/s in a single thread.
I originally designed the flow to use flowfile attributes because I wanted the computation to happen in memory. I thought it would be faster than reading the flowfile content in each processor and then parsing it to get specific fields. Do you suggest trying to implement a version that works, let's say, "on disk" on the flowfile content instead of attributes?
10-04-2016
10:12 AM
1 Kudo
UPDATE
I've played a little bit with G1 using the Oracle documentation. I still do not have a definitive solution. I settled on the default configuration, adjusting the region size to the heap (54GB heap, 30MB region size). I tried to better understand the problem and obtained a detailed picture of the heap using GCViewer. This is the associated heap chart after three consecutive full GCs:
A little bit of explanation of the output, in case you have not played with GCViewer:
It looks to me that the issue is in the tenured heap: while the young heap is constantly cleaned in the young GC cycles (the grey line oscillating at the top of the picture), the tenured heap is used but never cleaned up.
Some questions:
Could triggering more mixed GCs be a viable option? How would I obtain that using the configuration parameters?
Should I switch to the old CMS GC, even though it has proved to be not that good on huge heaps?
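For reference, mixed-collection behaviour in G1 is mainly steered by a few standard HotSpot flags, which NiFi picks up from bootstrap.conf. A sketch with illustrative argument numbers and values, not a recommendation:

# Illustrative bootstrap.conf entries (argument numbers and values are examples only)
java.arg.13=-XX:+UseG1GC
# Start concurrent marking (and thus mixed collections) earlier than the default 45% occupancy
java.arg.14=-XX:InitiatingHeapOccupancyPercent=35
# Pause-time goal G1 uses when sizing young and mixed collections
java.arg.15=-XX:MaxGCPauseMillis=200
# Explicit region size for a large heap
java.arg.16=-XX:G1HeapRegionSize=32m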
Any suggestion is really appreciated. Thank you
_________________________________________________________________________________________________
Hello everybody,
I am developing a flow which requires good performance. It is running on a single instance with 16 cores and 64GB of memory. I've configured NiFi to use 54GB of it with the G1 garbage collector. I am currently facing this memory issue: with a fresh NiFi start, all processors running but no input, it uses 1GB. I feed some input in (1.5GB); the flow starts, consumes all the data and finishes the computation without triggering any full GC. Tracking the memory through the system diagnostics page, I can see that it is constantly eating memory, to the point that only 2-3 GB are left free when no input is left in the flow. No space is released after that. I would expect that to be the maximum memory it needs to consume that input, but if I feed the same input size back in, it basically does not release any memory; in fact it keeps eating it until full GCs are triggered. They take far too much time to complete; this is a screenshot of the system diagnostics page after the first one is triggered:
Is this normal behavior for the JVM? Is there a way I can avoid triggering full GCs? I've gone over the custom processors' code, trying to limit object creation as much as I could (I'm not a Java expert; I tried to google around a little bit, in particular regarding Strings). A great part of the computation is performed by accessing flowfile attributes (except for one processor that loads the input into flowfile attributes, and one that writes them back to the flowfile content). Maybe this is a programming issue, but given my limited experience I cannot tell; I would be glad to read suggestions and discover this is indeed the case. I've also tried reducing the assigned heap down to 16GB. Full GCs are quicker, but more frequent, as I expected.
Is there a way to completely avoid full GCs?
Thank you all
Labels:
- Apache NiFi
10-03-2016
05:12 PM
2 Kudos
UPDATE: I am moving to the new flowfile design as suggested, in which the work unit is a set of lines instead of single ones. Since then there have been significant improvements, given that each block is working at 10 MB/s in a single thread. The current problem is that after some processing time I start to trigger full GCs. I tried reviewing the code and substituting concatenations with StringBuilder.append(). Things seem to improve, but I still eventually trigger full GCs. Any suggestion on how to avoid them? Currently I've been running NiFi with 32GB of configured memory and G1 as the GC.
09-30-2016
07:38 AM
I figured there was something not working at its best, but I didn't think I was the first one to run into this kind of issue. However, thank YOU all for the help and support provided, really appreciated.
09-29-2016
01:13 PM
I was reasoning about the simple split-merge flow and your results (10-20k) vs mine (12k): if you have an SSD on your laptop, wouldn't they be similar anyway, given that each processor has one thread assigned and they both write to a fast disk? Any difference would be decided only by the processor speed.
I've tried to implement the same flow but assigning more threads to the split processor: this basically makes the merge processor drown, because hundreds of thousands of flowfiles are created. This perhaps suggests that I do have the speed to process the data the way I want, but I cannot set up the queues and NiFi in general to support it.
Which guidelines should I try to follow regarding the total number of flowfiles in the queues (and in the complete flow, actually)? Correct me if I am wrong, but doesn't limiting the number of flowfiles waiting in a queue imply that I could waste the speed of a fast processor trying to gather input from it? Sorry for the number of questions.
09-29-2016
08:52 AM
I really do not know what to think. I tried batch processing, with still no major improvement, even after replacing the custom processor with UpdateAttribute (I also noticed the advanced section, thank you), which is faster. Given the disk speed that your flow required to reach 20k, I think we can exclude disk speed from the list of possible causes.
I have one doubt about the garbage collector. Is the time reported the total time spent on garbage collection? How do you evaluate the JVM's behavior with respect to these values?
09-28-2016
01:49 PM
I was not aware of NiFi's batching capabilities, I'll have a look at that. The machine is a 16-core, 64 GB RAM Ubuntu 16.04 instance running on AWS. It has separate attached EBS SSDs for the flowfile repository and the content repository. To evaluate the performance of the disks I've executed this command, as suggested in several forums:
time sh -c "dd if=/dev/zero of=testfile bs=100k count=1k && sync"
1024+0 records in
1024+0 records out
104857600 bytes (105 MB, 100 MiB) copied, 0.0617739 s, 1.7 GB/s
real 0m0.600s
user 0m0.004s
sys 0m0.060s
Yes, I could definitely combine all of them, even though I would prefer having several processors performing simple but specific transformations of the data instead of mixing everything together. From what I understood, the performance of the built-in processors alone would still not be ideal, right? I've never tried a debugger/profiler at runtime (I've basically just started using NiFi). Do you have any useful resources to suggest? It simply copies some attributes into different ones that could later be transformed. Thank you again for your time
09-28-2016
09:39 AM
1) I've not configured NiFi on a cluster yet. My idea was to evaluate the performance of a single machine and then move to a cluster. Given the situation, I will definitely try to do this.
2) That's something I can do, but since I've not used huge CSV files (at most 2GB, 1.4M rows), after a few seconds of delay the first phase of the split has emitted the first flowfiles and the flow actually starts.
3-4-6) No throttling is configured on the system. I tried not configuring backpressure on the links, but it was better with it. I am leaving out the HDFS output phase since that does not seem to be the issue (its queue was basically always empty).
5) The custom processors are clearly the slowest ones. As I pointed out in the other answer, I expected that, since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. What really concerns me, and makes me think there may be something else I cannot recognize, is that even removing them and just splitting the rows and batching them back together in order to save them to the local disk, I get 12k rows/sec. As @jwitt also wrote, this number should be way higher.
7) NiFi 0.7
8) Oracle JDK 8
9) bootstrap.txt
Thank you all, really appreciated
09-28-2016
09:12 AM
The input file is 2GB and has 1.4M rows. I perform a two-phase split with 5k-line and 1-line steps. I have configured backpressure and my swap threshold is 40k. However, even without setting it, would that number of flowfiles be a problem?
I've attached the flow screenshots and the GC info: I can see that the slowest processors are the custom ones. I expected that, since they perform more complex operations than the built-in processors. But still, they just access flowfile attributes and create new ones. Assigning several threads to them does not seem to help. However, the point of the last part of the post was that even bypassing them, and just splitting the rows and merging them back together, I get 12k rows/second. Should I move to a higher level and consider batches of rows as work units?
Thank you for your time.
09-27-2016
01:41 PM
6 Kudos
I've developed a NiFi flow prototype for data ingestion into HDFS. Now I would like to improve the overall performance, but it seems I cannot really move forward.
The flow takes CSV files as input (each row has 80 fields), splits them at row level, applies some transformations to the fields (using 4 custom processors executed sequentially), buffers the new rows into CSV files, and outputs them to HDFS. I've developed the processors in such a way that the content of the flowfile is accessed only once, when each individual record is read and its fields are moved to flowfile attributes.
Tests have been performed on an Amazon EC2 m4.4xlarge instance (16-core CPU, 64 GB RAM). This is what I have tried so far:
- Moved the flowfile repository and the content repository to different SSD drives
- Moved the provenance repository to memory (NiFi could not keep up with the event rate)
- Configured the system according to the configuration best practices
- Tried assigning multiple threads to each of the processors in order to reach different numbers of total threads
- Tried increasing nifi.queue.swap.threshold and setting backpressure to never reach the swap limit
- Tried different JVM memory settings from 8 up to 32 GB (in combination with the G1 GC)
- Tried increasing the instance specifications; nothing changes
From the monitoring I've performed, it looks like the disks are not the bottleneck (they are basically idle a great part of the time, showing that the computation is actually being performed in memory) and the average CPU load is below 60%.
The most I can get is 215k rows/minute, which is 3.5k rows/second. In terms of size, it's just 4.7 MB/s. I am aiming for something definitely greater than this.
Just as a comparison, I created a flow that reads a file, splits it into rows, merges them together in blocks, and outputs them to disk. Here I get 12k rows/second, or 17 MB/s. That doesn't look surprisingly fast either, and it makes me think that I am probably doing something wrong.
Does anyone have suggestions on how to improve the performance? How much would I benefit from running NiFi on a cluster instead of growing the instance specs? Thank you all
Labels:
- Apache NiFi