Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2871 | 12-20-2024 05:49 AM |
| | 3221 | 12-19-2024 08:33 PM |
| | 2917 | 12-19-2024 06:48 AM |
| | 1960 | 12-17-2024 12:56 PM |
| | 2660 | 12-16-2024 04:38 AM |
09-10-2023
11:45 PM
@SAMSAL I experimented with the same template on NiFi 1.10.0 and found that FetchXMLFile has no issues with the Execution node set to PRIMARY. It seems the new requirement you mentioned was introduced only after 1.10.0.
09-03-2023
10:42 PM
1 Kudo
Update: This is working as I hoped. With this configuration, the FileProcessor group takes the next flowfile only after it has completely processed the flowfile already inside the group. Thank you @SAMSAL, @pvillard.
08-31-2023
09:12 AM
If you are getting multiple records in one JSON array, then you probably need to use the SplitJson processor to get each record individually, then extract the values you need from each record using EvaluateJsonPath, and finally do the PutSQL.
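A minimal sketch of that chain, assuming the array sits at the root of the content and the records have hypothetical id and name fields (adjust the paths, attribute names, and table name to your data):
SplitJson
  JsonPath Expression: $.*    (one flowfile per element of the root array)
EvaluateJsonPath
  Destination: flowfile-attribute
  id: $.id    (hypothetical field)
  name: $.name    (hypothetical field)
ReplaceText    (one common way to build the statement that PutSQL will execute)
  Replacement Value: INSERT INTO my_table (id, name) VALUES ('${id}', '${name}')
PutSQL
  JDBC Connection Pool: (your DBCPConnectionPool service)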
08-30-2023
01:54 PM
Each zip contains a JSON file along with a bunch of various file types. After unzipping, I do some processing on the other files, and then I need to match up the JSON with each of the other files contained in the original zip file to create an Elasticsearch document. I tried using MergeContent based on the fragment attribute, but that is not working 100%. I am out of ideas on how to get this to work.
08-28-2023
08:05 AM
@JohnnyRocks, as @steven-matison said, you should avoid chaining so many ReplaceText processors. I am not quite sure I understood your flow exactly, but something tells me that before reaching ReplaceText, something is not properly configured in your NiFi flow. First of all, when using the classic Java date format, MM will always translate into a two-digit month, meaning that months 1 to 9 will automatically get a leading zero; "dd" does the same for days. As I see in your post, your CSV reader is configured to read the data as MM/dd/yy, which should be fine, but somehow something is missing here: how do you reach the format dd/MM/yyyy? What I would personally try to do is convert all those date values into the same format. So instead of all those ReplaceText processors, I would insert an UpdateRecord processor, where I would define my Record Reader and my Record Writer with the desired schemas (make sure that your column is type int with logical type date). Next, in that processor, I would change the Replacement Value Strategy to "Record Path Value", press + to add a new property, call it "/Launch_Date" (pay attention to the leading slash), and assign it the value format( /Launch_Date, "dd/MM/yyyy", "Europe/Bucharest" ) (or any other timezone you require; if you need your data in UTC, just remove the comma and the timezone).
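A minimal sketch of that UpdateRecord configuration, assuming the reader and writer schemas already exist and the column is called Launch_Date:
UpdateRecord
  Record Reader: CSVReader    (schema with Launch_Date as int, logical type date)
  Record Writer: CSVRecordSetWriter    (or whichever writer your downstream flow needs)
  Replacement Value Strategy: Record Path Value
  /Launch_Date: format( /Launch_Date, "dd/MM/yyyy", "Europe/Bucharest" )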
08-27-2023
03:19 PM
How often are you looking to run GenerateTableFetch? If it is going to be a batch process, then you can set up a schedule on the top processor via Processor Config -> Scheduling tab and set the Run Schedule value. By default this value is 0 sec, which means the processor runs continuously.
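For example, a sketch of a batch schedule on that top processor (the one-hour interval is only an illustration):
Scheduling Strategy: Timer driven
Run Schedule: 1 hour    (the processor triggers once per hour instead of running continuously)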
08-22-2023
08:51 AM
1 Kudo
Hi @Anderosn, if I understood you correctly, you are trying to duplicate the flowfile so that it can be sent to different processors, is that right? If that is the case, then you can easily drag the same relationship multiple times from a given processor. Let's assume the upstream processor that produces the result flowfile sends it to the success relationship; you can then drag two success relationships to different downstream processors and process the same content differently in parallel (see the rough sketch below). If that helps, please accept the solution. Thanks.
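A rough sketch of that topology (processor names are placeholders):
UpstreamProcessor --success--> ProcessorA
UpstreamProcessor --success--> ProcessorB    (the same success relationship dragged a second time; each connection receives its own copy of the flowfile)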
08-18-2023
02:36 AM
1 Kudo
Hi, if you're using Apache NiFi and the token you're trying to capture with the InvokeHTTP processor is too large to be stored as an attribute, you can work around this limitation as follows: keep the token in the content of the FlowFile returned by the InvokeHTTP processor, and use a processor like ReplaceText to wrap the token in the header format you need. For instance, if you need the header to be Authorization: Bearer {token}, you can configure a ReplaceText processor to rewrite the content (i.e., the token) into this format.
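A minimal sketch of such a ReplaceText configuration, assuming the FlowFile content holds the bare token and nothing else:
ReplaceText
  Evaluation Mode: Entire text
  Replacement Strategy: Regex Replace
  Search Value: (?s)^(.*)$
  Replacement Value: Authorization: Bearer $1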
08-17-2023
06:29 AM
I think this is what I was trying to achieve: pause the execution for some time after processing 1000 flowfiles. Thank you.
08-16-2023
10:46 PM
2 Kudos
Hi Everyone,
I ran into a situation where I had to process the following XML input:
<workspace id="1234">
<documents totalCount="2">
<document id="5555" title="document1" revision="1">
<file name="file1.pdf" type="pdf"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value=""/>
</attributes>
</document>
<document id="6666" title="document2" revision="0">
<file name="file2.xlsx" type="xlsx"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value="v3"/>
</attributes>
</document>
</documents>
</workspace>
Here, each /workspace/documents/document record needs to be split and transformed into JSON, where:
Each document object needs to have the workspace ID it belongs to.
Each document attribute value will be assigned a key in the format of document_[AttributeName].
Each document file attribute value will be assigned a key in the format of file_[AttributeName].
Each document custom attribute value will be assigned a key in the format custom_[AttributeName]. Empty custom attributes should be ignored.
For example, the first document record will look like the following in JSON:
{
"workspace_id": "1234",
"document_id": "5555",
"document_title": "document1",
"document_revision": "1",
"file_name": "file1.pdf",
"file_type": "pdf",
"custom_custAtt1": "v1",
"custom_custAtt2": "v2"
}
Traditional Approach:
The first approach that came to mind is to use the traditional processors SplitXML and ConvertRecord, and finally some JoltTransformJson to flatten the JSON and re-map the values to the proper keys. However, the Jolt spec is not going to be that straightforward given the complexity of the XML, which makes the result of the ConvertRecord processor hard to predict.
EvaluateXQuery to the Rescue:
After researching XQuery syntax and the EvaluateXQuery processor, and after some testing, I found that this single processor can be used to split, convert, and transform the input XML into the required JSON format. XQuery in general supports multiple output types: XML, HTML, and Text. We can utilize the Text type (the Output: Method property in the EvaluateXQuery processor) to produce any format we like, which in our case is JSON. In XQuery syntax you can use XPath to access element and attribute values in an XML structure, and it offers many built-in functions such as "string-join" and "concat", besides variable declarations, all of which help with the required transformation. The beauty of EvaluateXQuery is that it returns new flowfile content (or attributes, depending on the Destination property setting) for each query result. For example, if we use a for loop to iterate through the different XML child elements and use a "return" statement in the iteration body, every return results in a newly created flowfile (or an nth attribute) on the Matched relationship. This serves as the needed split operation. Without further ado, here is how the EvaluateXQuery is configured in my case:
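The processor properties, roughly (Destination set to content so that each query result becomes a new flowfile):
EvaluateXQuery
  Destination: flowfile-content
  Output: Method: text
  XQueryValue: (dynamic property, see below)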
The XQueryValue is a dynamic property which contains the following XQuery:
let $workspaceId := workspace/@id
for $doc in /workspace/documents/document
let $workspace_id := concat('"workspace_id":"',$workspaceId,'"')
let $doc_attr := string-join( for $x in $doc/@* where $x!=''
return
concat(',"document_',name($x),'":"',data($x),'"')
,'')
let $doc_file := string-join( for $x in $doc/file/@*
return
concat(',"file_',name($x),'":"',data($x),'"')
,'')
let $doc_custom:= string-join( for $x in $doc/attributes/attribute where $x/@value!=''
return
concat(',"custom_', $x/@name,'":"',$x/@value,'"')
,'')
let $doc_json:= string-join(
("{",$workspace_id,$doc_attr,$doc_file,$doc_custom,"}")
,'')
return $doc_json
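For reference, the second document in the sample input comes out of the Matched relationship as the following JSON (pretty-printed here for readability; the query itself emits it as a single line):
{
"workspace_id": "1234",
"document_id": "6666",
"document_title": "document2",
"document_revision": "0",
"file_name": "file2.xlsx",
"file_type": "xlsx",
"custom_custAtt1": "v1",
"custom_custAtt2": "v2",
"custom_custAtt3": "v3"
}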
Performance Analysis:
Even though the EvaluateXQuery in this case replaced the SplitXML, ConvertRecord, and Jolt processors, I still had to make sure that it was performant in comparison. To do that, I created the following two scenarios and tested them against the same input (all executed on the primary node):
SplitXML -> ConvertRecord -> FlattenJson -> LogMessage
EvaluateXQuery -> LogMessage
The first scenario is, of course, much simplified, just to see whether EvaluateXQuery still performs better.
After doing stress testing (back pressure was applied), the average Lineage Duration at the end of each flow came out as follows:
First Scenario (split, convert, and transform):
Second Scenario (EvaluateXQuery):
We can see that the Max/Mean values for the EvaluateXQuery flow are much better than those of the traditional split, convert, and transform flow: 00:00:05.811 / 00:00:00.066 vs. 00:00:30.258 / 00:00:00.360.
Conclusion:
Using EvaluateXQuery simplified our data flow and gained us better performance compared with using the split, convert, and transform processors. However, one needs to be careful with such a processor, as it does not guarantee valid JSON output, since the output method is set to text. Understanding the schema of your input XML and using XQuery functions like replace to handle any invalid characters can help you avoid failures in the downstream processors.
Thanks for reading.
Please let me know if you have any comments or feedback.
Samer Alisaleh