Member since
07-30-2019
3427
Posts
1632
Kudos Received
1011
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 93 | 01-27-2026 12:46 PM | |
| 503 | 01-13-2026 11:14 AM | |
| 1086 | 01-09-2026 06:58 AM | |
| 936 | 12-17-2025 05:55 AM | |
| 997 | 12-15-2025 01:29 PM |
04-03-2023
12:57 AM
Good day, everyone. This problem has been resolved. I made a new subfolder called /opt/nifi_server/ and installed the NIFI in it. When I first began, it gave me the error "Unable to bind the IP with port 844." I terminated the PID and launched the Nifi.Everything is back to normal now.
... View more
04-02-2023
09:42 PM
@swanifi, Have any of the replies helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
... View more
03-31-2023
02:27 AM
Hi , Thank you for your assistance with this matter. The answers to your questions is as follows: Is that complete stack trace from the nifi-app.log? No, the complete stack trace is the following one: 2023-03-29 10:02:21,002 ERROR [Timer-Driven Process Thread-24] o.a.n.p.standard.PartitionRecord PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]
java.lang.IllegalArgumentException: newLimit > capacity: (92 > 83)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.nifi.parquet.record.WriteParquetResult.close(WriteParquetResult.java:69)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor983.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
at com.sun.proxy.$Proxy316.close(Unknown Source)
at org.apache.nifi.processors.standard.PartitionRecord.onTrigger(PartitionRecord.java:274)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) What version of Apache NiFi? Currently running on Apache NiFi open source 1.19.1 What version of Java? Currently running on openjdk version "11.0.17" 2022-10-18 LTS Have you tried using ConsumeKafkaRecord processor instead of ConsumeKafka --> MergeContent? No I did not, but for a good reason. The files coming out of Kafka require some "data manipulation" before using PartitionRecord, where I have defined the CSVReader and the ParquetRecordSetWriter. If I were to use ConsumeKafkaRecord, I would have to define a CSV Reader and the Parquet(or CSV)RecordSetWriter and the result will be very bad, as the data is not formatted as per the required schema. I will give it a try with ConsumeKafkaRecord using CSVReader and CSVRecordSetWriter, to see if I still encounter the same issue. Do you have issue only when using the ParquetRecordSetWriter? Unfortunately I can only test with parquet as this file format is somehow mandatory for the current project. I will try to reproduce the flow with an AVRO format, to see if I can reproduce the error or not. How large are the FlowFiles coming out of the MergeContent processor? So directly out of Kafka, 1 FlowFile has around 600-700 rows, as text/plain and the size is 300-600KB. Using MergeContent, I combine a total of 100-150 files, resulting in a total of 50MB. Have you tried reducing the size of the Content being output from MergeContent processor? Yes, I have played with several combinations of sizes and most of them either resulted in the same error or in an "to many open files" error.
... View more
03-30-2023
02:44 PM
1 Kudo
@wffger2 @cotopaul The InstanceIdentifier was introduced when NiFi switched from using the flow.xml.gz to the flow.josn.gz for flow persistence. It is also currently being written to the flows stored in the NiFi-Registry. I say "currently" because this jira exist and is still open: https://issues.apache.org/jira/browse/NIFI-11321 So as @cotopaul points out, the "instanceIdentifier" will be unique for each instantiation of the flow definition, but the "identifier" will be the same. Downside here is every time you want to do a comparison, you'll need to download latest flow definition from DEV and UAT environments and compare for difference based on the Identifiers. Thanks, Matt
... View more
03-30-2023
05:52 AM
@davehkd Changes committed to an early version 1.19 will persist into the next version unless a specific closed jiras exists that makes another changes impacting version. I saw no other newer jiras related to Kotlin version changes at time of this response. Matt
... View more
03-28-2023
04:50 AM
1 Kudo
PublishKafka writes messages only to those Kafka nodes that are leaders for a given topic: partition. Now it's Kafka internal job to keep the In-Sync Replicas in sync with its leader. So with respect to your question: When the Publisher client is set to run ,client sends a (read/write) request the bootstrap server, listed in the configuration bootstrap.servers to get the metadata info about topic: partition details, that's how the client knows who are all leaders in given topic partitions and the Publisher client writes into leaders of topic: partition With "Guarantee single node" and if kafka broker node goes down which was happen to be a leader for topic: partition then Kafka will assign a new leader from ISR list for topic: partition and through Kafka client setting metadata.max.age.ms producer refreshed its metadata information will get to know who is next leader to produce. If you found this response assisted with your issue, please take a moment and click on "Accept as Solution" below this post. Thank you
... View more
03-27-2023
10:40 AM
1 Kudo
@ManishR NiFi offers many components (processors, controller services, reporting tasks, etc) that can be used to construct a flow based program on the NiFi canvas (Referred to as a NiFi dataflow). While this list of default available components may be different depending on the release of NiFi being used, NiFi has embedded documentation found under help within the NiFi UI that shows all components available in that installed release. Apache NiFi also publishes the same info for the most current released version here: https://nifi.apache.org/docs/nifi-docs/ Selecting a component from the documentation with open a description of the component and all list configurable properties. Building a dataflow on the NiFi canvas consist of dragging and dropping new component processors to the canvas. You can then drag connection between these components to construct your end-to-end dataflow. There are 100s of component processors available out of the box and even more that you can download and add to your NiFi from the apache community. Once a dataflow is built and configured, starting those components would result in the creation of FlowFile (for testing, you can add a GenerateFlowFile processor that generates a FlowFile rather then ingesting content from an external source like the local file system, kafka, DB, etc. As each component executes against a FlowFile, that FlowFile is routed to one of the available relationships the particular processor offers. These relationships would be assigned to one of the connection exiting the processor and connecting to another downstream processor. The following Apache NiFi docs explain how to build a dtaflow: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#building-dataflow This covers how to search for a component in yoru dataflow(s): https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#search Then when it comes to looking at the detailed lineage of an individual FlowFile, you can use NiFi's data Provenance for that. Individual processor components generate provenance events as they execute on FlowFile (create, route, drop, etc...). You can look at the entire lineage from create to drop of a FlowFile (assuming you configure NiFi provenance with enough storage to store all the lineage). BY default NiFI is configured to only use 10GB for Provenance and only store Provenance for 24 hours, but this can be configured in the nifi.properties file. You can write click on NiFi processor component in your dataflow and Select data provenance from the pop-up context menu. This will open a provenance search query result set that show FlowFile that traversed the component. You can select one and even expand the lineage of that select component. The lineage of a FlowFile will show all events associated to that FlowFile created by the processor components that FlowFile traversed. This covers how to use NiFi's Data Provenance: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
... View more
03-27-2023
10:21 AM
@apmmahesh Make sure that the nifi.properties file on all nodes is configured the same. Make sure that the "nifi.cluster.protocol.is.secure" property is set to true on all the nodes. Matt
... View more
03-27-2023
10:04 AM
@NafeesShaikh93 Interesting Use case you have. I am not all that familiar with all the methods the Graylog offers for ingesting logs from other servers. I'd assume Syslog is one of them? If so, NiFi offers. putSyslog processor. Looking at the dataflow you build thus far, I am not sure what you are trying to accomplish. The LogAttribute and logMessage processors allows you to write a log entry in a NiFi log defined by an appender and logger in the logback.xml NiFi configuration file. By default these log lines would end up in the nifi-app.log. You could however add an additional appender and the a custom logger to send log lines produced by these processors classes to the new appender thus isolating them from the other logging in the nifi-app.log. There is no way to setup a specific logger by processor on canvas. So every logAttribute and logMessage processor you use will write to the same destination NiFi appender log. The classes for the logAttribute and logMessage processors are: org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.LogMessage NiFi also has a tailFile processor that can tail a log file and create FlowFiles with that log entries at content. You could then use PutSyslog processor to send those log lines to yoru Graylog server possibly. The above design involves extra disk I/O that may not be necessary since you could possibly design your flow to create FlowFile attributes with all the file information you want to send to GrayLog an then use a replaceText at end of successful dataflow to replace the content of yoru FlowFile with a crafted syslog formatted content from those attributes and send directly to Graylog via the PutSyslog processor. This removes the need to write to a new logger and consume from that new log before sending o syslog. But again this is a matter of preference. Perhaps in you case maybe you want a local copy of these logs as well. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
... View more
03-27-2023
02:09 AM
@TB_19 as this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.
... View more