Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 2143 | 12-20-2024 05:49 AM | |
| 2438 | 12-19-2024 08:33 PM | |
| 2189 | 12-19-2024 06:48 AM | |
| 1456 | 12-17-2024 12:56 PM | |
| 2082 | 12-16-2024 04:38 AM |
12-28-2023
12:53 PM
@apache_nifi, Thanks for providing more details. Have you tried what I proposed in my first response? I still think that using ReplaceText would help you create the needed request for each consecutive api call. You can use something like EvaluageJsonPath or ExtractText processors to extract any parameters from the latest response and store as flowfile attributes, which you can use when constructing the next request . For example I can see that step 2 relies on the output of step 1, so after you get the idp token you can use the ExtractText processor to save the token into attribute (by adding dynamic property) as follows: This will give you the same flowfile (idp token) but it will have new attribute idp_token: Then you can use ReplaceText to construct the next request body as follows: The Replacement Value will have the x-www-form-urlencoded parameters: client_id=CLIENT_ID&grant_type=urn:GRANT_TYPE&company_id=COMPANT_ID&new_token=true&assertion=${idp_token} Notice how Im using the expression language ${idp_token} to set the assertion parameter. This will generate new flowfile for step 2 invokehttp which will serve as the request body. Keep in mind any request body has to be passed as flowfile to the InvokeHttp processor. For more information on how to do x-www-form-urlencoded request using invokehttp: https://community.cloudera.com/t5/Support-Questions/NIFI-Is-it-possible-to-make-a-x-www-form-urlencoded-POST/m-p/339398 Hopefully you have enough to get going. Let me know if you have any particular questions. If that helps please accept solution. Thanks
... View more
12-28-2023
12:16 PM
Hi @MWM , replacing empty string with null is kind of tricky thing with jolt. Unfortunately there is no direct way that I know of but hopefully my comments will make it easy for you to understand. Based on your spec I assume you have the following input: {
"idTransakcji": "123",
"date": "",
"name": "sam"
} The jolt spec would be like this ( comments start with //): [
{
"operation": "shift",
"spec": {
"idTransakcji": "id",
"date": {
// Parse date value
// if its empty string ""
"": {
//remove the date from the output
"": "user.date"
},
//For any values other than ""
"*": {
//assign the actual value ($) to user.date
"$": "user.date"
}
},
// Assign other fields like name under user
"*": "user.&"
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"user": {
// if the date is found it means that we
// should have a value, therefore convert to long
"date": "=toLong(@(1,date))"
}
}
},
{
"operation": "default",
"spec": {
"user": {
// if the date is not found then set to null
// default transformation only works when
// the key doesnt exist.
"date": null
}
}
}
] You can try the spec here : https://jolt-demo.appspot.com/#inception If that helps please accept solution. Thanks
... View more
12-28-2023
11:40 AM
2 Kudos
Hi @Sadhana21 , You can do this in two ways that I can think of : 1- This might take few processors but basically after you get the output: [{
"order_id": "10000",
"order_item": "[{\"product_Id\": \"10007\", \"order_Item_Seq_Id\": \"00101\"}]"
}] 1.1 First EvaluateJsonPath to get the order_id and set the destination to attribute: 1.2 2ed EvaluateJsonPath to get the order_item and set the destination to flowfile and the Return Type as Json This will produce the following output: [{"product_Id": "10007", "order_Item_Seq_Id": "00101"}] 1.3 Finally you JoltTransformJson with the following spec: [
{
"operation": "shift",
"spec": {
"*": {
"#${order_id}": "[&1].order_id",
"@": "[&1].order_item[]"
}
}
}
] This should give you the needed result 2- Using just UpdateRecord Processor: I like this option because that is the only processor you need to use. The only caveat is that you need to define JsonRecordReader & JsonRecordSetWriter where in the later you have to define Avro schema for the expected output. In the UpdateRecord you can use the built-in function called "unescapeJson" to read json string and return it as json: https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#unescapejson UpdateRecord: JsonRecordSetWriter: Avro Scheam in the Schema Text: {
"name": "nifi",
"type": "record",
"fields": [
{
"name": "order_id",
"type": "string"
},
{
"name": "order_item",
"type": {
"name": "order_item_array",
"type": "array",
"items": {
"name": "order_item",
"type": "record",
"fields": [
{
"name": "product_Id",
"type": "string"
},
{
"name": "order_Item_Seq_Id",
"type": "string"
}
]
}
}
}
]
} If that helps please accept solution Thanks
... View more
12-27-2023
07:36 AM
Hi @apache_nifi , Its difficult to give you an exact answer without seeing and understanding the whole scenario. For example how those 9 InvokeHttp processors are related and what kind of request body you need to construct for each POST. If you cant list all the API's and the request body each needs and how are they dependent, maybe you can create an example based on hypothetical situation. However, to give you some direction that might help in your case, there are a lot of processors that you can use in between to help construct the next request body, for example lets say that the first invokeHttp returns a json response and you need some parameters from this response to pass to the next InvokeHttp, then you can use processor like EvaluateJsonPath to get the parameters into flowfile attributes then use ReplaceText processor to replace the json response flowfile content with the new request body and utilize the flowfile attributes you extracted from the json response to populate the needed parameters in the replace value. To demonstrate: Lets assume I want to use Nifi Api to start a processor and run it once , the first invokehttp will be to get the processor information using the processor Id. The response of this invokation will be something like this: {
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"id": "value",
"uri": "value",
"position": {…},
"permissions": {…},
"bulletins": [{…}],
"disconnectedNodeAcknowledged": true,
"component": {…},
"inputRequirement": "value",
"status": {…},
"operatePermissions": {…}
} To change the run state of processor I need to pass the version number alongside the processor id in the request body. To get the version number (0) from the response above and store it as an attribute I use EvaluateJsonPath like this: Then I use ReplaceText to replace the response content with the run state api request body : In the Search Value I'm using the regex "(?s)(^.*$)" that captures everything and the replacement value is the request body for the next invokehttp: {
"revision": {
"clientId": "${pipeline.start.processor.id}",
"version": ${processorVersion}
},
"state": "RUN_ONCE",
"disconnectedNodeAcknowledged": true
} Notice how I'm utilizing the flowfile attributes using expression language to populate the required parameters like processor id and processor version which I extracted using the EvaluateJsonPath above. Next will will my InvokeHttp passing the new request body content. Hope that helps put you on the right direction. If this helps please accept solution. Thanks
... View more
12-22-2023
09:52 AM
@Rohit1997jio, I was going to answer this similar to @MattWho who beat me to it but I will post my answer anyway in case it can help. Basically @MattWho is correct , you can use the ExecuteScript processor to somehow simulate the retry flowfile processor and its better than RouteOnAttribute Option because when you use penalize the processor is setting idle during this time unlike the RouteOnAttribute where its always looping to unmached relationship for the period of wait time. Anyway here is my solution and in my case you dont need RouteOnAttribute but you have to add more code. I'm using groovy code for my script. The process relies on two attributes: 1- totalRetry: which incremental value to track the retry threshold every time the file is sent to retry. first time it will be set to 0. 2- isPenalized: is used to track if the file should be penalized before each retry (isPenalized == null) or its already penalized which means its ready for the next retry. The groovy script: flowFile = session.get()
if(!flowFile) return
// get totalPenalized and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')
// if its the first time set the value to 0 (no rety yet. first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()
// if the total retry has passed the threshold ( 3 in this case) then send to failure rel (expired).
// Total wait time (penalize time) 3*10 sec = 30 secs
if(totalRetry.toInteger()>3)
{
session.transfer(flowFile, REL_FAILURE)
return
}
// if totalRetry has not passed the threshold and the file is not
// penalized (isPenalized == null) then penalize and send back to upstream queue
if(!isPenalized)
{
flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
flowFile = session.penalize(flowFile)
session.transfer(flowFile)
return
}
// Otherwise file has been already penalized then send to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry+1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return You can set the Penalize period under processor SETTINGS tab Hope this helps Thanks
... View more
12-21-2023
01:28 PM
1 Kudo
I did a test on a multipart file upload api and it worked for with the following properties: No need to add dynamic properties for Content-Disposition & Content-Type. There is a property already for the Content-Type called "Request Content-Type".
... View more
12-21-2023
10:26 AM
1 Kudo
My understanding is the file "file.xls" will be uploaded and the file name is stored under filename attribute (make sure to set the file name and extension correctly in the UpdateAttribute as file.xls) , Also make sure the following properties are set as follows: Request Body Enabled : true Request Multipart Form-Data Filename Enabled : true The later will set the Content-Disposition accordingly based on the filename attribute so you dont have to add as dynamic property. Hope that helps.
... View more
12-21-2023
08:57 AM
1 Kudo
Hi @enam , Not sure where you set the acno parameter , I cant see it in the UpdateAttribute. Make sure the value is available before the invokeHttp as flowfile attribute. Once you have that and you want to send it as multipart form request you need to add dynamic property with the following format per the documentation (https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.InvokeHTTP/index.html So in your case the dynamic property will look like the following: In this case you probably dont need the AttributeToJson processor. If that helps please accept solution. Thanks
... View more
12-20-2023
06:40 AM
I noticed you are using the "data rate" for the Rate Control Criteria. I dont think this is the correct criteria , instead it should be by "flowfile count". Also you mentioned that you wanted to wait 10 seconds so the Time Duration property should be set as such. Keep in mind setting the ControlRate this way means , that the processor will allow 1 flowfile to proceed to the success relationship every 10 secs, so if you have 3 files added to the queue around the same time , the first one will proceed after 10 seconds , 2ed will proceed after 20 secs and the 3rd in 30 secs. If you are looking for only 10 secs on each file , you can use RouteOnAttribute as mentioned here: https://stackoverflow.com/questions/61875809/introduce-time-delay-before-moving-flow-files-to-next-processor-in-nifi I noticed also from your processors that you are using version 1.18 which should have retryflowfile . If that helps please accept solution. Thanks
... View more
12-19-2023
01:15 PM
Hi @Rohit1997jio , What version are you using ? Have you looked into ControlRate processor ( https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.ControlRate/index.html )? The wait processor is not supposed to be used by itself , it should be accompanied by the Notify processor and its used for certain scenarios where for example if you want to split data into multiple flowfiles and you want to wait for all of them to be processed before performing certain step and so on. If that helps please accept solution. Thanks
... View more