Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3585 | 12-20-2024 05:49 AM |
| | 3831 | 12-19-2024 08:33 PM |
| | 3631 | 12-19-2024 06:48 AM |
| | 2367 | 12-17-2024 12:56 PM |
| | 3115 | 12-16-2024 04:38 AM |
08-27-2023
03:19 PM
How often are you looking to run the GenerateTableFetch? If it's going to be a batch process, you can set up a schedule on the top processor via processor config -> Scheduling tab and set the Run Schedule value. By default this value is set to 0 sec, which means the processor runs continuously.
08-27-2023
03:14 PM
Hi @JohnnyRocks , It's hard for me to suggest a solution without seeing what the input looks like. However, based on the schema you provided, and assuming you are dealing with date-only values, I tested the following config in the ReplaceText processor and it appears to work in all cases: Search Value: (\b\d/) Replacement Value: 0$1 If that doesn't help, can you provide sample input and the different cases for the search and replace values in the three ReplaceText processors? If that helps please accept solution. Thanks
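As a quick sanity check outside NiFi, here is a hedged Python sketch showing what that ReplaceText regex does: the pattern (\b\d/) with replacement 0$1 zero-pads single-digit day/month parts in a date string (the sample input is made up for illustration).

```python
# Sketch only: demonstrating the ReplaceText regex in plain Python.
# (\b\d/) matches a single digit at a word boundary followed by "/",
# and 0\1 prefixes it with a zero.
import re

padded = re.sub(r"(\b\d/)", r"0\1", "1/2/2023")
print(padded)  # -> 01/02/2023
```

Double-digit parts are left untouched, since `\b\d/` requires exactly one digit before the slash.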
08-26-2023
06:54 AM
Hi @TonyPath , I noticed that the added dynamic property initial.maxvalue.<max_column_name> accepts expression language (EL). Since GenerateTableFetch can take an incoming relationship, maybe you can use ExecuteSQLRecord to fetch the initial max value from SQL in JSON format, then store it in a flowfile attribute using the EvaluateJsonPath processor, which you can then reference to set the initial.maxvalue.<max_column_name> property value as follows: ExecuteSQLRecord Config: EvaluateJsonPath Config: GenerateTableFetch Config: If that helps please accept solution. Thanks
08-22-2023
08:51 AM
1 Kudo
Hi @Anderosn , If I understood you correctly, you are trying to duplicate the flowfile so that it can be sent to different processors, is that right? If so, you can easily drag the same relationship multiple times from a given processor. For example, if the upstream processor that produces the result flowfile sends it to the success relationship, you can drag two success relationships to different downstream processors and process the same content differently in parallel. If that helps please accept solution. Thanks
08-17-2023
11:22 AM
1 Kudo
How are you authenticating? Are you using an OAuth2 token, for example? If so, see if you can take advantage of the "OAuth2 Access Token Provider" property in the InvokeHTTP processor, where you set and configure your token provider as a service: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-oauth2-provider-nar/1.22.0/org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider/index.html If that helps please accept solution. Thanks
08-17-2023
05:37 AM
2 Kudos
Hi @scoutjohn , Have you looked into the ControlRate processor? https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.ControlRate/ In your case, the processor configuration would be something like this: If that helps please accept solution. Thanks
08-16-2023
10:46 PM
2 Kudos
Hi Everyone,
I ran into a situation where I have the following XML input:
<workspace id="1234">
<documents totalCount="2">
<document id="5555" title="document1" revision="1">
<file name="file1.pdf" type="pdf"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value=""/>
</attributes>
</document>
<document id="6666" title="document2" revision="0">
<file name="file2.xlsx" type="xlsx"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value="v3"/>
</attributes>
</document>
</documents>
</workspace>
Here, each /workspace/documents/document record needs to be split and transformed into JSON, where:
Each document object needs to have the workspace ID it belongs to.
Each document attribute value will be assigned a key in the format of document_[AttributeName].
Each document file attribute value will be assigned a key in the format of file_[AttributeName].
Each document custom attribute value will be assigned a key in the format custom_[AttributeName]. Empty custom attributes should be ignored.
For example, the first document record will look like the following in JSON:
{
"workspace_id": "1234",
"document_id": "5555",
"document_title": "document1",
"document_revision": "1",
"file_name": "file1.pdf",
"file_type": "pdf",
"custom_custAtt1": "v1",
"custom_custAtt2": "v2"
}
Traditional Approach:
The first approach that came to mind was to use the traditional processors SplitXml and ConvertRecord, and finally some JoltTransformJSON to flatten the JSON and re-map values to the proper keys. However, the Jolt is not going to be straightforward given the complexity of the XML, which makes the result of the ConvertRecord processor hard to predict.
EvaluateXQuery to the Rescue:
After researching XQuery syntax and the EvaluateXQuery processor, and some testing, I found that it can be used as a single processor to split, convert, and transform the input XML into the required JSON format. XQuery in general supports multiple output data types: XML, HTML, and Text. We can utilize the Text data type (the Output: Method property in the EvaluateXQuery processor) to produce any format we like, which is JSON in our case. In XQuery syntax, you can use XPath to access element/attribute values in an XML structure, and it has many built-in functions like "string-join" and "concat", besides variable declaration, all of which help in the required transformation. The beauty of EvaluateXQuery is that it will return new flowfile content (or attributes, depending on the Destination property setting) for each query result. For example, if we use a for loop to iterate through the different XML child elements and use a "return" statement in the iteration body, every return will result in a newly created flowfile (or nth attribute) in the matched relationship. This serves as the needed split operation. Without further ado, here is how the EvaluateXQuery is configured in my case:
The XQueryValue is a dynamic property which contains the following XQuery:
let $workspaceId := workspace/@id
for $doc in /workspace/documents/document
let $workspace_id := concat('"workspace_id":"', $workspaceId, '"')
let $doc_attr := string-join(
    for $x in $doc/@* where $x != ''
    return concat(',"document_', name($x), '":"', data($x), '"')
    , '')
let $doc_file := string-join(
    for $x in $doc/file/@*
    return concat(',"file_', name($x), '":"', data($x), '"')
    , '')
let $doc_custom := string-join(
    for $x in $doc/attributes/attribute where $x/@value != ''
    return concat(',"custom_', $x/@name, '":"', $x/@value, '"')
    , '')
let $doc_json := string-join(
    ("{", $workspace_id, $doc_attr, $doc_file, $doc_custom, "}")
    , '')
return $doc_json
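As a sanity check outside NiFi, the same mapping rules can be sketched in plain Python. This is an assumed-equivalent illustration, not part of the flow, using the sample input from above:

```python
# Sketch: replicate the XQuery mapping (workspace_id, document_*, file_*,
# custom_* with empty custom attributes skipped) against the sample XML.
import json
import xml.etree.ElementTree as ET

xml_input = """<workspace id="1234">
  <documents totalCount="2">
    <document id="5555" title="document1" revision="1">
      <file name="file1.pdf" type="pdf"/>
      <attributes>
        <attribute name="custAtt1" value="v1"/>
        <attribute name="custAtt2" value="v2"/>
        <attribute name="custAtt3" value=""/>
      </attributes>
    </document>
  </documents>
</workspace>"""

root = ET.fromstring(xml_input)
records = []
for doc in root.findall("./documents/document"):
    record = {"workspace_id": root.get("id")}
    # document_* keys come from the <document> element's own attributes
    for name, value in doc.attrib.items():
        record[f"document_{name}"] = value
    # file_* keys come from the <file> child's attributes
    for name, value in doc.find("file").attrib.items():
        record[f"file_{name}"] = value
    # custom_* keys come from non-empty <attribute> entries only
    for attr in doc.findall("./attributes/attribute"):
        if attr.get("value"):
            record[f"custom_{attr.get('name')}"] = attr.get("value")
    records.append(record)

print(json.dumps(records[0], indent=2))
```

Each element of `records` corresponds to one flowfile produced by the XQuery's return statement.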
Performance Analysis:
Although the EvaluateXQuery in this case replaced the split, ConvertRecord, and Jolt steps, I had to make sure that it was still performant in comparison. To do that, I created the following scenarios to test against the same input (all executed on the primary node):
SplitXML -> ConvertRecord -> FlattenJson -> LogMessage
EvaluateXQuery -> LogMessage
The first scenario, of course, is simplified, just to see whether EvaluateXQuery still performs better.
After stress testing (back pressure was applied), the average Lineage Duration at the end of each flow came out as follows:
First Scenario (split, convert, and transform):
Second Scenario (EvaluateXQuery):
We can see that the Max/Mean values for the EvaluateXQuery are much better than for the traditional split, convert, and transform processors: 00:00:05.811 / 00:00:00.066 vs. 00:00:30.258 / 00:00:00.360.
Conclusion:
Using EvaluateXQuery simplified our data flow and gained us better performance compared with using the split, ConvertRecord, and transform processors. However, one needs to be careful when using such a processor, as it doesn't guarantee valid JSON output, since the output method is set to text. Understanding the schema of your input XML and using XQuery functions like replace to accommodate any invalid characters can help you avoid failures in the downstream processors.
Thanks for reading.
Please let me know if you have any comments or feedback.
Samer Alisaleh
08-02-2023
09:35 AM
1 Kudo
Hi @Anderosn , The ConvertJSONToSQL processor, according to the documentation, will map only fields with a simple type: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.ConvertJSONToSQL/index.html "...The incoming FlowFile is expected to be "flat" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type..." If you don't need to keep the array fields as arrays, you can use JoltTransformJSON to convert them to concatenated strings. The Jolt spec to do this can be as follows:
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "CARDNUMBER": "=join(',',@(1,CARDNUMBER))",
      "EXPIRYDATE": "=join(',',@(1,EXPIRYDATE))",
      "EMAIL": "=join(',',@(1,EMAIL))"
    }
  }
]
This will produce the following output:
{
  "REQUESTID" : "379",
  "REQUESTTYPE" : "V",
  "REQUESTCONTEXT" : "B2B",
  "CARDNUMBER" : "5537290000000511,5537290000000522",
  "EXPIRYDATE" : "08/20,09/21",
  "EMAIL" : "John.Jones123@abcmail.com,Jason.456@gmail.com",
  "PHONENUMBER" : "11234565555"
}
Which can then be converted to SQL using ConvertJSONToSQL, with all fields populated. Hope that works. If that helps please accept solution. Thanks
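As a plain-Python illustration of what the Jolt =join function does here (a sketch using the sample values from the output above, not part of the flow):

```python
# Sketch: flatten array-valued fields into comma-separated strings,
# mirroring the modify-overwrite-beta =join spec.
record = {
    "REQUESTID": "379",
    "CARDNUMBER": ["5537290000000511", "5537290000000522"],
    "EXPIRYDATE": ["08/20", "09/21"],
    "EMAIL": ["John.Jones123@abcmail.com", "Jason.456@gmail.com"],
}
for key in ("CARDNUMBER", "EXPIRYDATE", "EMAIL"):
    record[key] = ",".join(record[key])

print(record["CARDNUMBER"])  # -> 5537290000000511,5537290000000522
```

After this flattening, every field is a simple type, which is what ConvertJSONToSQL expects.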
08-01-2023
06:54 AM
1 Kudo
Hi @MWM , What you are describing is a classic data enrichment pattern that can be achieved using the ForkEnrichment & JoinEnrichment processors. For more information, please refer to: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html Based on your scenario, the "SQL" strategy of the JoinEnrichment will work best for you, since you can select the main fields (system, name, surname, phone, mail) from the original flowfile data and select profession and department from the enrichment result: SELECT o.system, o.name, e.surname, o.phone, o.mail, e.profession, e.department FROM original o LEFT OUTER JOIN enrichment e ON o.name = e.name Since you are splitting the CSV and enriching per record, you can just join by name. If you have an API where you can get a collection of user information, then you don't have to split, and you can do the enrichment on multiple records from the CSV vs. the returned records from the API JSON output. However, be aware that if you have a large data set, this strategy "... can lead to heap exhaustion and cause stability problems or OutOfMemoryErrors to occur". Please review the link above to see how this can be mitigated. If you find this helpful please accept solution. Thanks
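To illustrate what the "SQL" strategy computes, here is a minimal sqlite3 sketch of the same LEFT OUTER JOIN. The sample rows and values are made up for illustration; table and column names follow the query above:

```python
# Sketch: the JoinEnrichment "SQL" strategy behaves like a SQL join over
# the original and enrichment record sets, simulated here with sqlite3.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE original (system TEXT, name TEXT, phone TEXT, mail TEXT)")
conn.execute("CREATE TABLE enrichment (name TEXT, surname TEXT, profession TEXT, department TEXT)")
conn.execute("INSERT INTO original VALUES ('crm', 'alice', '555', 'a@x.com')")
conn.execute("INSERT INTO enrichment VALUES ('alice', 'smith', 'engineer', 'it')")

row = conn.execute("""
    SELECT o.system, o.name, e.surname, o.phone, o.mail, e.profession, e.department
    FROM original o LEFT OUTER JOIN enrichment e ON o.name = e.name
""").fetchone()
print(row)
```

The LEFT OUTER JOIN keeps original records even when no enrichment match exists, in which case the enrichment columns come back as NULL.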
07-31-2023
01:42 PM
Hi @nict , I'm not sure how you want to add your headings (firstDetails, secondDetails, etc.), especially since secondDetails is not found anywhere in your Jolt spec. If you are just looking to associate different records (test1, test2, test3, etc.) with different headers based on the value of "Name", where you have to list each Name value explicitly, then something like this will work:
[
  {
    "operation": "shift",
    "spec": {
      "dataPackage": {
        "datatest": {
          "field": {
            "*": {
              "Name": {
                "test1": {
                  "@(2,content)": "firstDetails.test1"
                },
                "test2": {
                  "@(2,content)": "firstDetails.test2"
                },
                "test3": {
                  "@(2,content)": "secondDetails.test3"
                }
              }
            }
          }
        }
      }
    }
  }
]
You can add more values/headers as needed. However, if you are looking to create headers dynamically based on the Name values, then you have to provide more info on how you would associate records with headers. If that helps please accept solution. Thanks
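For clarity, this hedged Python sketch mimics what the shift spec does: route each field's "content" under a heading chosen by its "Name" value. The Name-to-heading mapping is taken from the spec; the field contents are made-up samples:

```python
# Sketch: group records under headings based on an explicit Name mapping,
# mirroring the Jolt shift spec above.
heading_for = {"test1": "firstDetails", "test2": "firstDetails", "test3": "secondDetails"}

fields = [
    {"Name": "test1", "content": "c1"},
    {"Name": "test3", "content": "c3"},
]
out = {}
for f in fields:
    heading = heading_for.get(f["Name"])
    if heading:  # Names not listed in the mapping are dropped, as in the spec
        out.setdefault(heading, {})[f["Name"]] = f["content"]

print(out)  # -> {'firstDetails': {'test1': 'c1'}, 'secondDetails': {'test3': 'c3'}}
```

This makes the limitation explicit: every Name value must be enumerated up front, which is why dynamic headings need a different approach.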