Member since: 07-29-2020
Posts: 574
Kudos Received: 320
Solutions: 175
08-30-2023
01:14 PM
Hi @code_mnkey , It's not recommended to store potentially large values in attributes: attributes have a size limit and are kept in JVM memory (heap), which can lead to an out-of-memory exception if you are dealing with a large number of zip files containing a lot of file content. May I ask why you are trying to store the file list in an attribute?
08-29-2023
07:20 PM
If you are able to store the display name and mail values in flowfile attributes, then the easiest way is to use the PutSQL processor and build your insert statement from those values and any others you want to ingest into the target table. In PutSQL you specify the insert statement in the SQL Statement property as follows: insert into myTable(displayNameCol, mailCol, otherCol...) values ('${displayNameAttribute}','${mailAttribute}', 'otherValue',...). The AttributesToJSON processor should work as well: specify which attributes you want to insert into the DB so they are written to the flowfile content as JSON, then use PutDatabaseRecord with a JsonTreeReader and set the Statement Type property to INSERT. The only caveat with PutDatabaseRecord is that the attribute names should match the table column names (see the sketch below). If that helps please accept solution. Thanks
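For illustration, here is a minimal sketch of the second option, assuming hypothetical column names displayNameCol and mailCol that also happen to be the attribute names. With those attributes listed in the AttributesToJSON Attributes List property and its Destination set to flowfile-content, the resulting flowfile would look like the following, which PutDatabaseRecord can insert directly because the keys match the column names:
{
  "displayNameCol": "Jane Doe",
  "mailCol": "jane.doe@example.com"
}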
08-29-2023
09:19 AM
Hi @Kiranq , If you are getting the output from the ExecuteStreamCommand as JSON, I don't understand why you are using the ConvertAvroToJSON processor. In PutDatabaseRecord you can set the Record Reader property to a JsonTreeReader and consume the JSON directly. If you can shed more light on what the ExecuteStreamCommand output looks like, maybe we can help you better. If that helps please accept solution. Thanks
08-27-2023
03:19 PM
How often are you looking to run the GenerateTableFetch? If it's going to be a batch process, then you can set up a schedule on the top processor via processor config -> Scheduling tab and set the Run Schedule value. By default this value is set to 0 sec, which means the processor runs continuously.
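For example, to run it as a daily batch (the interval is only an illustration; choose whatever fits your process), the Scheduling tab would be set to something like:
Scheduling Strategy: Timer driven
Run Schedule: 24 hours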
08-27-2023
03:14 PM
Hi @JohnnyRocks , It's hard for me to suggest a solution without seeing what the input looks like. However, based on the schema you provided, and assuming that you are dealing with date-only values, I tested the following config in the ReplaceText processor and it appears to work in all cases: Search Value: (\b\d/) Replacement Value: 0$1 (see the example below). If that doesn't help, can you provide a sample input and the different cases for the search and replace values in the three ReplaceText processors? If that helps please accept solution. Thanks
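To illustrate with hypothetical month/day/year values (the dates themselves are made up), the regex pads every single-digit month or day with a leading zero:
3/7/2023   ->  03/07/2023
12/7/2023  ->  12/07/2023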
08-26-2023
06:54 AM
Hi @TonyPath , I noticed that the added dynamic property initial.maxvalue.<max_column_name> accepts expression language (EL). Since GenerateTableFetch can take an incoming relationship, maybe you can use ExecuteSQLRecord to fetch the initial max value from SQL as JSON, store it in a flowfile attribute using the EvaluateJsonPath processor, and then reference that attribute to set the initial.maxvalue.<max_column_name> property value, as in the configurations below:
ExecuteSQLRecord config:
EvaluateJsonPath config:
GenerateTableFetch config:
If that helps please accept solution. Thanks
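In case the screenshots don't come through, here is a rough sketch of what those three configurations might look like; the table name (my_table), max-value column (my_max_column), and attribute name (max.value) are placeholders, and the JSON path may need adjusting to match the field name your record writer actually produces:
ExecuteSQLRecord
  SQL select query: SELECT MAX(my_max_column) AS max_val FROM my_table
  Record Writer: JsonRecordSetWriter
EvaluateJsonPath
  Destination: flowfile-attribute
  max.value (dynamic property): $[0].max_val
GenerateTableFetch
  Table Name: my_table
  Maximum-value Columns: my_max_column
  initial.maxvalue.my_max_column (dynamic property): ${max.value}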
08-22-2023
08:51 AM
1 Kudo
Hi @Anderosn , If I understood you correctly, you are trying to duplicate the flowfile so that it can be sent to different processors, is that right? If that is the case, then you can simply drag the same relationship multiple times from a given processor. For example, if the upstream processor that produces the result flowfile sends it to the success relationship, you can drag two success relationships to different downstream processors and process the same content differently in parallel. If that helps please accept solution. Thanks
08-17-2023
11:22 AM
1 Kudo
How are you authenticating? Are you using an OAuth2 token, for example? If so, see if you can take advantage of the "OAuth2 Access Token Provider" property in the InvokeHTTP processors, where you configure your token provider as a controller service (a rough sketch follows below): https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-oauth2-provider-nar/1.22.0/org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider/index.html If that helps please accept solution. Thanks
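As a rough sketch only (treat the exact property names as assumptions and verify them against the linked documentation for your NiFi version), a StandardOauth2AccessTokenProvider controller service using a client-credentials grant might be configured like:
Grant Type: Client Credentials
Authorization Server URL: https://idp.example.com/oauth2/token   (placeholder URL)
Client ID: <your client id>
Client secret: <your client secret>
Each InvokeHTTP processor would then point its OAuth2 access token provider property at this service instead of fetching and attaching the token itself.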
08-17-2023
05:37 AM
2 Kudos
Hi @scoutjohn , Have you looked into the ControlRate processor? https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.ControlRate/ In your case the processor configuration would be something like the sketch below. If that helps please accept solution. Thanks
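As an illustration only (the numbers here are placeholders; the actual values depend on the throughput you need), a ControlRate configuration that releases at most one flowfile per minute would be:
Rate Control Criteria: flowfile count
Maximum Rate: 1
Time Duration: 1 min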
08-16-2023
10:46 PM
2 Kudos
Hi Everyone,
I ran into a situation where I had the following XML input:
<workspace id="1234">
<documents totalCount="2">
<document id="5555" title="document1" revision="1">
<file name="file1.pdf" type="pdf"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value=""/>
</attributes>
</document>
<document id="6666" title="document2" revision="0">
<file name="file2.xlsx" type="xlsx"/>
<attributes>
<attribute name="custAtt1" value="v1"/>
<attribute name="custAtt2" value="v2"/>
<attribute name="custAtt3" value="v3"/>
</attributes>
</document>
</documents>
</workspace>
Here, each /workspace/documents/document record needs to be split and transformed into JSON, where:
Each document object gets the ID of the workspace it belongs to.
Each document attribute value is assigned a key in the format document_[AttributeName].
Each document file attribute value is assigned a key in the format file_[AttributeName].
Each document custom attribute value is assigned a key in the format custom_[AttributeName]. Empty custom attributes are ignored.
For example, the first document record will look like the following in JSON:
{
"workspace_id": "1234",
"document_id": "5555",
"document_title": "document1",
"document_revision": "1",
"file_name": "file1.pdf",
"file_type": "pdf",
"custom_custAtt1": "v1",
"custom_custAtt2": "v2"
}
Traditional Approach:
The first approach that came to mind was to use the traditional processors SplitXml and ConvertRecord, and finally some JoltTransformJSON to flatten the JSON and re-map values to the proper keys. However, the Jolt spec is not going to be that straightforward given the complexity of the XML, which makes the result of the ConvertRecord processor hard to predict.
EvaluateXQuery to the Rescue:
After researching XQuery syntax and the EvaluateXQuery processor, and after some testing, I found that it can be used as a single processor to split, convert, and transform the input XML into the required JSON format. XQuery supports multiple output methods: XML, HTML, and Text. We can use the Text method (the Output: Method property in the EvaluateXQuery processor) to produce any format we like, which is JSON in our case. In XQuery you can use XPath to access element/attribute values in an XML structure, and it has many built-in functions like string-join and concat, besides variable declarations, all of which help with the required transformation.
The beauty of EvaluateXQuery is that it returns new flowfile content (or attributes, depending on the Destination property setting) for each query result. For example, if we use a for loop to iterate through the different XML child elements and use a return statement in the loop body, every return results in a newly created flowfile (or numbered attribute) on the Matched relationship. This serves as the needed split operation. Without further ado, here is how the EvaluateXQuery is configured in my case:
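Based on the description above, the relevant processor properties are, in sketch form:
Destination: flowfile-content
Output: Method: text
XQueryValue (dynamic property): the XQuery listed below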
The XQueryValue is a dynamic property that contains the following XQuery:
let $workspaceId := workspace/@id
for $doc in /workspace/documents/document
let $workspace_id := concat('"workspace_id":"', $workspaceId, '"')
let $doc_attr := string-join(
    for $x in $doc/@* where $x != ''
    return concat(',"document_', name($x), '":"', data($x), '"')
  , '')
let $doc_file := string-join(
    for $x in $doc/file/@*
    return concat(',"file_', name($x), '":"', data($x), '"')
  , '')
let $doc_custom := string-join(
    for $x in $doc/attributes/attribute where $x/@value != ''
    return concat(',"custom_', $x/@name, '":"', $x/@value, '"')
  , '')
let $doc_json := string-join(
    ("{", $workspace_id, $doc_attr, $doc_file, $doc_custom, "}")
  , '')
return $doc_json
Performance Analysis:
Although the EvaluateXQuery in this case replaced SplitXml, ConvertRecord, and JoltTransformJSON, I had to make sure that it was still performant in comparison. To do that, I created the following scenarios to test against the same input (all processors executed on the primary node):
SplitXML -> ConvertRecord -> FlattenJson -> LogMessage
EvaluateXQuery -> LogMessage
The first scenario is, of course, much simplified (FlattenJson in place of a full Jolt transformation), just to see whether the EvaluateXQuery still performs better.
After doing stress testing (back pressure was applied), the average Lineage Duration at the end of each flow came out as follows:
First Scenario (split, convert, and transform):
Second Scenario (EvaluateXQuery):
We can see that the Max/Mean values for the EvaluateXQuery are much better than for the traditional split, convert, and transform processors: 00:00:05.811 / 00:00:00.066 vs. 00:00:30.258 / 00:00:00.360.
Conclusion:
Using the EvaluateXQuery simplified our data flow and gained us better performance than using the split, ConvertRecord, and transform processors. However, one needs to be careful when using such a processor, as it doesn't guarantee valid JSON output, since the output method is set to text. Understanding the schema of your input XML and using XQuery functions like replace to accommodate any invalid characters can help you avoid failures in the downstream processors.
Thanks for reading.
Please let me know if you have any comments or feedback.
Samer Alisaleh