Support Questions

Find answers, ask questions, and share your expertise

How to Merge Multiple Flow Files - "ExecuteSQL"

avatar
Contributor

Good morning everybody,

 

I have the following scenario of returning an API through two streams =

Flow 1 (

{"cod": 1}

{"cod": 2}
{"cod": 3})

 

Flow 2 (
{"error": "error 1"}
{"error": "error 2"}
{"error": "error 3"})

 

Each error has its cod and I need to send it to my database in this way through a procedure through "ExecuteSQL" =

procedure (cod => '$ {cod}',
                  error => '$ {error}');

 

I am trying to do this through "MergeContent" but it is not working.

 

What Nifi is doing is as follows =

{"cod": 1} {"cod": 2} {"cod": 3} {"error": "error 1"} {"error": "error 2"} {"error": "error 3" }

 

Not dividing the shipments by code, let's say and passing to "ExecuteSQL" all at once, when Cod + Error would have to be sent one at a time.

 

Follow all my flow =

leandrolinof_0-1620413574081.png

 

This is my "MergeContent" = 

leandrolinof_1-1620413665869.png

 

This is my "ExecuteSQL" =

leandrolinof_2-1620413758453.png

 

Can someone help me?

5 REPLIES 5

avatar
Contributor

Good morning all,

 

Can someone help me?

avatar
Contributor

@MattWho  @pvillard @mattw  

 

Please can you help me?

avatar
Contributor

@mburgess can You help me?

avatar
Master Mentor

@leandrolinof 

The NiFi merge processor is working exactly as designed.  Binary concatenation simply appends the content from one FlowFile to the end of the previous FlowFIle's content.

You are trying to perform a special formatted merge.  There is no way to configure the mergeContent processor with custom merge logic.

I am not clear on the what the content looks like at the various stages of your dataflow.  You mention merging FlowFiles from two streams; however, the flow screenshot you shared as 3 flow streams leading to the MergeContent processor.

Are you saying the content of a FlowFile down one path is exactly this:

Flow 1 (
{"cod": 1}
{"cod": 2}
{"cod": 3})

and on the other path it is exactly this:

Flow 2 (
{"error": "error 1"}
{"error": "error 2"}
{"error": "error 3"})

or is it just the json objects without the "flow <num> ( )" wrapped around it?
I am guessing above is what is extracted from the invokeHTTP response content?

What does the FlowFile content look like on that third path before it reaches MergeContent?
I might be helpful with examples of content post InvokeHTTP on all three paths and then what the exact mergeContent based on that example you would want to see.

What logic can be get from the above two flows that tells us the first "cod" goes with the first "error"? 
So if we were to split each into three separate FlowFiles, would there be a way to determine what cod should go with what error?


The ExecuteSQL processor configuration you shared is using NiFi Expression Language (NEL).  NEL does not read from a FlowFiles content.  ${cod} and ${erros} will looks for attributes with these names created on FlowFile and replace that with the values assigned to those attributes.  If those FlowFile Attributes do not exist on the FlowFile, NEL will check the variable registry for those attribute strings.

Hope this helps give you some direction with your dataflow design journey here.
Matt


avatar
Contributor

@MattWho,

 

Good afternoon.

 

I changed my flow to make a counter by the processor "RouteOnAttribute 1.12.1" so every time I have a record to be sent to the API and have a return to store in Oracle, I do the LOOP through RouteOnAttribut. Thus sending one FlowFile at a time to the PROCEDURE call in Oracle by "ExecuteSQL 1.12.1"

 

But what is happening at the moment is that from the second time it goes through the LOOP, it seems to me that the Flow to get the attributes "CODIGO" and "JSON_ORIGINAL" is not passing the values ​​that I requested in the processor "EvaluateJsonPath 1.12.1" passing the parameters as null.

 

Do you have any idea what may be happening in this second flow passage within the LOOP?

 

leandrolinof_0-1621616568030.pngleandrolinof_1-1621616692553.png

 

This is the JSON ORIGINAL of the second stream that I must send to Oracle = 

leandrolinof_2-1621617160891.png

This is OK with the CODE attribute as you can check.

 

This is Split Configure Processor =

leandrolinof_3-1621617509591.png

 

 

This is EvaluateJsonPath 1.12.1 Configure Processor =

leandrolinof_4-1621617541297.png

 

This is MergeContent 1.12.1 Configure Processor =

leandrolinof_5-1621617612452.png

 

 

This is ExecuteSQL 1.12.1 Configure Processor =

leandrolinof_6-1621617679520.png

leandrolinof_7-1621617698264.png