Member since
07-30-2019
3400
Posts
1621
Kudos Received
1002
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 193 | 12-03-2025 10:21 AM |
| | 510 | 11-05-2025 11:01 AM |
| | 384 | 11-05-2025 08:01 AM |
| | 666 | 11-04-2025 10:16 AM |
| | 780 | 10-20-2025 06:29 AM |
08-16-2017
01:46 PM
@Wesley Bohannon Is this a standalone NiFi or a NiFi cluster? If it is a cluster, are the FlowFiles produced by each of your SelectHiveQL processors all on the same node? The MergeContent processor will not merge FlowFiles from different cluster nodes. Assuming all FlowFiles are on the same NiFi instance, the only ways I could reproduce your scenario were:
- Each FlowFile had a different value assigned to the "table_name" FlowFile attribute and Merge Strategy was set to "Bin-Packing Algorithm". This caused each FlowFile to be placed in its own bin, and at the end of the 5 minute max bin age each bin of one was merged. If the intent is always to merge one FlowFile from each incoming connection, what is the purpose of setting a "Correlation Attribute Name"?
- Maximum Number of Bins was set to 1 and the 4 source FlowFiles became queued at different times.

The "Defragment" Merge Strategy bins FlowFiles with matching values in the "fragment.identifier" FlowFile attribute and then merges them using the "fragment.index" and "fragment.count" attributes. Since you have also specified a correlation attribute, the MergeContent processor will use the value of that attribute instead of "fragment.identifier" to bin your files. If each FlowFile has a unique value for "table_name", then each FlowFile ends up in a different bin and is routed to failure right away (if bins is set to 1) or after the 5 minute max bin age, since not all fragments were present. The other possibility is that "fragment.count" and "fragment.index" are set to 1 on every FlowFile.

I would stop your MergeContent processor and allow one FlowFile to queue in each connection feeding it, then use the "list queue" capability to inspect the attributes on each queued FlowFile. What values are associated with each FlowFile for the following attributes?
- fragment.identifier
- fragment.count
- fragment.index
- table_name

Thank you, Matt
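A rough sketch of the Defragment binning decision described above, in plain Python rather than NiFi code (the attribute names mirror the ones above, and the scenario of four FlowFiles with unique table_name values is only an illustration):

```python
# Sketch only -- not NiFi source code. Each FlowFile is represented as a
# plain dict of its attributes.
from collections import defaultdict

def bin_key(attrs, correlation_attribute=None):
    # When a Correlation Attribute Name is set, its value replaces
    # "fragment.identifier" as the binning key.
    if correlation_attribute:
        return attrs.get(correlation_attribute)
    return attrs.get("fragment.identifier")

def defragment(flowfiles, correlation_attribute=None):
    bins = defaultdict(list)
    for attrs in flowfiles:
        bins[bin_key(attrs, correlation_attribute)].append(attrs)
    for key, members in bins.items():
        expected = int(members[0]["fragment.count"])
        indexes = {int(m["fragment.index"]) for m in members}
        # A bin only merges when every fragment (by index) is present.
        if len(members) == expected and len(indexes) == expected:
            print(f"bin {key}: complete, would merge {expected} FlowFiles")
        else:
            print(f"bin {key}: incomplete, routed to failure at max bin age")

# Four splits of one query, but each with a unique table_name value:
flowfiles = [
    {"fragment.identifier": "q1", "fragment.index": str(i),
     "fragment.count": "4", "table_name": f"table_{i}"}
    for i in range(4)
]
defragment(flowfiles)                                      # one bin of 4 -> merges
defragment(flowfiles, correlation_attribute="table_name")  # four bins of 1 -> all fail
```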
08-16-2017
01:08 PM
2 Kudos
@Pierre Leroy Splitting such a large file may result in Out Of Memory (OOM) errors in NiFi. NiFi must create every split FlowFile before committing those splits to the "splits" relationship, and during that process NiFi holds the FlowFile attributes (metadata) of all the FlowFiles being produced in heap memory.

What your image above shows is that you issued a stop on the processor. This stops the processor's scheduler from triggering again, but the processor will still allow any existing running threads to complete. The small number "2" in the upper right corner indicates the number of threads still active on this processor. If you have run out of memory, for example, this process will probably never complete; a restart of NiFi will kill off these threads.

When splitting very large files, it is common practice to use multiple SplitText processors in series with one another. The first SplitText is configured to split the incoming files into large chunks (say every 10,000 to 20,000 lines). The second SplitText processor then splits those chunks into the final desired size. This greatly reduces the heap memory footprint. Thanks, Matt
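A toy illustration of the two-stage idea in plain Python (not NiFi code); the 10,000-line first-stage chunk size follows the suggestion above:

```python
# Sketch only: shows why two SplitText processors in series keep the
# bookkeeping small. The first pass tracks one entry per 10,000-line chunk;
# the second pass only handles the pieces of a single chunk at a time,
# instead of tracking every final split of the whole file at once.
def split_chunks(lines, chunk_size):
    for start in range(0, len(lines), chunk_size):
        yield lines[start:start + chunk_size]

def two_stage_split(lines, first_stage=10_000, final_size=1):
    for chunk in split_chunks(lines, first_stage):   # SplitText #1
        yield from split_chunks(chunk, final_size)   # SplitText #2

lines = [f"line {i}" for i in range(1_000_000)]
print(sum(1 for _ in two_stage_split(lines)))  # 1000000 single-line splits
```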
08-15-2017
06:14 PM
1 Kudo
@Hadoop User It is unlikely you will see the same performance out of Hadoop between reads and writes. The Hadoop architecture is designed to favor many readers and few writers. Increasing the number of concurrent tasks may help performance, since you will then have multiple files being written concurrently. 1 - 2 KB files are very small and do not make optimal use of your Hadoop architecture. Commonly, NiFi is used to merge bundles of files together into a more optimal size for storage in Hadoop; I believe 64 MB is the default optimal size. You can remove some of the overhead of each connection by merging files into larger files using the MergeContent processor before writing to Hadoop. Thanks, Matt
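Some quick back-of-the-envelope arithmetic (plain Python) to show the scale of the difference; the 1.5 KB average size and 64 MB merge target are assumptions based on the numbers above:

```python
# Hypothetical numbers: ~1.5 KB average source file, 64 MB merged bundle.
avg_file_kb = 1.5
target_bundle_mb = 64

files_per_bundle = int(target_bundle_mb * 1024 / avg_file_kb)
print(files_per_bundle)  # ~43690 small files fit in one merged bundle

# 100,000 small files written individually means 100,000 PutHDFS writes and
# 100,000 objects in HDFS; merged first, it is only a few writes.
total_files = 100_000
bundles = -(-total_files // files_per_bundle)  # ceiling division
print(bundles)  # 3
```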
08-15-2017
06:01 PM
@Wesley Bohannon The issue you are most likely running into is caused by only having 1 bin. https://issues.apache.org/jira/browse/NIFI-4299 Change the number of bins to at least 2 and see if that resolves your issue. Thanks, Matt
08-15-2017
02:39 PM
@Hadoop User I am unfortunately not a Hive or Hadoop guru. Both errors above are being thrown by the Hive and Hadoop client libraries that these processors use, and not by NiFi itself. Hopefully the above log lines are followed by full stack traces in the nifi-app.log. If not, try enabling DEBUG logging to see if you can get a stack trace output. That stack trace may provide the necessary details to help diagnose what is causing the issue, and hopefully a Hive or Hadoop guru will then be able to provide some assistance here. I also suggest providing the details of the HiveConnectionPool controller service you set up that is being used by this PutHiveQL processor. Thanks, Matt
08-15-2017
02:01 PM
1 Kudo
@Timothy Spann I am not sure what you mean by "Not updating". Also what Provenance implementation are you using (Persistent, WriteAhead, or Volatile)?
08-15-2017
01:57 PM
@Hadoop User Please share your PutHDFS processor configuration with us. How large are the individual files that are being written to HDFS? Thanks, Matt
08-04-2017
03:17 PM
1 Kudo
@J. D. Bacolod Those processors were added for specific use cases such as yours. You can accomplish almost the same thing using the PutDistributedMapCache and FetchDistributedMapCache processors along with an UpdateAttribute processor.

I used the UpdateAttribute processor to set a unique value in a new attribute named "release-value". The FetchDistributedMapCache processor then acts as the Wait processor did, looping FlowFiles in the "not-found" relationship until the corresponding value is found in the cache. The "release-value" is written to the cache by the PutDistributedMapCache processor down the other path, after the InvokeHTTP processor; it receives the "Response" relationship.

Keep in mind, the FetchDistributedMapCache processor does not have an "expire" relationship. If a response is never received for some FlowFile, or the cache expired/evicted the needed value, those FlowFiles will loop forever. You can solve this in two ways:
1. Set File Expiration on the connection containing the "not-found" relationship so that it purges files that have not found a matching key value in the cache by the time the FlowFile's age has reached x. With this option, aged data is simply lost.
2. Build a FlowFile expire loop which kicks these looping not-found FlowFiles out of the loop after x amount of time so they can be handled by other processors. This can be done using the "Advanced" UI of an UpdateAttribute processor and a RouteOnAttribute processor. The UpdateAttribute processor sets a new attribute I called "initial-date" if and only if it has not already been set on the FlowFile. The RouteOnAttribute processor then compares the current time to that attribute's value plus x milliseconds to see if the file has been looping for more than x amount of time. Using 6 minutes (360000 ms) as an example, FlowFiles that have been looping for 360000 milliseconds or more get routed to an "expired" relationship, where you can choose what you want to do with them (see the sketch below).

As you can see, the Wait and Notify processors wrap the above flow up in only two processors versus the five you would need in older versions to get the same functionality. Thanks, Matt
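A small sketch of the expire check from option 2, in plain Python rather than NiFi Expression Language; the "initial-date" attribute is assumed to hold epoch milliseconds, and the 6 minute window follows the example above:

```python
import time

EXPIRE_AFTER_MS = 360_000  # 6 minutes, as in the example above

def route(flowfile_attributes):
    # Mimics the RouteOnAttribute decision: expired once the FlowFile has
    # been looping for EXPIRE_AFTER_MS or more.
    now_ms = int(time.time() * 1000)
    initial_ms = int(flowfile_attributes["initial-date"])
    if now_ms >= initial_ms + EXPIRE_AFTER_MS:
        return "expired"    # kicked out of the not-found loop
    return "unmatched"      # keeps looping back to FetchDistributedMapCache

# A FlowFile that first entered the loop seven minutes ago:
attrs = {"initial-date": str(int(time.time() * 1000) - 7 * 60 * 1000)}
print(route(attrs))  # expired
```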
08-03-2017
01:21 PM
2 Kudos
@J. D. Bacolod The use case you describe is an exact fit for the "Wait" and "Notify" processors introduced in HDF 3.0 / Apache NiFi 1.2.0. Using these processors, it would work as follows: The input (original FlowFile) is routed to both a Wait processor and your existing flow. The "Response" relationship from your InvokeHTTP processor would route to the corresponding Notify processor. The copy of the FlowFile that was routed to the Wait processor will continuously loop in the "wait" relationship until a release signal identifier for the FlowFile is written to a DistributedMapCache service by the Notify processor. Thanks, Matt
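A toy sketch of the release-signal idea, using a Python dict in place of the DistributedMapCache server (not NiFi code; "uuid-123" is only a stand-in for whatever release signal identifier you derive from the FlowFile):

```python
cache = {}  # stands in for the DistributedMapCache service

def notify(release_id):
    # Notify processor: the InvokeHTTP "Response" arrived, record the signal.
    cache[release_id] = True

def wait(release_id):
    # Wait processor: loop the FlowFile copy until its signal shows up.
    return "success" if cache.get(release_id) else "wait"

print(wait("uuid-123"))   # wait  (copy keeps looping)
notify("uuid-123")
print(wait("uuid-123"))   # success (copy is released downstream)
```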
08-03-2017
12:55 PM
@Narasimma varman In order to access a secured NiFi's UI, successful user authentication and authorization must occur. In HDF, a NiFi CA is installed that takes care of building valid keystores and truststores for your NiFi nodes, but it does not create user certificates for you. Typically the above error indicates that NiFi did not trust the client certificate it was passed, or that a client certificate was not passed at all. I would suggest starting by getting verbose output of your NiFi keystore.jks, truststore.jks, and user keystore.p12. The verbose output for each of these can be obtained using keytool:

./keytool -v -list -keystore <jks or p12 keystore file>

In the keystore.jks used by the NiFi server, you will see a single entry with two certificates included in it. Specifically you are looking for the "PrivateKeyEntry". This PrivateKeyEntry will show an owner DN (it will be in the form of CN=<server FQDN>, OU=NIFI). You will then see an issuer line, which will also have a DN, for the NiFi CA. This PrivateKeyEntry should have an extended key usage that allows the key to be used for both client auth and server auth.

Something else (not related to your issue) I noticed is that your browser URL is "localhost". The NiFi CA will generate a server certificate based off the hostname of the server and not localhost. This will require you to add an exception in your browser at some point, because the cert passed to your browser from your NiFi server will say it belongs to server XYZ while your browser knows it was trying to connect to localhost, so it appears as a man-in-the-middle type attack (one endpoint using another endpoint's cert).

In the truststore.jks used on your NiFi servers, you will see a single certificate. It will be a "TrustedCertEntry" for the NiFi CA. The truststore.jks file can contain one to many trusted cert entries. Each trusted cert entry is derived from the public key of a CA or self-signed cert. When a client (user or another server) negotiates a connection with the server, a TLS handshake occurs. As part of this negotiation, the server expects to receive a client certificate which it can trust. If a trusted client cert is not received, the connection is typically closed by the server.

Your client keystore.p12 file will also need to contain a PrivateKeyEntry. In the TLS negotiation that occurs with the server, the DN associated with that PrivateKeyEntry is passed to the server. If that certificate was self-signed, the truststore would need to contain the public key for that certificate as a TrustedCertEntry before that certificate will be accepted for authentication. Beyond authentication is authorization, but it does not appear you are getting that far yet. Thanks, Matt
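To put the trust relationship in code terms, here is a minimal Python ssl sketch of a server that, like NiFi, demands a client certificate and only accepts ones issued by a CA it trusts; the file names are placeholders for illustration, not files NiFi actually uses:

```python
import ssl

# Server side (what NiFi does conceptually during the TLS handshake):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.verify_mode = ssl.CERT_REQUIRED               # a client cert MUST be presented
ctx.load_cert_chain("server.pem", "server.key")   # the server's PrivateKeyEntry
ctx.load_verify_locations("nifi-ca.pem")          # trusted CA cert (truststore)
# If the client presents no certificate, or one not signed by a trusted CA,
# the handshake fails and the connection is closed -- the error you are seeing.
```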
... View more