Created on 03-05-2018 07:57 AM - edited 08-17-2019 05:10 PM
Hi Team,
I've started exploring NiFi. I created my first workflow to read data from a MySQL database and put it into an S3 bucket. So far everything is going fine: I'm able to read the data and place it in S3. I hard-coded the properties (Table Name, Database, and a few others). Now I want the workflow to run for multiple tables, so I need to read the properties from some configuration and pass the tables one by one to the workflow. I'm using the QueryDatabaseTable processor for this. Could someone help me read a comma-separated list of table names and pass those names one by one to QueryDatabaseTable as an argument to pull the data?
Created 03-05-2018 02:42 PM
QueryDatabaseTable does not support incoming connections, so it can't handle multiple tables on its own. The "Table Name" property does support NiFi Expression Language, but that is meant for migrating flows from dev to test to production with different table names; each environment would have its own (static) variable set.
Instead, you can use GenerateTableFetch. It supports incoming connections, so you can use flow file attributes in the expressions for Table Name, Columns to Return, Maximum-value Columns, etc. It works like QueryDatabaseTable, but instead of generating and executing the SQL, it only generates the SQL statements. This lets you send the statements downstream to something like ExecuteSQL, possibly distributing the flow files among the nodes in a cluster (using a Remote Process Group -> Input Port, if you have a cluster rather than a standalone NiFi instance).
You can populate the incoming flow files from wherever you keep your configuration (ListFile -> FetchFile if it is a file on disk, or ListDatabaseTables if you want the list of tables from the database itself).
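To make the GenerateTableFetch behavior concrete, here is a small sketch (plain Python, not NiFi itself) of the kind of paged SELECT statements it emits downstream for ExecuteSQL to run. The table and column names are hypothetical, and real GenerateTableFetch output varies by database adapter; this only illustrates the paging idea.

```python
def paged_fetch_statements(table, order_col, row_count, partition_size):
    """Build one SELECT per partition, mirroring the statements
    GenerateTableFetch sends downstream (illustrative only)."""
    statements = []
    for offset in range(0, row_count, partition_size):
        statements.append(
            f"SELECT * FROM {table} ORDER BY {order_col} "
            f"LIMIT {partition_size} OFFSET {offset}"
        )
    return statements

# 2500 rows with a partition size of 400 yields 7 statements,
# one per flow file.
stmts = paged_fetch_statements("employees", "id", 2500, 400)
print(len(stmts))   # 7
print(stmts[0])
```

Each statement becomes its own flow file, which is what makes it possible to distribute the work across cluster nodes.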
Created on 03-06-2018 12:06 PM - edited 08-17-2019 05:10 PM
Thanks a lot for the suggestions. I made the changes, and the flow now looks like the screenshot below.
The input table has 2500+ records and I'm splitting the data into 400 records per file.
GenerateTableFetch is producing 7 flow files, and all of them get the same filename (I'm not sure how the filename is generated by default). At a later stage (PutFile) I write these files to disk, and it fails because it can't write the same filename 7 times (only one file is written to disk; the remaining 6 fail). I want each file to get a different name. Any suggestions, please?
(Screenshot of the flow: screen-shot-2018-03-06-at-52723-pm.png)
Created 03-06-2018 06:14 PM
The default filename is the timestamp at which the flow file was created, so since they are created in quick succession I'm not totally surprised they share the same filename. However, you can use the flow file's UUID as the filename; that is guaranteed to be unique. Set it with an UpdateAttribute processor: add a user-defined property with key "filename" and value "${UUID()}".
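To see why the UUID approach avoids the collision, here is a quick sketch in plain Python (not NiFi Expression Language) contrasting timestamp-based names with UUID-based ones; `uuid.uuid4()` plays the role of `${UUID()}` here.

```python
import uuid
from datetime import datetime

# Seven "flow files" named within the same instant tend to share
# a timestamp-style name...
timestamp_names = {datetime.now().strftime("%H%M%S") for _ in range(7)}

# ...while a UUID gives each one a distinct name.
uuid_names = {str(uuid.uuid4()) for _ in range(7)}

print(len(timestamp_names))  # usually 1: the names collide
print(len(uuid_names))       # 7: all unique
```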
Created 03-07-2018 06:26 AM
@Matt Burgess, Thanks a lot for the quick response. It worked.
Created 04-17-2020 04:37 AM
Hi,
Is there a way to dynamically specify the primary-key column name as the Maximum-value Columns value in the GenerateTableFetch processor?
Created 04-17-2020 08:07 AM
You can use Expression Language in the Maximum-value Columns property to set them per flow file, but there currently isn't any way to fetch the primary key column(s) from the database and use those as the max-value columns automatically. You could do that in upstream processors, though: set an attribute to those columns and pass it into GenerateTableFetch.
Created 04-20-2020 01:44 AM
Hi @mburgess ,
Actually, I'm implementing a solution that fetches data dynamically from all tables in a database. I used ListDatabaseTables to get the table names and passed them to GenerateTableFetch to generate the SQL queries. Now I need the primary-key column name from each table (the tables have different primary-key column names) as the Maximum-value Columns value. I'm not sure how to write a dynamic Expression Language expression for each column per table in a single processor.
Created 04-29-2020 10:59 PM
Hi Rishabh,
Were you able to solve your issue?
Created 04-29-2020 11:12 PM
I used a Python script in ExecuteScript to fetch the primary-key column name for each flow file (each table) and added a new attribute with the column name as its value. Not the best solution, but it's the best I found.
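For anyone following the same route, here is a sketch of the lookup such a script would perform. In MySQL the primary-key column(s) can be read from information_schema; this helper only builds that query (the schema/table names and the suggested attribute name `maxval.column` are illustrative, not from the thread). An ExecuteScript body would run the query and write the result to a flow file attribute, which GenerateTableFetch could then reference via Expression Language.

```python
def primary_key_query(schema, table):
    """Build the MySQL information_schema query that returns a table's
    primary-key column(s). The result could be stored in an attribute
    (e.g. 'maxval.column') and used as ${maxval.column} in the
    Maximum-value Columns property of GenerateTableFetch."""
    # Note: for real use, bind schema/table as query parameters instead
    # of formatting them into the string.
    return (
        "SELECT COLUMN_NAME FROM information_schema.KEY_COLUMN_USAGE "
        f"WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}' "
        "AND CONSTRAINT_NAME = 'PRIMARY' ORDER BY ORDINAL_POSITION"
    )

q = primary_key_query("mydb", "employees")
print(q)
```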