Created 10-02-2017 04:19 PM
Hi,
I have a scenario and I would like to know your suggestions on how I can achieve it in NiFi.
Step 1: I need to query a Hive table and get the list of values from a particular column.
Step 2: I have a Spark job that should be executed so that it takes each of these column values as one of the parameters to the spark-submit command. These Spark jobs have to be executed in parallel.
So, if today the query result gives me two values for the column I queried, the flow should trigger two spark-submit jobs that run in parallel. And if tomorrow the result gives me 10 values for that column, 10 jobs should start in parallel. Of course, I understand that when the resources are not available, it cannot start all the jobs.
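For example (the script name and parameter below are just placeholders to illustrate what I mean), each column value would end up in a call like:
spark-submit --master yarn --deploy-mode cluster /path/to/my_job.py --column-value <value_from_hive>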
Please advise.
On a different note, I would like to know how a processor typically deals with incoming flow files. Does it process one flow file after the other, or does it take a set of flow files and execute all of them in parallel?
Thanks,
John
Created 10-02-2017 07:37 PM
Hi John,
One idea for achieving your scenario would be the following flow:
SelectHiveQL -> ConvertAvroToJSON -> SplitText -> EvaluateJSONPath -> ExecuteStreamCommand
or
SelectHiveQL -> SplitAvro -> ConvertAvroToJSON -> EvaluateJSONPath -> ExecuteStreamCommand
To increase the number of times the ExecuteStreamCommand processor can be executed concurrently, you can adjust the "Concurrent Tasks" setting on the processor's Scheduling tab. However, this is not a dynamic property, i.e., if "Concurrent Tasks" is set to 10 today and the Hive query returns 11 rows tomorrow, only 10 of the 11 flowfiles will be executed concurrently.
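As a rough sketch (the script name, paths and attribute-to-argument mapping below are assumptions, not anything NiFi generates for you), the script that ExecuteStreamCommand invokes could look like this, with each task running one instance of it per flowfile:

#!/bin/bash
# run_spark_job.sh -- called once per flowfile by ExecuteStreamCommand
# $1 is the column value extracted from the flowfile (e.g. via EvaluateJSONPath)
COLUMN_VALUE="$1"
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  /path/to/my_job.py --column-value "$COLUMN_VALUE"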
Created 10-02-2017 09:49 PM
Thanks for your reply, Paras. Currently I have designed the flow as SelectHiveQL (reading as CSV instead of the default Avro) -> SplitText (by line) -> ExtractText (here assigning the content of each split file to an attribute). This works so far: every value of my query result is associated with a flow file attribute. And I hope this is what you were also suggesting, just expressed differently.
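In case it helps, the ExtractText step is just a dynamic property on that processor; the attribute name and regex below are only an example of how the whole split content could be captured into an attribute:
column.value = (?s)(^.*$)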
Now the question is about the ExecuteStreamCommand processor, where I pass the flow file attribute to the command arguments (my configuration is sketched at the end of this post). Could you please clarify whether one task handles one spark-submit command, with the attribute from one flowfile taken at a time?
Is my understanding correct?
I remember reading somewhere that one task in NiFi can process multiple flow files at a time. So I wanted to understand how flowfiles are handled by NiFi processor tasks.
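For reference, the ExecuteStreamCommand properties I have in mind look roughly like this (the attribute name and script path are just placeholders):
Command Path:      /path/to/run_spark_job.sh
Command Arguments: ${column.value}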
Regards,
John
Created 10-02-2017 11:42 PM
For ExecuteStreamCommand, 1 task executes your shell script with the attribute value from 1 flowfile. If the "Concurrent Tasks" setting is bumped up to 10, then 10 tasks can execute concurrently, i.e., your shell script is run 10 times in parallel, with each execution using the attribute from 1 of the 10 flowfiles.
Certain processors will process multiple flowfiles at the same time without having to increase the value for "Concurrent Tasks". One example is the PutSQL processor, which can batch multiple insert/update statements (flowfiles) into 1 database transaction.
However, for the ExecuteStreamCommand processor, 1 flowfile = 1 task and flowfiles are processed sequentially: if a flowfile is still being processed and "Concurrent Tasks" = 1, the remaining 9 flowfiles from the example above will wait in the queue.
Let me know if that clears things up!
Created 10-03-2017 05:09 PM
Yes Paras. It is clear now. Thanks.
However, any input on the following is highly appreciated.
Currently, I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flow files and I set "Concurrent Tasks" to 10 on the ExecuteStreamCommand processor; does that mean I get only concurrent execution, or both concurrent and parallel execution?
Created 10-03-2017 11:17 PM
Setting "Concurrent Tasks" to 10 will give the processor the ability to request up to 10 threads from the NiFi controller, so that 10 flowfiles can be processed concurrently.
However, since you have NiFi running on a single node, an individual flowfile will not be processed in parallel, i.e., subsets of the flowfile's data are not processed independently in parallel.
If you have NiFi running in a cluster with more than 1 node, the data can be divided among the nodes and processed in parallel.