Get multiple Spark jobs started in parallel using NiFi

Hi,

I have a scenario and I would like your suggestions on how I can achieve it in NiFi.

Step 1: I need to query a Hive table and get the list of values from a particular column.

Step 2: I have a Spark job that takes each of these column values as a parameter to the spark-submit command, and these Spark jobs have to be executed in parallel.

So if today's query result gives me two values for the column I queried, the flow should trigger two spark-submit jobs that run in parallel; and if tomorrow's result gives me 10 values, 10 jobs should start in parallel. Of course, I understand that when resources are not available, it cannot start all the jobs at once.

Please advise.

On a different note, I would like to know how a processor typically deals with incoming flowfiles. Does it process one flowfile after another, or does it take a set of flowfiles and execute them all in parallel?

Thanks,

John

1 ACCEPTED SOLUTION

Contributor

Hi John,

One idea for achieving your scenario would be one of the following flows:

SelectHiveQL -> ConvertAvroToJSON -> SplitText -> EvaluateJsonPath -> ExecuteStreamCommand

or

SelectHiveQL -> SplitAvro -> ConvertAvroToJSON -> EvaluateJsonPath -> ExecuteStreamCommand

  1. Your Hive query runs and returns the result set as a flowfile in Avro format.
  2. Convert the flowfile from Avro to JSON.
  3. Split the JSON flowfile into multiple flowfiles, with one flowfile per row in the result set. To do this in the SplitText processor, set the "Line Split Count" property to 1.
  4. Use the EvaluateJsonPath processor to extract the value from the JSON object and write it to a flowfile attribute.
  5. In the ExecuteStreamCommand processor, pass the value extracted by EvaluateJsonPath to the "Command Arguments" property using NiFi's Expression Language, e.g. ${attribute_name} (see the sketch below).
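
To make step 5 concrete, the two key processor configurations could look roughly like this (the attribute name "colvalue", the JSONPath, and the script path are placeholders for illustration, not required values):

    EvaluateJsonPath
        Destination:        flowfile-attribute
        colvalue:           $.my_column          (dynamic property: JSONPath into each row's JSON object)

    ExecuteStreamCommand
        Command Path:       /path/to/submit_job.sh
        Command Arguments:  ${colvalue}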

To increase the number of flowfiles the ExecuteStreamCommand processor can work on concurrently, adjust the "Concurrent Tasks" setting on the processor's Scheduling tab. Note that this value does not adapt dynamically: if "Concurrent Tasks" is set to 10 today and the Hive query returns 11 rows tomorrow, only 10 of the 11 flowfiles will be processed concurrently; the eleventh waits in the queue.


5 REPLIES

Thanks for your reply, Paras. I currently have the flow designed as SelectHiveQL (reading the result as CSV instead of the default Avro) -> SplitText (by line) -> ExtractText (assigning the content of each split flowfile to an attribute). This works well so far: every value in my query result is associated with a flowfile attribute. I believe this is what you were describing too, just done in a slightly different way.
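
For reference, the ExtractText step uses a dynamic property whose regex captures the whole line into an attribute - roughly like this (the attribute name is my own choice, and this is just one regex that works for single-line content):

    ExtractText
        colvalue:  (?s)(^.*$)    (dynamic property; capture group 1 is the entire line)

The captured value then shows up on each flowfile as the attribute "colvalue.1".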

Now the question is about the ExecuteStreamCommand processor, where I pass the flowfile attribute to the command arguments. Could you please clarify whether one task handles one spark-submit command, with the attribute taken from one flowfile at a time?

Is my understanding correct?

I remember reading somewhere that one task in NiFi can process multiple flowfiles at a time, so I wanted to understand how flowfiles are handled by NiFi processor tasks.

Regards,

John

Contributor

For ExecuteStreamCommand, one task executes your shell script with the attribute value from one flowfile. If the "Concurrent Tasks" setting is bumped up to 10, then 10 tasks can run concurrently, i.e., your shell script is executed 10 times in parallel, each execution using the attribute from one of the 10 flowfiles.
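
As an illustration, the shell script that ExecuteStreamCommand invokes could be a minimal spark-submit wrapper like the sketch below (the script name, master/deploy-mode options, and jar path are placeholders - substitute whatever your job actually needs):

    #!/bin/bash
    # submit_job.sh - run once per flowfile by ExecuteStreamCommand.
    # $1 is the column value passed in via "Command Arguments", e.g. ${colvalue.1}
    COLUMN_VALUE="$1"

    # Launch one Spark job, passing the value through as an application argument.
    spark-submit \
        --master yarn \
        --deploy-mode cluster \
        /path/to/my_spark_job.jar "$COLUMN_VALUE"

Each concurrent task runs its own copy of this script, so 10 concurrent tasks means up to 10 spark-submit processes running at once.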

Certain processors will process multiple flowfiles at the same time without having to increase the value of "Concurrent Tasks". One example is the PutSQL processor, which can batch multiple INSERT/UPDATE statements (flowfiles) into one database transaction.

However, for the ExecuteStreamCommand processor, one flowfile = one task, and flowfiles are processed sequentially. If a flowfile is still being processed and "Concurrent Tasks" is 1, the remaining flowfiles wait in the queue.

Let me know if that clears things up!

Yes, Paras, it is clear now. Thanks.

However, any input on the following would be highly appreciated.

Currently, I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flowfiles and I set "Concurrent Tasks" to 10 for the ExecuteStreamCommand processor - does that mean I get only concurrent execution, or both concurrent and parallel execution?

Contributor

Setting "Concurrent Tasks" to 10 gives the processor the ability to request up to 10 threads from the NiFi controller, so that 10 flowfiles can be processed concurrently.

However, since you have NiFi running on a single node, an individual flowfile will not be processed in parallel, i.e., subsets of the flowfile's data are not processed independently in parallel.

If you have NiFi running in a cluster with more than one node, the data can be divided among the nodes and processed in parallel.