Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 1999 | 12-20-2024 05:49 AM
 | 2268 | 12-19-2024 08:33 PM
 | 2045 | 12-19-2024 06:48 AM
 | 1350 | 12-17-2024 12:56 PM
 | 1937 | 12-16-2024 04:38 AM
08-16-2023
10:46 PM
2 Kudos
Hi Everyone,
I ran into a situation where I had to process the following XML input:
<workspace id="1234">
  <documents totalCount="2">
    <document id="5555" title="document1" revision="1">
      <file name="file1.pdf" type="pdf"/>
      <attributes>
        <attribute name="custAtt1" value="v1"/>
        <attribute name="custAtt2" value="v2"/>
        <attribute name="custAtt3" value=""/>
      </attributes>
    </document>
    <document id="6666" title="document2" revision="0">
      <file name="file2.xlsx" type="xlsx"/>
      <attributes>
        <attribute name="custAtt1" value="v1"/>
        <attribute name="custAtt2" value="v2"/>
        <attribute name="custAtt3" value="v3"/>
      </attributes>
    </document>
  </documents>
</workspace>
Here, each /workspace/documents/document record needs to be split out and transformed into JSON, where:
Each document object needs to have the workspace ID it belongs to.
Each document attribute value will be assigned a key in the format of document_[AttributeName].
Each document file attribute value will be assigned a key in the format of file_[AttributeName].
Each document custom attribute value will be assigned a key in the format custom_[AttributeName]. Empty custom attributes should be ignored.
For example, the first document record will look like the following in JSON:
{
  "workspace_id": "1234",
  "document_id": "5555",
  "document_title": "document1",
  "document_revision": "1",
  "file_name": "file1.pdf",
  "file_type": "pdf",
  "custom_custAtt1": "v1",
  "custom_custAtt2": "v2"
}
Traditional Approach:
The first approach that came to mind is to use the traditional processors: SplitXML, then ConvertRecord, and finally a JoltTransformJSON to flatten the JSON and re-map values to the proper keys. However, the Jolt spec is not going to be that straightforward given the complexity of the XML, which also makes the result of the ConvertRecord processor hard to predict.
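To illustrate, a single split document converted with, say, an XMLReader and a JsonRecordSetWriter (schema inferred) might come out roughly like the following - the exact shape depends on the reader configuration, which is exactly the problem:
{
  "id": "5555",
  "title": "document1",
  "revision": "1",
  "file": {
    "name": "file1.pdf",
    "type": "pdf"
  },
  "attributes": {
    "attribute": [
      { "name": "custAtt1", "value": "v1" },
      { "name": "custAtt2", "value": "v2" },
      { "name": "custAtt3", "value": "" }
    ]
  }
}
The Jolt spec would then still have to flatten the nested file and attributes structures, prefix the keys, drop empty custom attributes, and (depending on the split depth) carry along the workspace id that is no longer part of the split content.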
EvaluateXQuery to the Rescue:
After researching XQuery syntax, the EvaluateXQuery processor, and some testing, I found that it can be used as a single processor to split, convert, and transform the input XML into the required JSON format. XQuery supports multiple output methods: XML, HTML, and Text. We can use the Text method (the Output: Method property in the EvaluateXQuery processor) to produce any format we like, which is JSON in our case. In XQuery you can use XPath to access element and attribute values in an XML structure, and it has many built-in functions such as string-join and concat, as well as variable declarations, all of which help with the required transformation.
The beauty of EvaluateXQuery is that it returns new flowfile content (or attributes, depending on the Destination property setting) for each query result. For example, if we use a for loop to iterate through the XML child elements and use a return statement in the iteration body, every return produces a newly created flowfile (or attribute) routed to the Matched relationship. This serves as the needed split operation. Without further ado, here is how the EvaluateXQuery is configured in my case:
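At a high level, the settings that matter are the following (a summary of what is described above, not a reproduction of the exact configuration):
Destination = flowfile-content (each query result becomes a new flowfile)
Output: Method = text (the result is written out as-is, i.e., our hand-built JSON)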
The XQueryValue is a dynamic property which contains the following XQuery:
(: capture the workspace id once so it can be added to every document :)
let $workspaceId := workspace/@id
for $doc in /workspace/documents/document
  (: workspace_id key/value pair :)
  let $workspace_id := concat('"workspace_id":"', $workspaceId, '"')
  (: document attributes -> document_* keys :)
  let $doc_attr := string-join(
      for $x in $doc/@* where $x != ''
      return concat(',"document_', name($x), '":"', data($x), '"')
    , '')
  (: file attributes -> file_* keys :)
  let $doc_file := string-join(
      for $x in $doc/file/@*
      return concat(',"file_', name($x), '":"', data($x), '"')
    , '')
  (: custom attributes -> custom_* keys, skipping empty values :)
  let $doc_custom := string-join(
      for $x in $doc/attributes/attribute where $x/@value != ''
      return concat(',"custom_', $x/@name, '":"', $x/@value, '"')
    , '')
  (: assemble one JSON object per document; each return emits one flowfile :)
  let $doc_json := string-join(
      ("{", $workspace_id, $doc_attr, $doc_file, $doc_custom, "}")
    , '')
return $doc_json
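Given the sample input above, the second document record would come out as a single-line string along these lines (attribute order may vary slightly; note that custAtt3 is kept this time because its value is not empty):
{"workspace_id":"1234","document_id":"6666","document_title":"document2","document_revision":"0","file_name":"file2.xlsx","file_type":"xlsx","custom_custAtt1":"v1","custom_custAtt2":"v2","custom_custAtt3":"v3"}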
Performance Analysis:
Even though EvaluateXQuery in this case replaced the split, convert, and transform steps, I had to make sure it was still performant in comparison. To do that, I created the following scenarios to test against the same input (all executed on the primary node):
SplitXML -> ConvertRecord -> FlattenJson -> LogMessage
EvaluateXQuery -> LogMessage
The first scenario is, of course, simplified (FlattenJson stands in for the full Jolt re-mapping); the point is to see whether EvaluateXQuery still performs better even against this lighter flow.
After stress testing (back pressure was applied), the average Lineage Duration at the end of each flow came out as follows:
First Scenario (split, convert, and transform): Max 00:00:30.258 / Mean 00:00:00.360
Second Scenario (EvaluateXQuery): Max 00:00:05.811 / Mean 00:00:00.066
We can see that both the Max and Mean values for EvaluateXQuery are much better than those of the traditional split, convert, and transform processors.
Conclusion:
Using EvaluateXQuery simplified our data flow and gave us better performance than the split, ConvertRecord, and transform processors. However, one needs to be careful when using this processor, as it does not guarantee valid JSON output, since the output method is set to text. Understanding the schema of your input XML and using XQuery functions like replace to handle any invalid characters can help you avoid failures in the downstream processors.
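For example, a minimal sketch (assuming attribute values could contain double quotes, which would otherwise break the generated JSON) would be to wrap the value access in a replace call, e.g. for the document attributes:
(: hypothetical variant of the concat used earlier: escape embedded double quotes :)
concat(',"document_', name($x), '":"', replace(data($x), '"', '\\"'), '"')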
Thanks for reading.
Please let me know if you have any comments or feedback.
Samer Alisaleh
08-16-2023
08:59 AM
Can you elaborate more on what you are trying to split? The transactions in the first and second split you specified seem to be identical.
08-16-2023
06:30 AM
Hi, You can try the following spec:
[
{
"operation": "shift",
"spec": {
"*": "&",
"responseData": {
"responseList": {
"*": {
"individualInfo": {
"#${UUID()}": "responseData.responseList.[&2].individualInfo.activityUID",
"firstName": "responseData.responseList.[&2].individualInfo.&",
"middleName": "responseData.responseList.[&2].individualInfo.&",
"lastName": "responseData.responseList.[&2].individualInfo.&",
"dateOfBirth": "responseData.responseList.[&2].individualInfo.&"
}
}
}
}
}
}
]
If that helps please accept solution. Thanks
08-07-2023
08:30 AM
Are you referring to validation failures? If so, the answer is: you can't. If you get failures, you'll need to queue them up and validate them manually to see why they failed. I use xmllint. For example:
xmllint --noout --schema my_data_schema.xsd my_data.xml
08-02-2023
09:35 AM
1 Kudo
Hi @Anderosn , According to the documentation, ConvertJSONToSQL only maps fields with a simple type: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.ConvertJSONToSQL/index.html
"...The incoming FlowFile is expected to be "flat" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type..."
If you don't care about storing the array fields as-is, you can use JoltTransformJSON to convert the array fields into concatenated strings. The Jolt spec to do this can be as follows:
[
{
"operation": "modify-overwrite-beta",
"spec": {
"CARDNUMBER": "=join(',',@(1,CARDNUMBER))",
"EXPIRYDATE": "=join(',',@(1,EXPIRYDATE))",
"EMAIL": "=join(',',@(1,EMAIL))"
}
}
]
This will produce the following output:
{
"REQUESTID" : "379",
"REQUESTTYPE" : "V",
"REQUESTCONTEXT" : "B2B",
"CARDNUMBER" : "5537290000000511,5537290000000522",
"EXPIRYDATE" : "08/20,09/21",
"EMAIL" : "John.Jones123@abcmail.com,Jason.456@gmail.com",
"PHONENUMBER" : "11234565555"
}
This can then be converted to SQL using ConvertJSONToSQL, and all fields will be populated. Hope that works. If that helps please accept solution. Thanks
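For reference, the input that produces this output presumably carries the three fields as JSON arrays, roughly like the following (a hypothetical reconstruction from the output above, not the actual flowfile):
{
  "REQUESTID" : "379",
  "REQUESTTYPE" : "V",
  "REQUESTCONTEXT" : "B2B",
  "CARDNUMBER" : [ "5537290000000511", "5537290000000522" ],
  "EXPIRYDATE" : [ "08/20", "09/21" ],
  "EMAIL" : [ "John.Jones123@abcmail.com", "Jason.456@gmail.com" ],
  "PHONENUMBER" : "11234565555"
}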
08-01-2023
11:35 PM
Good morning @cotopaul @SAMSAL First of all, thank you very much for your time. OK, so I see that I misunderstood how QueryDatabaseTable is "triggered". @SAMSAL I knew about those configurations, but as I said, I thought this processor only ran the query when a new record was inserted into the database, using the maximum column value as a reference. The PostgreSQL log makes sense now... the queries were executed every second since my Run Schedule is at the default (0). What I use MiNiFi for is to load sales from a supermarket and then send them via API (and other transformations), so I need this to happen in real time, as soon as the client has paid. I guess I have to set the Run Schedule to every few seconds. @cotopaul I will take a look at my DBCPConnectionPool, this info is helpful. Thank you all.
08-01-2023
06:54 AM
1 Kudo
Hi @MWM , What you are describing is a classic data-enrichment pattern that can be achieved using the ForkEnrichment & JoinEnrichment processors. For more information, please refer to: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
Based on your scenario, the "SQL" strategy of the JoinEnrichment will work best for you, since you can select the main fields (system, name, surname, phone, mail) from the original flowfile data and select profession and department from the enrichment result:
SELECT o.system, o.name, o.surname, o.phone, o.mail, e.profession, e.department
FROM original o
LEFT OUTER JOIN enrichment e ON o.name = e.name
Since you are splitting the CSV and enriching per record, you can simply join by name. If you have an API where you can get a collection of user information, then you don't have to split, and you can do the enrichment on multiple records from the CSV against the returned records from the API JSON output. However, be aware that with a large data set this strategy "... can lead to heap exhaustion and cause stability problems or OutOfMemoryErrors to occur". Please review the link above to see how this can be mitigated. If you find this helpful please accept solution. Thanks
07-31-2023
01:42 PM
Hi @nict , I'm not sure how you want to add your headings (firstDetails, secondDetails, etc.), especially since secondDetails is not found anywhere in your Jolt spec. If you are just looking to associate different records (test1, test2, test3, etc.) with different headers based on the value of "Name", where you have to list each Name value explicitly, then something like this will work:
[
{
"operation": "shift",
"spec": {
"dataPackage": {
"datatest": {
"field": {
"*": {
"Name": {
"test1": {
"@(2,content)": "firstDetails.test1"
},
"test2": {
"@(2,content)": "firstDetails.test2"
},
"test3": {
"@(2,content)": "secondDetails.test3"
}
}
}
}
}
}
}
}
]
You can add more values/headers as needed. However, if you are looking to create headers dynamically based on the Name values, then you have to provide more info on how you would associate records with headers. If that helps please accept solution. Thanks
07-30-2023
04:45 PM
I see, it works! Thanks for your information.
07-28-2023
05:28 AM
1 Kudo
Hi @Anderosn , A similar question has been asked before; see if the following posts can help:
https://community.cloudera.com/t5/Support-Questions/How-to-load-json-record-to-postgres-as-json-datatype/m-p/345779#M234644
https://community.cloudera.com/t5/Support-Questions/Apache-Nifi-Insert-json-data-into-table-as-single-column/m-p/363974
https://community.cloudera.com/t5/Support-Questions/nifi-Writing-Inserting-entire-flowfile-content/td-p/205367
If any of the links works for your case please accept solution. Thanks