Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2004 | 12-20-2024 05:49 AM
 | 2279 | 12-19-2024 08:33 PM
 | 2050 | 12-19-2024 06:48 AM
 | 1355 | 12-17-2024 12:56 PM
 | 1945 | 12-16-2024 04:38 AM
12-29-2023
08:06 AM
1 Kudo
Hi @MWM , The following worked for me:
The GenerateFlowFile produces JSON content and has an attribute flowfile_id with the value 123. In the ReplaceText I replace everything with an empty string. In the UpdateAttribute I'm adding a new attribute new_attr with a value of 555. The MergeContent is configured with flowfile_id as the Correlation Attribute Name. Also notice how I set the "Minimum Number of Entries" to 2 so that the original flowfile will wait until the second one is ready. The result is the original flowfile content with the newly added attribute.
An alternative to MergeContent is to use PutDistributedMapCache and FetchDistributedMapCache: store the original content in the cache, do whatever is needed to get the new attributes, and finally fetch the original content again. This will give you the original flowfile including the new attributes. The only caveat with this approach is that you have to create two controller services: DistributedMapCacheClientService & DistributedMapCacheServer. Another issue with the DistributedMapCacheClientService is that you have to provide a server hostname, which could be the same as your NiFi node; however, this produces a single point of failure, especially when you have a cluster. For more info: https://stackoverflow.com/questions/44590296/how-does-one-setup-a-distributed-map-cache-for-nifi
If that helps please accept solution. Thanks
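Roughly, the MergeContent properties for this pattern look like the following. Only the Correlation Attribute Name and Minimum Number of Entries are the settings called out above; the other values are indicative assumptions for this kind of two-branch merge:
Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: flowfile_id
Minimum Number of Entries: 2
Maximum Number of Entries: 2
Attribute Strategy: Keep All Unique Attributes (assumed, so that new_attr ends up on the merged flowfile)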
12-29-2023
05:25 AM
Hi @SAMSAL your solution is working, thank you so much!
12-28-2023
01:17 PM
Agree with @SAMSAL's approach, and if you can provide a parameter or something in the header or request so your API returns a JSON response each time, it'll make things a lot easier for you to parse and build the request for the next step in your flow.
12-22-2023
09:52 AM
@Rohit1997jio, I was going to answer this similarly to @MattWho, who beat me to it, but I will post my answer anyway in case it can help. Basically @MattWho is correct: you can use the ExecuteScript processor to simulate a retry-flowfile processor, and it's better than the RouteOnAttribute option because when you use penalize, the processor sits idle during that time, unlike RouteOnAttribute, which keeps looping to the unmatched relationship for the whole wait period. Anyway, here is my solution; in my case you don't need RouteOnAttribute, but you have to add more code. I'm using Groovy for my script. The process relies on two attributes:
1- totalRetry: an incremental value that tracks the retry count against the threshold every time the file is sent to retry. The first time it will be set to 0.
2- isPenalized: used to track whether the file should be penalized before the next retry (isPenalized == null) or has already been penalized, which means it is ready for the next retry.
The Groovy script:
flowFile = session.get()
if(!flowFile) return
// get totalRetry and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')
// if it's the first time, set the value to 0 (no retry yet, first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()
// if the total retry has passed the threshold ( 3 in this case) then send to failure rel (expired).
// Total wait time (penalize time) 3*10 sec = 30 secs
if(totalRetry.toInteger()>3)
{
session.transfer(flowFile, REL_FAILURE)
return
}
// if totalRetry has not passed the threshold and the file is not
// penalized (isPenalized == null) then penalize and send back to upstream queue
if(!isPenalized)
{
flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
flowFile = session.penalize(flowFile)
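// note: transfer() with no relationship returns the flowfile to the incoming queue it was pulled from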
session.transfer(flowFile)
return
}
// Otherwise the file has already been penalized, so send to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry+1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return
You can set the penalize period (Penalty Duration) under the processor SETTINGS tab. Hope this helps. Thanks
12-18-2023
06:06 AM
Is the problem that the other system doesn't accept negative values?
12-18-2023
05:01 AM
@SAMSAL Thank you for the help, everything works except the date, unfortunately, but as you said, it is more complicated
12-14-2023
07:03 AM
It seems that you need to process the 500 records once. In this case you don't need to start the ExecuteSQL processor or worry about scheduling it unless you want to run it again in the future. Keep the processor stopped, then right-click on it and select Run Once. This should load the total number of records one time.
12-14-2023
05:54 AM
Hi @Eva0 , You were close. I think the first asterisk "*" was not needed because the input JSON is not wrapped in array brackets. Then, to include the email in each data element, it has to be added at the date & type level, but to reference the value you need to go 2 levels up using the @ character, as in @(2,email), which is where the email sits relative to date & type.
[
{
"operation": "shift",
"spec": {
// level 2: the root level, where email originally is
"dates": {
// level 1
"*": {
// start count here at level 0
"@(2,email)": "[&1].email",
"date": "[&1].date",
"type": "[&1].type"
}
}
}
}
]
If you find this helpful please accept solution. Thanks
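For example, with a hypothetical input shaped like this (the original input isn't shown here, so this is just an assumption to illustrate the spec):
{
  "email": "john@example.com",
  "dates": [
    { "date": "2023-12-01", "type": "start" },
    { "date": "2023-12-31", "type": "end" }
  ]
}
the shift above should produce something along these lines, with the root-level email copied into each element:
[
  { "email": "john@example.com", "date": "2023-12-01", "type": "start" },
  { "email": "john@example.com", "date": "2023-12-31", "type": "end" }
]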
12-12-2023
06:57 AM
Can you post a screenshot of the UpdateRecord processor configuration? Also, you have to be careful with the provided input because there is an extra comma after the last Garry value, which makes the JSON invalid.
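Just to illustrate the kind of thing I mean (this is not the original input, only the shape of the problem): a record like
{ "names": [ "Harry", "Garry", ] }
is invalid JSON because of the comma after "Garry"; it has to be
{ "names": [ "Harry", "Garry" ] }
before the record reader can parse it.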
12-11-2023
07:52 AM
@SAMSAL Thank you. For future reference: if you ever need to figure out what the request is, simply use the NiFi UI to perform the action you want while monitoring your browser's Developer Tools. This will expose the full request/response for any given UI action. I have filed a JIRA for this: https://issues.apache.org/jira/browse/NIFI-12503