
NiFi loop through a SQL Table

New Contributor

Hello everyone,

I'm facing a problem in a project that I would like to do in NiFi and I need some help with it. In a database I have two tables, one with the name of each client and its ID, and another with all the information about each client. What I want to do is retrieve all the information related to each client and put it into a new table that contains all the clients' data. The steps are the following:

1. Connect to the database and query the table to obtain the client IDs.

2. Take the first ID, query the other table to retrieve its data, and put it into the new table.

3. Move on to the next client ID and repeat until all IDs have been processed.

I have previously built some dataflows for copying one table into another, but not for querying a table, storing its values, and using them in another query in a loop, so I'd be really grateful if someone could help me.


2 REPLIES

Master Guru

@Rubén Carmona

Run the join on the source side:

You can use an ExecuteSQL processor to run a SQL join of the two tables on clientID, select all the required columns that you need in the result set, then store the results into HDFS and create a Hive table on top of it.
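As a rough sketch, assuming the source tables are named clients (clientid, clientname) and client_info (placeholder names, adjust to your schema), the SQL select query property of the ExecuteSQL processor could look like:

select c.clientid,
       c.clientname,
       i.*
from clients c
join client_info i
  on i.clientid = c.clientid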

(or)

Based on the clientID that we got from the first table:

1. Use QueryDatabaseTable/GenerateTableFetch+ExecuteSQL processors to pull the first table's data having clientID and clientName; we are going to get the output data in Avro format.
2. ConvertAvroToJSON processor // now we are going to have a JSON array with all the records in it.
3. SplitJson processor // to split each record into an individual flowfile, then EvaluateJsonPath processor // to extract the required fields from the JSON message and keep them as flowfile attributes.
4. ExecuteSQL // use the extracted attributes to query the second table: select * from tablename where clientid=${clientID} (assuming the attribute name is clientID in the EvaluateJsonPath processor; see the sketch after this list). Now we have queried the second table based on clientid, so convert the output from this ExecuteSQL processor again:
5. ConvertAvroToJSON processor // now we are going to have a JSON array with all the records in it.
6. SplitJson processor // to split each record into an individual flowfile.
7. EvaluateJsonPath processor // to extract the required fields from the JSON message and keep them as flowfile attributes.
8. AttributesToJSON processor // to build a new JSON message with all the required attributes.
9. InferAvroSchema processor // to build an Avro schema for the JSON message, then ConvertJSONToAvro processor.
10. MergeContent processor // use merge strategy Avro if you want to merge small flowfiles into one big flowfile.
11. ConvertAvroToORC processor // to convert the merged Avro content to ORC format.
12. PutHDFS processor // to store the data into HDFS, then create a Hive table on top of the HDFS directory.
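As a sketch of step 4: if EvaluateJsonPath writes the id into a flowfile attribute named clientID, the SQL select query of the second ExecuteSQL can reference it with NiFi Expression Language (client_info is a placeholder for your second table name):

select *
from client_info
where clientid = ${clientID}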

(or)

Run the join using Hive:

1. Ingest the two tables into HDFS using NiFi, then create Hive tables on top of them.
2. Once the ingestion is complete, run the join on the Hive tables using a PutHiveQL processor and write the results to the new table (i.e. insert into final table select * from table1 join table2 on table1.clientid=table2.clientid), as in the example below.
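For example, assuming the ingested Hive tables are called table1 and table2 and the target table final_table already exists with a matching schema (all placeholder names), the HiveQL statement configured in PutHiveQL could be:

insert into table final_table
select *
from table1
join table2
  on table1.clientid = table2.clientid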

If we query based on the clientID that we got from the first table, the process will involve a lot of processors in NiFi; it's easier to run the join on both tables on either the source side or the Hadoop side.

New Contributor

@Shu thank you very much for your answer. It's true that the join option is a good solution, but if the two tables are in different databases (MySQL and Hive, for example) the join won't be possible.

However, the solution 'based on the clientID that we got from the first table' will surely work. I'm going to try it and I'll post the solution here.

Thanks.