Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

NIFI - how to insert distinct data from the flow and refer to that data ID in other places

avatar
Contributor

Hello, I'm trying to learn NIFI so this is all new to me, I used to work with Talend and I have hard time translating to NIFI. So the main idea: For example is I have two tables in Postgresql

 

Table CITY :

ID (auto generated), city_name

Table PERSON :

ID (auto generated), first_name, last_name, city_id

 

and I have a CSV file :

first_name, last_name, city_name

 

Can you please explain how I can insert in tow tables from one flowfile and refer in the table PERSON to the ID of the city not the name from the table CITY.

Thank you.

2 ACCEPTED SOLUTIONS

avatar
Super Guru

Hi,

Since you are looking for the DB auto generated ID for the City , your best option is to use the LookupRecord Processor , so the flow if your datapipeline will look something like this:

 

GetFile Processor (To the content of the CSV)

SplitRecord ( to split Records in the CSV)

[Insert the City Info to the DB First]

LookupRecord ( to get the ID of the City and enrich you CSV with ID. The Lookup Service should be DB Lookup Service that points to the City Table and Use City Name as Key)

[Finally Insert the Person Information with City ID added to the original csv record]

 

To Learn more about LookupRecord, you can watch this video:

https://www.youtube.com/watch?v=bSJ5reO8AA4

 

 

To learn more about the L

View solution in original post

avatar

@FediMannoubi Below is a basic approach to solve.  Assuming both postgres tables are populated with rows per your example, your nifi flow would need to get the CSV (various ways to do that), once the contents of the csv are in a flowfile (i use GenerateFlowFile processor), you can use a RecordReader based processor to read the csv.  This will allow you to write SQL against the flowfile with QueryRecord to get a single value.  For example: 

SELECT city_name FROM FLOWFILE

 Next, in your flow you will need to get the city_name value into an attribute, i use EvaluateJsonPath.  After that a ExecuteSQL processor and associated DBCP Connection pool to postgres.  Then in ExecuteSQL your query is 

SELECT city_id FROM CITY WHERE city_name=${city_name}

At the end of this flow you will have the city_name from csv, and city_id from postgres.  You can now combine or use the further downstream to suit your needs.   INSERT is done similarly, once you have the data in flowfiles, or attributes, using the same ExecuteSQL you write an insert instead.


My test flow looks like this, but forgive the end, as I did not actually have a postgres database setup.

 

Screen Shot 2022-05-24 at 8.53.49 AM.png

You can find this sample flow [here].

 

I hope this gets you pointed in the right direction for reading csv and querying data from database.

View solution in original post

2 REPLIES 2

avatar
Super Guru

Hi,

Since you are looking for the DB auto generated ID for the City , your best option is to use the LookupRecord Processor , so the flow if your datapipeline will look something like this:

 

GetFile Processor (To the content of the CSV)

SplitRecord ( to split Records in the CSV)

[Insert the City Info to the DB First]

LookupRecord ( to get the ID of the City and enrich you CSV with ID. The Lookup Service should be DB Lookup Service that points to the City Table and Use City Name as Key)

[Finally Insert the Person Information with City ID added to the original csv record]

 

To Learn more about LookupRecord, you can watch this video:

https://www.youtube.com/watch?v=bSJ5reO8AA4

 

 

To learn more about the L

avatar

@FediMannoubi Below is a basic approach to solve.  Assuming both postgres tables are populated with rows per your example, your nifi flow would need to get the CSV (various ways to do that), once the contents of the csv are in a flowfile (i use GenerateFlowFile processor), you can use a RecordReader based processor to read the csv.  This will allow you to write SQL against the flowfile with QueryRecord to get a single value.  For example: 

SELECT city_name FROM FLOWFILE

 Next, in your flow you will need to get the city_name value into an attribute, i use EvaluateJsonPath.  After that a ExecuteSQL processor and associated DBCP Connection pool to postgres.  Then in ExecuteSQL your query is 

SELECT city_id FROM CITY WHERE city_name=${city_name}

At the end of this flow you will have the city_name from csv, and city_id from postgres.  You can now combine or use the further downstream to suit your needs.   INSERT is done similarly, once you have the data in flowfiles, or attributes, using the same ExecuteSQL you write an insert instead.


My test flow looks like this, but forgive the end, as I did not actually have a postgres database setup.

 

Screen Shot 2022-05-24 at 8.53.49 AM.png

You can find this sample flow [here].

 

I hope this gets you pointed in the right direction for reading csv and querying data from database.