Member since: 12-21-2015
Posts: 57
Kudos Received: 7
Solutions: 1
My Accepted Solutions
Title | Views | Posted
---|---|---
| 3960 | 08-25-2016 09:31 AM
03-08-2020
09:45 AM
Hi @Kart,
As this is a thread which was marked 'Solved' over three years ago, you would have a better chance of receiving a resolution by posting a new question. This will also present you with the opportunity to include details specific to your environment that could aid other members in providing a more relevant answer to your question.
08-04-2017
03:17 PM
1 Kudo
@J. D. Bacolod Those processors were added for specific use cases such as yours. You can accomplish almost the same thing using the PutDistributedMapCache and FetchDistributedMapCache processors along with an UpdateAttribute processor.

I used the UpdateAttribute processor to set a unique value in a new attribute named "release-value". The FetchDistributedMapCache processor then acts as the Wait processor did, looping FlowFiles through the "not-found" relationship until the corresponding value is found in the cache. The "release-value" is written to the cache by the PutDistributedMapCache processor down the other path, after the InvokeHTTP processor; it is fed by the "Response" relationship.

Keep in mind that the FetchDistributedMapCache processor does not have an "expire" relationship. If a response is never received for some FlowFile, or the cache expired/evicted the needed value, those FlowFiles will loop forever. You can solve this in two ways:

1. Set File Expiration on the connection containing the "not-found" relationship so that files which have not found a matching key value in the cache by the time the FlowFile's age reaches x are purged. With this option, aged data is simply lost.

2. Build a FlowFile expire loop which kicks these looping not-found FlowFiles out of the loop after x amount of time so they can be handled by other processors. This can be done using the "Advanced" UI of an UpdateAttribute processor and a RouteOnAttribute processor. The UpdateAttribute processor sets a new attribute I called "initial-date" if and only if it has not already been set on the FlowFile. The RouteOnAttribute processor then compares the current date against that attribute's value plus x milliseconds to see if the file has been looping for more than x amount of time. Using 6 minutes (360000 ms) as an example, FlowFiles that have been looping for 360000 milliseconds or more get routed to the "expired" relationship, where you can choose what you want to do with them.

As you can see, the new Wait and Notify processors wrap the flow above up in only two processors, versus the five processors you would need in older versions to get the same functionality.

Thanks, Matt
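For reference, the two pieces described above might look roughly like this in NiFi Expression Language (a sketch only; the rule names are hypothetical and the exact Advanced UI layout may differ by NiFi version):

```
# UpdateAttribute (Advanced UI) rule -- set "initial-date" only once
Condition: ${initial-date:isNull()}
Action:    initial-date = ${now():toNumber()}

# RouteOnAttribute property/routing rule (e.g. named "expired"),
# true once the FlowFile has been looping for 360000 ms or more
${now():toNumber():gt(${initial-date:toNumber():plus(360000)})}
```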
08-11-2017
07:31 AM
2 Kudos
@Matt Clarke I have created a Jira ticket for this enhancement: https://issues.apache.org/jira/browse/NIFI-4284
06-21-2017
06:23 PM
The documentation says "The Expression Language allows single quotes and double quotes to be used interchangeably". Try double-quotes in your EL expression.
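For example, these two expressions are equivalent (the attribute and literal values here are hypothetical, shown only to illustrate the quoting):

```
${filename:contains('abc')}
${filename:contains("abc")}
```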
06-19-2017
01:58 PM
1 Kudo
This type of information is typically stored in provenance data. You can use the SiteToSiteProvenanceReportingTask to get access to provenance events in JSON format and then filter the events to find the ones you are interested in. Each provenance event has an event time, which is the time the event was reported, as well as a lineage start time, which is the time of the first event in the given lineage. So event time minus lineage start time would be the time it took to get to the current event.
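As a rough sketch, each event carries both timestamps, so the elapsed time is a simple subtraction (the field names and values below are illustrative and may differ by NiFi version):

```
{
  "eventType": "SEND",
  "timestampMillis": 1497880680000,   <- time this event was reported
  "lineageStart": 1497880620000       <- time of the first event in the lineage
}

elapsed = timestampMillis - lineageStart = 60000 ms
```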
06-07-2017
02:29 PM
1 Kudo
@J. D. Bacolod Have you considered using the PutDistributedMapCache and GetDistributedMapCache processors? Have two separate dataflows. One runs on a cron schedule and is responsible for obtaining the token and writing it to the distributed map cache using the PutDistributedMapCache processor. The second flow does all your other operations using that token. Just before the InvokeHTTP processor, add a GetDistributedMapCache processor that reads the token from the distributed map cache into a FlowFile attribute. You then use that attribute to pass the token in your requests.

One thing to keep in mind is that a new token may be retrieved after a FlowFile has already read the old token from the distributed map cache. This would result in an auth failure, so you will want your flow to loop back to the GetDistributedMapCache processor to get the latest token on an auth failure at your InvokeHTTP processor. This flow does not keep track in any way of when a token expires, but if you know how long a token is good for, you can set your cron schedule accordingly.

Thanks, Matt
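A sketch of the two flows described above (the cache key name is hypothetical, and the property names are written from memory, so verify them against your NiFi version):

```
Flow 1 (runs on a cron schedule):
  InvokeHTTP (request token) -> extract token -> PutDistributedMapCache
    Cache Entry Identifier = auth-token          # hypothetical key name

Flow 2 (main processing):
  ... -> GetDistributedMapCache -> InvokeHTTP -> ...
    Cache Entry Identifier       = auth-token
    Put Cache Value In Attribute = auth.token    # token lands in ${auth.token}

  On auth failure, route InvokeHTTP's failure back to
  GetDistributedMapCache to pick up the latest token.
```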
05-26-2017
12:15 PM
1 Kudo
@J. D. Bacolod You can point NiFi directly at the Java 8 java command. Modify the following line in NiFi's bootstrap.conf file:

# Java command to use when running NiFi
java=java

Change:

java=java

to:

java=<path to JDK 8>/bin/java

Thanks, Matt

If you find this answer addressed your question, please mark the answer as accepted.
05-25-2017
12:05 PM
2 Kudos
@J. D. Bacolod Anything you can do within the NiFi UI, you can also do via NiFi REST API calls. So you could issue a REST API call to stop specific processors before the batch job is started, and then issue another call to start them again after the batch job completes. https://nifi.apache.org/docs/nifi-docs/rest-api/index.html Thanks, Matt
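As a sketch, stopping and restarting a single processor from the command line might look like this (the host, processor id, and revision version are placeholders; the current revision must first be fetched with a GET on the same processor, and the exact endpoint may vary by NiFi version):

```
# Stop the processor before the batch job starts
curl -X PUT -H 'Content-Type: application/json' \
     -d '{"revision":{"version":3},"state":"STOPPED"}' \
     http://nifi-host:8080/nifi-api/processors/<processor-id>/run-status

# Start it again after the batch job completes
curl -X PUT -H 'Content-Type: application/json' \
     -d '{"revision":{"version":4},"state":"RUNNING"}' \
     http://nifi-host:8080/nifi-api/processors/<processor-id>/run-status
```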
06-21-2017
04:35 PM
Also, as of NiFi 1.3.0 / HDF 3.0.0, GenerateTableFetch accepts incoming connections/flow files, so you can use ListDatabaseTables -> GenerateTableFetch -> RPG -> Input Port -> ExecuteSQL to fully distribute the fetching of batches of rows across your NiFi cluster. The RPG -> Input Port part is optional and only used on a cluster if you want to fetch rows in parallel.
03-15-2017
01:11 PM
1 Kudo
In addition to QueryDatabaseTable, you may be interested in the GenerateTableFetch processor. It is similar to QueryDatabaseTable except that it does not execute the SQL queries itself; instead it generates them and sends out flow files containing the SQL queries. This allows you to distribute the fetching in parallel over a NiFi cluster. In an upcoming release, GenerateTableFetch will accept incoming flow files, so you could enhance the workflow with the ListDatabaseTables processor, sending those tables to GenerateTableFetch and thus parallelizing the fetching of multiple pages of multiple tables.