Member since: 07-30-2019
3369 Posts | 1615 Kudos Received | 996 Solutions
My Accepted Solutions

Title | Views | Posted
---|---|---
 | 109 | 10-08-2025 10:52 AM
 | 75 | 10-08-2025 10:36 AM
 | 166 | 10-03-2025 06:04 AM
 | 121 | 10-02-2025 07:44 AM
 | 287 | 09-23-2025 10:09 AM
07-03-2025
05:56 AM
@NifiEnjoyer Welcome to the community. As this thread relates to the deprecation of NiFi templates in Apache NiFi 2 and is an old thread, it would be better to start a new community question with your query about downloading and uploading flow definitions. You'll want to include your source and destination Apache NiFi versions in the question details. Feel free to mention @MattWho in your new community question. Thank you, Matt
07-02-2025
08:31 AM
@HoangNguyen All the ForkEnrichment processor does is add two specific FlowFile attributes to each FlowFile it outputs. The JoinEnrichment processor depends on receiving two FlowFiles with matching "enrichment.group.id" values: one with "enrichment.role" = ORIGINAL and the other with "enrichment.role" = ENRICHMENT. So you can, for example, fork the starting FlowFile and perform the first join enrichment, then use ForkEnrichment again to generate the FlowFile attributes needed for the second join enrichment operation. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
07-02-2025
05:26 AM
@Rohit1997jio The content of a NiFi FlowFile does not live in NiFi heap memory. Only the FlowFile metadata/attributes are held in heap, and even then there are per-connection thresholds beyond which swap files are created to reduce heap usage. Some processors may need to load content into heap memory when they execute against a FlowFile.

Before making recommendations on your ConsumeKafkaRecord processor configuration, more information about your NiFi and Kafka topic is needed:
- Are you running a multi-node NiFi cluster or a single instance of NiFi?
- If a cluster, how many nodes make up your NiFi cluster?
- How many partitions are set up on the target Kafka topic?

Kafka assigns partitions to the different consumers in a consumer group. So let's say you have 10 partitions on your Kafka topic, 1 NiFi instance, and a ConsumeKafkaRecord processor configured with 1 concurrent task. All 10 of those partitions would be assigned to that one consumer. When the ConsumeKafkaRecord processor executes, it will consume from one of those partitions, the next execution consumes from the next partition, and so on. This is likely why you are not seeing all the Kafka messages consumed when you schedule the processor to execute only once every 4 hours. Even if you were to set concurrent tasks to 10 on the ConsumeKafkaRecord processor, the scheduler will still only allow one execution every 4 hours. In this case you would be best served by setting 10 concurrent tasks and adjusting your Quartz cron schedule so it triggers every second for 10 seconds every 4 hours. Also keep in mind the "Max Poll Records" setting, as it controls the maximum number of records (messages) added to a single record FlowFile created during each execution. If you have a lot of records, you may consider widening that scheduling window within each 4-hour cycle to maybe 30 seconds to make sure you get all messages from every partition.
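As an illustration of the schedule described above (fire every second for the first 10 seconds of the hour, every 4 hours), a Quartz cron expression might look like the following. This exact expression is my sketch, not something from the original thread; verify it against your NiFi version's scheduler before relying on it:

```
# Quartz cron fields:  sec  min  hour  day-of-month  month  day-of-week
# Seconds 0-9 of minute 0, every 4th hour:
0-9 0 0/4 * * ?
```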
Now assuming you have a multi-node NiFi cluster with 5 nodes, for example, your ConsumeKafkaRecord processor is configured with a group.id, and your topic has 10 partitions: you would set concurrent tasks to 2 (2 consumers x 5 nodes = 10 consumers in the consumer group). Kafka will assign one partition to each of these 10 consumers. Hope this helps you configure your ConsumeKafkaRecord processor so you can be successful with your requirement. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
07-01-2025
05:50 AM
@Bhar Can you share more detail? Without it, I would only be making random guesses. What version of Apache NiFi are you using? Is this a single instance of NiFi or a multi-node NiFi cluster? How is your MergeContent processor configured? Thank you, Matt
07-01-2025
05:44 AM
@HoangNguyen Welcome to the community. It would be very difficult to provide any suggestions with the limited information you have shared. Please share more detail about your use case and what you are trying to accomplish. The JoinEnrichment processor is used in conjunction with the ForkEnrichment processor. For a JoinEnrichment processor to join two NiFi FlowFiles, those two FlowFiles must both have a matching group id set in an "enrichment.group.id" attribute, and each must also have an attribute "enrichment.role" set appropriately (ORIGINAL on the FlowFile to be enriched and ENRICHMENT on the FlowFile containing the enrichment data). Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
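As an illustration only (this is not NiFi code), the pairing rule described above can be sketched in Python, modeling each FlowFile as a plain dict of attributes:

```python
def pair_for_join(flowfiles):
    """Group FlowFiles the way JoinEnrichment expects: one ORIGINAL and one
    ENRICHMENT FlowFile sharing the same "enrichment.group.id" value.
    Returns only the group ids for which both roles are present."""
    groups = {}
    for ff in flowfiles:
        gid = ff["enrichment.group.id"]
        groups.setdefault(gid, {})[ff["enrichment.role"]] = ff
    # A join can only happen when both roles exist for a given group id
    return {gid: pair for gid, pair in groups.items()
            if {"ORIGINAL", "ENRICHMENT"} <= pair.keys()}
```

A FlowFile whose group id has no matching counterpart would simply wait (and eventually time out) at the JoinEnrichment processor.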
06-23-2025
09:04 AM
@melek6199 What you have is an authorization issue. When you access your multi-node NiFi cluster, you are authorized only on the node through which you authenticated. When you make a request like List Queue or Empty Queue, you are making a request from one node to all the other nodes to list or empty the connection queue. This means the nodes themselves need to be authorized to ask other nodes to share back their queue listing or empty their queues. All 4 of your NiFi nodes should already have been authorized to "proxy user requests", but in order to list or empty queues, your nodes need these additional authorizations:

- "view the data" - authorizes a node to list the data from other nodes (the user must also be authorized)
- "modify the data" - authorizes a node to empty a connection queue on other nodes

You can see from the nifi-user.log output you shared the identity and the missing policy for performing this action on the specific connection UUID:

Node x.x.x.x:8443 is unable to fulfill this request due to: Unable to modify the data for Processor with ID d3a802c6-0196-1000-ffff-ffff90fdc7b8

You would have seen this same exception from all but one node when you made the request to empty the queue. Authorizations are inherited from parent process groups unless explicitly set on the individual component directly. So you don't need to authorize your nodes for "view the data" and "modify the data" on connection "d3a802c6-0196-1000-ffff-ffff90fdc7b8" directly; instead, set these authorizations on the parent process group. Keep in mind that child process groups also inherit from parent process groups unless a policy is explicitly set on that child process group. Typically you would set these authorization policies on the root process group (top level).

You'll also notice, when viewing policies on a component, that it tells you whether the component is inheriting policies, and if you choose to set explicit policies on that component, it asks whether you want to copy the inherited policy before modifying. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
06-12-2025
09:19 AM
Hello @Bdeyyam Cloudera Manager cumulative hotfix release information can be found in the Cloudera documentation under "Cumulative hotfixes". From the rpm versions shared above, I can see those are from Cloudera Manager 7.11.3 Cumulative Hotfix 4. Hope this helps you, Matt
06-10-2025
05:35 AM
@agriff I did not know that you were using the Apache NiFi 2.x release. The component list I provided is from the Apache NiFi 1.x release. NiFi 2.x switched from having numerous client-version-specific Kafka processors to single Kafka processors that use a KafkaConnectionService controller service component to define the Kafka client version. In Apache NiFi 2.x, the only connection service included is for the Kafka 3 client. I understand the Kafka 3 client to be backwards compatible to Kafka 2.6, but it sounds like you are having success using it with Kafka 2.5. Glad to hear you were able to resolve your underlying schema issue. Setting the bulletin level on a processor has absolutely nothing to do with the log levels written to nifi-app.log; it only controls what level of bulletins are created within the NiFi UI. To change logging within the NiFi logs, you will need to modify the logback.xml configuration file found in the NiFi conf directory. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
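For reference, a logger level change in conf/logback.xml looks roughly like the snippet below. The logger name here is an example I chose (the Kafka client package), not something specified in this thread; substitute the package whose logging you want to change:

```
<!-- Inside the <configuration> element of conf/logback.xml -->
<logger name="org.apache.kafka" level="DEBUG"/>
```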
06-09-2025
06:39 AM
@nifier I would not expect much difference between making the stop request via the NiFi UI or via a rest-api call. Even when you make a request to stop components via the NiFi UI, the UI may quickly show the "stopped" icon on the component, but any active threads are not killed in that process. In fact, the processor is considered "stopping" until all of its active threads complete, however long that takes. While still in the stopping state, you cannot modify those components. A component is considered stopping if its "activeThreadCount" is not 0.

When you execute your rest-api script without the delay, what exception are you encountering? This one?

unable to fulfill this request due to: Cannot start component with <component id> because it is currently stopping

The above means you have active threads. Perhaps you can build a wait loop around the above response until the active threads complete, or you can capture that component id and execute a terminate-threads command on it:

../nifi-api/processors/<component id>/threads -X DELETE

Terminating threads will not cause data loss. NiFi does not actually kill any threads in this process (the only way to kill threads is via a NiFi restart). Terminating threads on a component just shifts the thread's output to dev null and unhooks it from the FlowFile(s) it is associated with in the inbound connection. When the processor is restarted, the FlowFile(s) will be reprocessed by the component. Should the "terminated" thread complete execution, its logging and output just go to dev null and the results are not written back to a FlowFile. Depending on the processor, this could result in duplicate data on a destination system if the thread was sending data out of NiFi, since NiFi will reprocess the FlowFile originally associated with that terminated thread the next time the processor is started.
The other option is to get the status of the components in the process group you stopped, parse the JSON for any "activeThreadCount" where the count is not 0, wait 1 second, make the request again, and repeat this loop until all counts are 0 before making your next rest-api call. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
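The wait loop described above could be sketched like this in Python. The HTTP call is left as a callable you supply (for example, a function that GETs the process group status from the NiFi rest-api and extracts the "activeThreadCount" values); the exact endpoint and JSON path vary by NiFi version, so treat those as assumptions to verify:

```python
import time

def wait_until_stopped(fetch_active_thread_counts, poll_seconds=1.0,
                       timeout_seconds=300):
    """Poll until every component reports activeThreadCount == 0.

    fetch_active_thread_counts: a callable returning a list of ints, e.g. the
    "activeThreadCount" values parsed from the process group status JSON.
    Returns True once all counts are 0, or False if the timeout elapses.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        counts = fetch_active_thread_counts()
        if all(c == 0 for c in counts):
            return True          # safe to make the next rest-api call
        time.sleep(poll_seconds) # threads still active; wait and re-check
    return False
```

Only after this returns True would you issue the next start/stop request; otherwise you would terminate the remaining threads as described above.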
06-06-2025
11:45 AM
@shiva239 1. If you are building your own custom components for NiFi, I suppose you can have them do whatever you want. But considering your use case, you would be better off building a custom processor rather than a custom controller service. For example, a custom version of the PutDatabaseRecord processor that, instead of using a connection pool controller service, makes a direct connection for each record.

2. I have nothing set up to test those settings, but based on them there is still opportunity for connection reuse across multiple NiFi FlowFiles: in the 1 second between when one execution ends and the next begins, the next execution may grab the connection that has been idling for that second. Keep in mind that there is nothing in the DBCPConnectionPool code that would prevent the server side from closing connections at the end of a transaction; that is the whole reason the "Validation Query" property exists. It is not uncommon for the server side to close connections. So when the DBCPConnectionPool tries to hand a connection from the pool to a requesting processor, it runs the validation query to make sure the connection is still active. If the validation query fails, that connection is dropped from the pool and a new connection is made. I don't think "Max Idle Connections" is going to do anything, since you set "Min Idle Connections" to zero, which means no idle connections are allowed.

- Can you clarify what -1 indicates? Does it mean no limit on the lifetime of a connection? <-- yes

The settings you have sound solid, but I would still set a validation query to avoid any chance of a race condition where a connection that has idled for 1 second gets reused even though it has already been closed; the processor would just sit there waiting for a return, assuming the connection was good. With min idle connections set to 0 this may not be an issue, but I have never tested this specific setup.

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
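Pulling the discussion above together, a DBCPConnectionPool configuration along these lines would match it. The validation query shown is an example (use a query valid for your database), and the other values simply restate the settings discussed in this thread:

```
Validation Query:          SELECT 1    # example; pick a cheap query your DB accepts
Min Idle Connections:      0           # no idle connections kept in the pool
Max Connection Lifetime:   -1          # -1 = no limit on a connection's lifetime
```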