
Schedule a single nifi process group for multiple tables in a database

Rising Star

@Matt Clarke @Matt Burgess @Shu

I have created a process group in NiFi with the following requirements:

Fetch data from a Hive table >> Encrypt content >> Upload to Azure Blob Storage.

Now I have 3000 tables for which the above flow needs to be scheduled. Is there any way to use a single flow for all the tables instead of creating a separate flow for each of the 3000 tables?

77381-capture1.png

I also want to run the Azure upload for only some of the tables, not all of them. Is there any way to add a condition to the flow so that, for example, Table 1 goes only to GCloud and not to Azure, while Table 2 goes to both Azure and GCloud?

Thanks In Advance

1 ACCEPTED SOLUTION

Master Guru
@aman mittal

Yes, it's possible.

Take a look at the sample flow below:

77393-flow.png

Flow overview:

1. SelectHiveQL // lists the tables from a specific database, in Avro format

HiveQL Select Query

show tables from default // lists all tables from the default database

2. ConvertAvroToJSON // converts the list of tables from Avro format to JSON format
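For illustration, assuming the default database contains tables named table_1 and table_2, the converted JSON would look roughly like this (the show tables result has a single tab_name column):

[{"tab_name":"table_1"},{"tab_name":"table_2"}]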

3. SplitJson // splits each table entry into an individual flowfile
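As a sketch, the JsonPath Expression property can point at the array elements (the exact path depends on how ConvertAvroToJSON wraps the records):

JsonPath Expression

$.* // splits the JSON array into one flowfile per table entry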

4. EvaluateJsonPath // extracts the tab_name value and keeps it as an attribute on the flowfile
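A minimal sketch of that step, with Destination set to flowfile-attribute and one dynamic property (the property name tab_name is just a convention matching the column name):

tab_name

$.tab_name // stores the table name as the flowfile attribute tab_name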

5. Remote Process Group // since you are going to do this for 3k tables, it is better to use an RPG to distribute the work across the cluster

If you don't want to use an RPG, skip processors 5 and 6 and feed the success relationship from 4 directly to 7.

6. Input Port // receives the flowfiles from the RPG

7. SelectHiveQL // pulls the data from the Hive tables
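Assuming the table name was extracted into the tab_name attribute in step 4, the query can reference it with Expression Language, for example:

HiveQL Select Query

select * from default.${tab_name} // pulls all rows of the table named in the tab_name attribute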

8. EncryptContent

9. RouteOnAttribute // the SelectHiveQL processor writes a query.input.tables attribute, so based on this attribute and NiFi Expression Language, add two properties to the processor

Example:

azure

${query.input.tables:startsWith("a")} //only tablenames starts with a

gcloud

${query.input.tables:startsWith("e"):or(${query.input.tables:startsWith("a")})} //we are going to route table names starts with e(or)a to gcloud 

Feed the gcloud relationship to the PutGCSObject processor and the azure relationship to the PutAzureBlobStorage processor.

Refer to this link for the NiFi Expression Language guide and build an expression that routes only the required tables to Azure and GCS.

In addition, I have used only a single database to list all the tables. If your 3k tables come from different databases, use a GenerateFlowFile processor with the list of databases, extract each database name as an attribute, and feed the success relationship to the first SelectHiveQL processor.

Refer to this link on how to dynamically pass a database attribute to the first SelectHiveQL processor.
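A minimal sketch of that multi-database variant (the attribute name db_name is just a placeholder):

GenerateFlowFile // Custom Text property holds the database names, one per line

SplitText // Line Split Count 1, so each database name becomes its own flowfile

ExtractText // dynamic property db_name with value (.*) captures the name as attribute db_name.1

SelectHiveQL // HiveQL Select Query: show tables from ${db_name.1}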

Reference flow.xml: load-hivetables-to-azure-gcs195751.xml

-

If the answer helped to resolve your issue, click on the Accept button below to accept the answer. That would be a great help to community users looking to find a solution quickly for these kinds of issues.
