Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Need help with Database migration with NiFi 1.9.2

Need help with Database migration with NiFi 1.9.2

I'm trying to migrate data from an Oracle Database to a PostGres Database.  I'm using an example I found online, but it's very vague on how the Processors need to be set up.  The red text under the graphic is the instructions on the diagram.

Migration process group.PNG

Get Table List

  • ListDatabaseTables: There may be a need to migrate all tables or some of the tables only. So, the initial task is to list the tables to migrate. Use ListDatabaseTables processor to fetch the list of tables to be migrated. Edit the properties of the processor to connect to source database and filter out the list of tables. Properties: Catalog, Schema Pattern, Table Name Pattern and Table Types to filter out the list of tables. The processor outputs a single flowfile for each table in the list, and to each flowfile, adds the following properties: db.table.name, db.table.catalog, db.table.schema, db.table.name. This processor adds the details of source database and table associated with the flowfile.
  • UpdateAttribute: The ListDatabaseTables processor adds the table properties for the source database system. To add the destination database properties, use UpdateAttrbiute processor. Add following properties: ‘destination_table_catalog’ and ‘destination_table_schema’. Table name will be same as in the source database (db.table.name).

 Create Table

  • ExecuteSQL: After each flowfile has the information of the source database, the destination database, and the table to migrate, the next step is to create the table in the destination database. Use the ExecuteSQL processor to fetch the table schema of the table from the source database. The SQL statement to fetch the table schema may vary as per the type of the source database. The ExecuteSQL outputs the Avro flowfile which contains table schema.
  • ConvertAvroToJson: The Avro format flowfile is converted to Json format using the ConvertAvroToJson processor.
  • ExecuteScript: Use ExecuteScript processor to generate Data Definition Language (DDL) statement to create table for the destination database. Through the ExecuteScript processor, scripting language such as python, groovy, ruby etc. can be used to read the Json format schema of the table and create the DDL statement.
  • PutSQL: To execute the DDL statement to create table in the destination database, use PutSQL processor.

Extract Data

  • ExecuteSQL: Use ExecuteSQL processor to fetch table data from the source database. Add the SQL select statement ‘select * from ${db.table.catalog}.${db.table.schema}.${db.table.name};’.  This processor outputs the Avro format flowfile with all the data of the source table. Alternatively, GenerateTableFetch and QueryDatabase processors can be used to extract data, which will be discussed in the next blog post.

Ingest Data

  • PutDatabaseRecord: The PutDatabaseRecord processor reads the Avro data, creates a batch upload and uploads the data to the table created in the destination table. Data ingestion can also be performed using ConvertAvroToJson and PutSQL processor. However, PutDatabaseRecord is better and faster way to upload data. PutSQL processor can possibly be used if the extracted data requires some transformations.

First of all, both of my DB Connections are working, I've verified them.  I have my ListDatabaseTables set up to get 4 tables, and I can view the table names in the view state of the processor.  The UpdateAttribute processor has had 2 attributes added per the instructions.  I don't really know what else to do with it.  4 flow files are created though and flow through successfully.  The next processor, ExecuteSQL, uses the following SQL Select Query:  SELECT DBMS_METADATA.GETDDL('TABLE',u.table_name) FROM USER_ALL_TABLES u WHERE u.nested='NO' AND (u.iot_type is null or u.iot_type='IOT')

I don't see where I'm getting any output, and when I run the SQL in SQL Developer, I get nothing.  Again though, 4 flow files successfully pass through.  The ConvertAvroToJSON processor, I just used the defaults.  Again, 4 flow files successfully pass through.  I don't know what I'm doing, I'm very new to this.  I'm looking at the data provenance and see things happening, but I really don't know what is happening and I can't find out how to view the contents of the flow files to see what is being passed through.  It has the appearance of working, but how do I prove it, and what am I doing wrong?  I haven't gotten past the ConvertAvroToJSON processor portion of the example because I've stopped there.  However, the ExecuteScript processor has an error in it because I have no script, again, still learning!

3 REPLIES 3
Highlighted

Was told you were the one to ask.

Bill,

I received some sort of graphic on my NiFi question in the Cloudera Community board that said you were the person to go to.  https://community.cloudera.com/t5/Support-Questions/Need-help-with-Database-migration-with-NiFi-1-9-....  Would you mind taking a look and letting me know what I'm missing?

 

Thanks,

 

Doug

Highlighted

Re: Was told you were the one to ask.

Community Manager

Assuming you're referring to the icon to the right of the text last edited on ‎12-05-2019 10:35 PM by, all it it telling you is that I (a moderator) edited your post. In this case, it was to add the label at the top so that our internal search index creator knows that your question is related to NiFi.

 

 

Bill Brooks, Community Manager
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Highlighted

Re: Need help with Database migration with NiFi 1.9.2

Master Guru

@wann_douglas 

 

You can download and view the data from any connection to inspect your data as it passed through your dataflow.  You can also do the same by looking at each reported provenance event; however, access to view or download the content of a FlowFile  via a Provenance event is only possible if that cntent still exists in the NiFi content_repository.

Here is my suggestion:

1. Stop all the processors in your dataflow
2. Start only the first processor and you will see data queue on the connection leading to the next processor.
3. Right click on the connection with the queued data and select "List Queue" form the context menu that is displayed.
4. You can click on the "view details' icon to far left side of any listed FlowFile from the table displayed
5. From the "FlowFile" UI that is displayed you can select the "Download" or "View" buttons to get access to the content as it exists at this point in your dataflow.

When you are done examining the content, repeat steps 3-5 after starting the next processor in your dataflow.  This allows you to see how your content is changing s it progresses through your dataflow  one processor at a time.

Hope this helps you,

Matt

Don't have an account?
Coming from Hortonworks? Activate your account here