Created on 03-28-2023 02:34 AM - edited 03-28-2023 02:35 AM
So guys,
This time I could really use your help with something, because I cannot figure it out on my own and I do not know where exactly to look in the source code.
My flow is as follows:
ConsumeKafka ----> MergeContent (as I have plenty of small files, I prefer to merge them into bigger files for further processing) ----> ReplaceText (I have some empty spaces and I want them removed) ----> PartitionRecord.
The problem comes here, in PartitionRecord. I have defined two Controller Services: a Record Reader (CSVReader, with a pre-defined working schema) and a Record Writer (ParquetRecordSetWriter, with the exact same schema as in the CSVReader). I have no strange data types, only a couple of FLOATs and around 100 STRINGs. I defined a property called time, which extracts the value from a field in our file.
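Just for illustration (the real field name in my schema is different), the dynamic property on PartitionRecord looks roughly like this, with a RecordPath as its value:

time = /eventTime

so each record is grouped by the value found at that path, and PartitionRecord also writes that value into a FlowFile attribute named time.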
Unfortunately, when executing the flow, I keep on getting the following error message:
" PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]: java.lang.IllegalArgumentException: newLimit > capacity: (90 > 82) "
I have nothing else in the logs. The data which enters PartitionRecord looks fine to me, but something happens when it is transformed from CSV (plain text) to Parquet ... and I do not know what else to check.
Has anybody encountered such an error and, if so, what was the cause and how did you manage to solve it?
Thank you in advance 🙂
Created on 03-28-2023 04:14 AM - edited 03-28-2023 04:15 AM
@MattWho, @steven-matison @SAMSAL @ckumar, can anyone please help our super user @cotopaul with their query in this post?
Regards,
Vidya Sargur
Created 03-30-2023 01:31 PM
@cotopaul
Is that complete stack trace from the nifi-app.log?
What version of Apache NiFi?
What version of Java?
Have you tried using ConsumeKafkaRecord processor instead of ConsumeKafka --> MergeContent?
Do you have issue only when using the ParquetRecordSetWriter?
How large are the FlowFiles coming out of the MergeContent processor?
Have you tried reducing the size of the Content being output from MergeContent processor?
Thanks,
Matt
Created 03-31-2023 02:27 AM
Hi,
Thank you for your assistance with this matter. The answers to your questions are as follows:
Is that complete stack trace from the nifi-app.log?
No, the complete stack trace is the following one:
2023-03-29 10:02:21,002 ERROR [Timer-Driven Process Thread-24] o.a.n.p.standard.PartitionRecord PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]
java.lang.IllegalArgumentException: newLimit > capacity: (92 > 83)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.nifi.parquet.record.WriteParquetResult.close(WriteParquetResult.java:69)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor983.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
at com.sun.proxy.$Proxy316.close(Unknown Source)
at org.apache.nifi.processors.standard.PartitionRecord.onTrigger(PartitionRecord.java:274)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
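Reading the trace, the exception itself comes from java.nio.Buffer: while compressing a Parquet page, Snappy tries to set a limit on a MappedByteBuffer that is larger than the buffer's capacity. As a minimal sketch of just the JDK behaviour (not of the Snappy internals), the same message can be reproduced like this:

import java.nio.ByteBuffer;

public class BufferLimitDemo {
    public static void main(String[] args) {
        // Capacity 83, the same capacity reported in the trace above.
        ByteBuffer buf = ByteBuffer.allocate(83);
        try {
            // Requesting a limit larger than the capacity is illegal,
            // which is what happens inside Snappy.compress here.
            buf.limit(92);
        } catch (IllegalArgumentException e) {
            // Prints: java.lang.IllegalArgumentException: newLimit > capacity: (92 > 83)
            System.out.println(e);
        }
    }
}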
What version of Apache NiFi?
Currently running on Apache NiFi open source 1.19.1
What version of Java?
Currently running on openjdk version "11.0.17" 2022-10-18 LTS
Have you tried using ConsumeKafkaRecord processor instead of ConsumeKafka --> MergeContent?
No, I did not, but for a good reason. The files coming out of Kafka require some "data manipulation" before reaching PartitionRecord, where I have defined the CSVReader and the ParquetRecordSetWriter. If I were to use ConsumeKafkaRecord, I would have to define a CSVReader and a Parquet (or CSV) RecordSetWriter there as well, and the result would be very bad, as the data is not yet formatted per the required schema. I will give ConsumeKafkaRecord a try with a CSVReader and CSVRecordSetWriter, to see if I still encounter the same issue.
Do you have issue only when using the ParquetRecordSetWriter?
Unfortunately, I can only test with Parquet, as this file format is mandatory for the current project. I will try to rebuild the flow with an Avro writer, to see whether I can reproduce the error or not.
How large are the FlowFiles coming out of the MergeContent processor?
So directly out of Kafka, one FlowFile has around 600-700 rows as text/plain, with a size of 300-600 KB. Using MergeContent, I combine a total of 100-150 files, resulting in a total of around 50 MB.
Have you tried reducing the size of the Content being output from MergeContent processor?
Yes, I have played with several combinations of sizes, and most of them resulted either in the same error or in a "too many open files" error.
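For context, the MergeContent combinations I have been testing look roughly like the following (the exact values vary per test, so treat these only as an example of the knobs I am turning):

Merge Strategy: Bin-Packing Algorithm
Minimum Number of Entries: 100
Maximum Number of Entries: 150
Maximum Group Size: 50 MB
Max Bin Age: 5 min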