Member since
07-29-2020
574
Posts
320
Kudos Received
175
Solutions
07-11-2024
03:44 AM
1 Kudo
I'm not sure why you need the ReplaceText processor in this case. I included it in my sample above only as an example and as a way to simulate getting new data by replacing the original content with something else. You can think of ReplaceText as if I were calling InvokeHTTP and getting different flowfile content from the Response relationship. If you already get the data from module C, just link it directly to the JoinEnrichment. As long as you have the correct reader/writer configured for each fork, you should be good.
07-11-2024
03:31 AM
2 Kudos
Hi, without overcomplicating your scenario, and assuming that you get a .trg file only every once in a while so that you don't have to worry about clashes or concurrency issues, I would solve this as follows:
1- Use the ListFile processor and point it to the target directory. This processor needs to run on a schedule so that it is not continuously reading the same files while you are still processing them. You have to figure out how much time between listings is enough to process the files when a .trg file arrives. Also set the Record Writer property so that you get an array of all the files in one flowfile. There won't be any tracking in this case (set Listing Strategy to No Tracking), since we will keep reading the same files again and again as long as no .trg file has arrived. The output of this processor is an array of all the files found, where each file object has the following properties (assuming a JSON writer):
{
"filename": "...",
"path": "....",
"directory": false,
"size": 256496,
"lastModified": 1707490322483,
"permissions": null,
"owner": null,
"group": null
}
2- Use QueryRecord and add a dynamic property with the following query:
select * from flowfile where exists (
select 1 from flowfile where filename like '%.trg'
)
This will produce the array from above only if a .trg file is found among the files. Otherwise nothing happens and we wait for the next listing.
3- If the condition above is met and a .trg file has arrived, use SplitRecord (or SplitJson if you are using a JSON writer) to split out each file object.
4- Use EvaluateJsonPath to get the filename and path for each file object.
5- Use FetchFile with the attributes above to get the file and then do whatever is needed. Make sure to set the Completion Strategy to move or delete the file so that you don't reprocess it.
This is a very simplistic solution that might work if, like I said, you get a .trg file only every once in a while and there is enough time to process each batch of files, and if you are not dealing with a large number of files. If either of those conditions is not met, you definitely have to reconsider.
Another option that would work better is to have two flows. One continuously picks up whatever files arrive, places them in a staging area and logs them in a DB. When a .trg file arrives, you invoke the other flow through the NiFi API to read and process whatever got logged in the DB. The DB table holds the staging-area path for each logged file, so you pass that to the FetchFile processor. This way you can manage clashes and concurrency issues better, and you don't have to keep listing all the files and querying the dataset for .trg files as above. The files have already been moved to the staging area, and whenever a .trg file arrives the list is read once and the files are processed.
If you find this helpful, please accept the solution. Thanks
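To make the gate in step 2 concrete, here is a rough Python sketch of the same logic. It is only an illustration, not NiFi code: the function name `gate_on_trigger` and the sample listings are made up for this example.

```python
def gate_on_trigger(listing, trigger_suffix=".trg"):
    """Return the whole listing if any file ends with the trigger suffix, else an empty list.

    Mirrors the idea of the QueryRecord query: either every record passes or none does.
    """
    has_trigger = any(f["filename"].endswith(trigger_suffix) for f in listing)
    return list(listing) if has_trigger else []


# Hypothetical listings shaped like the ListFile record output (only two fields shown)
without_trg = [
    {"filename": "a.csv", "path": "in/"},
    {"filename": "b.csv", "path": "in/"},
]
with_trg = without_trg + [{"filename": "batch.trg", "path": "in/"}]

print(len(gate_on_trigger(without_trg)))  # nothing passes until a trigger file shows up
print(len(gate_on_trigger(with_trg)))     # all records pass, including the .trg entry
```

Note that the .trg entry itself is part of the result, so the downstream flow may want to skip or route it separately.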
07-11-2024
02:57 AM
2 Kudos
It all depends on the complexity of the data you are working with. If you are talking about data transformation (converting to timestamps, replacing quotes, etc.), then maybe Groovy is the way to go. JSLT has some functions that can help you accomplish this as well, such as the string replace and parse-time functions, but I'm not sure that covers everything. I'm not sure where you got the impression that the NiFi community doesn't recommend using Groovy; if you find an article about that, please share it. I think it's more a matter of your boss not wanting you to do any scripting, so that you don't end up with something nobody else can support. The processor is there for you to use. There is actually a dedicated processor for Groovy called ExecuteGroovyScript. I think the ExecuteScript processor might get deprecated since it's redundant. The only warning I can find about this processor is that the script gets compiled for every flowfile, which might get expensive and hurt performance if you have a big script and are working with large data volumes. To avoid running into those scenarios, NiFi provides alternatives such as InvokeScriptedProcessor (which can use Groovy as well), or you can develop a custom processor in Java (.nar) where the code is compiled once and done. The JSLT processor also compiles the script, but it uses caching to avoid having to do that every time. As for which performs better, Groovy or JSLT, I'm not sure and I have never tested it, but you can do some stress testing and let us know :).
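To illustrate why compile-once matters, here is a small Python sketch of the caching idea. It is not how NiFi or the JSLT processor is implemented; `compile_script`, `run_script` and the sample script are hypothetical names used only to show the difference between compiling per flowfile and compiling once per distinct script.

```python
from functools import lru_cache

compile_calls = 0  # counts how often the (potentially expensive) compile step runs


@lru_cache(maxsize=32)
def compile_script(source):
    """Compile the script text once per distinct source string and cache the result."""
    global compile_calls
    compile_calls += 1
    return compile(source, "<script>", "eval")


def run_script(source, flowfile):
    """Evaluate the cached compiled script against one flowfile (a plain dict here)."""
    return eval(compile_script(source), {}, {"flowfile": flowfile})


# Hypothetical transformation: strip double quotes from a value
script = "flowfile['value'].replace('\"', '')"
results = [run_script(script, {"value": f'"row{i}"'}) for i in range(3)]

print(results)
print(compile_calls)  # the compile step ran only once for three flowfiles
```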
07-11-2024
02:32 AM
2 Kudos
Hi, I think you are confusing the grouping function, represented by #, with the reference function, which uses &. What you need to do is reference the ARRAY_ONE index to group the fields properly.
[
{
"operation": "shift",
"spec": {
"Data": {
"ARRAY_ONE": {
"*": {
//&1 references the index above, so you will have two elements
"@(2,ID)": "[&1].ID",
"@(2,DATE)": "[&1].DATE",
"NAME": "[&1].NAME",
"priceRuleAttributes": {
"*": {
"id": {
"PR_BRANCH_NAME": {
//&5 references the ARRAY_ONE index, 5 levels up counting from this level (0)
"@(2,values)": "[&5].branchName"
}
}
}
}
}
}
}
}
}
]
If you found this helpful, please accept the solution. Thanks
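For readers who want to see what the grouping amounts to, here is a plain Python sketch of my reading of the spec. The input shape is an assumption (the original input is not shown in this post): `ID` and `DATE` sit directly under `Data`, and each `priceRuleAttributes` entry has an `id` and a `values` field. The sample values are invented.

```python
def group_by_array_one_index(doc):
    """Build one output object per ARRAY_ONE element, like writing to [&1] / [&5]."""
    data = doc["Data"]
    out = []
    for element in data["ARRAY_ONE"]:
        # ID and DATE come from the Data level (two levels up from the element)
        row = {"ID": data["ID"], "DATE": data["DATE"], "NAME": element["NAME"]}
        for attr in element.get("priceRuleAttributes", []):
            # Only the attribute whose id is PR_BRANCH_NAME contributes branchName
            if attr.get("id") == "PR_BRANCH_NAME":
                row["branchName"] = attr["values"]
        out.append(row)
    return out


# Hypothetical input, shaped to match the assumptions stated above
sample = {
    "Data": {
        "ID": "42",
        "DATE": "2024-07-11",
        "ARRAY_ONE": [
            {"NAME": "first",
             "priceRuleAttributes": [{"id": "PR_BRANCH_NAME", "values": ["north"]}]},
            {"NAME": "second",
             "priceRuleAttributes": [{"id": "OTHER", "values": ["x"]},
                                     {"id": "PR_BRANCH_NAME", "values": ["south"]}]},
        ],
    }
}

grouped = group_by_array_one_index(sample)
print(grouped)
```

The point is that every field, including the nested branch name, lands in the object at the same ARRAY_ONE index.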
07-10-2024
12:25 PM
It's going to be hard to pinpoint what is going on without seeing the configuration or clear instructions to replicate the problem. When you say "query is not getting executed", do you mean the flowfile stays in the queue, or do you get an error and the original flowfile gets penalized and routed back? One possible reason it is not getting executed is that you are not getting any data from the C processor/group, so the fork's original never gets merged with anything. Make sure C is providing data to merge with in the final JoinEnrichment.
07-10-2024
07:58 AM
Hi, sorry to hear that it's not working properly yet. I understand that it's project specific, but can you replicate the issue on some dummy data and post the configuration? This would help us troubleshoot better and find out exactly what is going on.
07-10-2024
06:52 AM
Hi @zain99 , thanks for the information. It definitely sheds more light on why 3.12 is not working and how it needs to be resolved. However, this appears to have been addressed in the 2.0.0-M4 release. I have not tested it against 3.12, but I can see in the code that it no longer uses find_module and uses find_spec instead. Thanks
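For context, the finder-level find_module hooks were removed from importlib in Python 3.12, and find_spec is the replacement. A minimal sketch of the find_spec style of lookup follows; this is not the NiFi code itself, and `module_available` is just a helper name made up for the example.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module can be located, using find_spec.

    importlib.util.find_spec returns a ModuleSpec, or None when nothing is found.
    """
    return importlib.util.find_spec(name) is not None


print(module_available("json"))                     # standard library module
print(module_available("no_such_module_for_demo"))  # hypothetical missing module
```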
07-10-2024
06:39 AM
Hi @ageMLex , which version of NiFi are you using? If you are on NiFi 1.19 or higher, you can take advantage of JSLT transformation using the JSLTTransformJSON processor. JSLT is another JSON transformation language that is very powerful and works much like XQuery does for XML. Unlike Jolt, the result can be a non-JSON value such as a string, integer, boolean, etc. Assuming you have the following JSON:
[
{
"before": {
"old_key1": "old_value1",
"old_key2": "old_value2",
"old_key3": "old_value3",
"old_key4": "old_value4"
},
"after": {
"new_key1": "new_value1",
"new_key2": "new_value2",
"new_key3": "new_value3",
"new_key4": "new_value4"
},
"table": "table_name",
"op": "u",
"tm": 1720539441000000000
}
]
The JSLT script would look like this. It produces a string that you can later use in ExecuteSQL, PutSQL, etc.:
let table = .[0].table
let oldValues = [for (.[0].before) .key+"="+.value ]
let newValues = [for (.[0].after) .key+"="+.value ]
let sqlUpdate = "update "+$table +" set "+ join($newValues,",") + " where " + join($oldValues," and ")
$sqlUpdate
Note: when using the above script in the JSLT processor, try to keep it on one line. For some reason the processor gives a syntax error if the script is broken into several lines. I think this is a bug in the processor not handling newlines correctly.
This will produce the following output:
"update table_name set new_key1=new_value1,new_key2=new_value2,new_key3=new_value3,new_key4=new_value4 where old_key1=old_value1 and old_key2=old_value2 and old_key3=old_value3 and old_key4=old_value4"
You can use ExtractText with the following regex pattern to get rid of the double quotes at the beginning and the end:
^"(.*)"$
For more info about JSLT please check this site.
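If it helps to check the logic outside NiFi, the same string building can be sketched in Python. This is only an equivalent of what the script does, not something you would run in the processor; `build_update` is a name made up for this example. Values are concatenated as-is, exactly like the JSLT script, so no SQL quoting or escaping is applied.

```python
import json
import re


def build_update(records):
    """Build the update statement from the first record, like the JSLT script."""
    rec = records[0]
    old_values = [f"{k}={v}" for k, v in rec["before"].items()]
    new_values = [f"{k}={v}" for k, v in rec["after"].items()]
    return (
        f"update {rec['table']} set {','.join(new_values)}"
        f" where {' and '.join(old_values)}"
    )


records = [{
    "before": {f"old_key{i}": f"old_value{i}" for i in range(1, 5)},
    "after": {f"new_key{i}": f"new_value{i}" for i in range(1, 5)},
    "table": "table_name",
    "op": "u",
    "tm": 1720539441000000000,
}]

sql = build_update(records)

# The processor emits the result as a JSON string, i.e. wrapped in double quotes;
# the ExtractText pattern strips them again.
quoted = json.dumps(sql)
unquoted = re.match(r'^"(.*)"$', quoted).group(1)
print(unquoted)
```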
07-09-2024
03:33 PM
1 Kudo
Hi @Kondaji , I could be wrong, but I don't think you can use two enrichments from the same fork and expect both to merge back into the original. The JoinEnrichment will take the first enrichment and merge it back into the original based on the selected strategy. What you can do is use a separate fork/join for each enrichment: the first enriches A with B, and the second enriches the join of A&B with C. The flow below simulates this. I used GenerateFlowFile with a single CSV record that has just the id for group A, then I used two ReplaceText processors to simulate enrichment info with the same id from A. Here are the processor configurations: A (GenerateFlowFile): B (ReplaceText): JoinEnrichment - A&B: C (ReplaceText): JoinEnrichment - AB&C: Final Result: Hope that helps. If it does, please accept the solution. Thanks
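The chaining idea (enrich A with B first, then enrich that result with C) can be sketched in Python as follows. This only illustrates the data flow, not the ForkEnrichment/JoinEnrichment processors themselves; the `enrich` helper and the `b_info`/`c_info` field names are invented for the example.

```python
def enrich(original, enrichment, key="id"):
    """Merge each original record with the enrichment record that shares its key."""
    lookup = {row[key]: row for row in enrichment}
    return [{**row, **lookup.get(row[key], {})} for row in original]


# Hypothetical single-record datasets, all sharing the same id
a = [{"id": "1"}]
b = [{"id": "1", "b_info": "from B"}]
c = [{"id": "1", "c_info": "from C"}]

ab = enrich(a, b)    # first fork/join: A enriched with B
abc = enrich(ab, c)  # second fork/join: the A&B result enriched with C
print(abc)
```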
06-21-2024
10:57 AM
Hi, I know this is a long shot, but I'm going to ask anyway in the hope that someone can help, because I have been struggling with this for days. I'm trying to create a custom reporting task, which I managed to do after spending a lot of time figuring out the correct template, dependencies and conventions, since not much is available out there for this kind of customization. I managed to deploy and use it, and it's working as expected, except that I would like to run it on the primary node only. I know that by convention a reporting task should not depend on a particular node, but I'm just curious whether there is a way in the code to make it work that way. @bbende , @MattWho , @stevenmatison
Labels:
- Apache NiFi