
Using NiFi to populate data warehouse

Summary

We're trying to use NiFi to copy data from one Postgres database to another and are seeing inconsistent results. We have a series of process groups (each composed of an ExecuteSQL and a PutDatabaseRecord processor) connected with MergeContent processors to defer downstream processors until prerequisite tasks have completed. Sometimes tables get populated with empty records; sometimes everything works fine.

Why would it work fine sometimes but not others? The source data has not changed.

Read on for more details...

We are using NiFi to copy data from our app's database to a data warehouse database. Both databases are Postgres. A series of tasks needs to happen in sequence to maintain referential integrity (a simplified schema sketch follows the task list):

  1. Truncate the tables in the data warehouse
  2. Populate the clients, users, and roles tables
  3. Populate the relational tables clients_users_map (which contains user_id and client_id pointers to the users and clients tables respectively) and users_roles_map (which contains user_id and role_id)
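
To make the dependency concrete, the warehouse tables look roughly like this (a simplified sketch with hypothetical column names and types, not our exact DDL):

  CREATE TABLE app.dim_clients (
      id integer PRIMARY KEY,
      name text,
      created_at timestamptz
  );

  CREATE TABLE app.dim_users (
      id integer PRIMARY KEY,
      name text,
      created_at timestamptz
  );

  -- The map table can only be loaded after dim_clients and dim_users
  -- because of its foreign keys:
  CREATE TABLE app.dim_clients_users_map (
      id integer PRIMARY KEY,
      client_id integer REFERENCES app.dim_clients (id),
      user_id integer REFERENCES app.dim_users (id),
      created_at timestamptz
  );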

I have created a process group for each task, composed of an ExecuteSQL processor (with Use Avro Logical Types = true) linked to a PutDatabaseRecord processor.
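
Each ExecuteSQL just runs a plain SELECT against the source database, along the lines of (query simplified; the actual column list is longer):

  SELECT id, name, created_at
  FROM clients;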

The process groups are connected using MergeContent processors in order to force each subsequent task to wait until all of its prerequisite tasks have completed (setting MergeContent's minimum and maximum number of entries to the number of prerequisite tasks). I attempted to use Wait/Notify instead, but that caused subsequent tasks to be invoked multiple times (once for each prerequisite task).
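
Concretely, for a task with two prerequisites, the MergeContent barrier is configured roughly like this (standard MergeContent property names; values shown for the two-prerequisite case):

  Merge Strategy            = Bin-Packing Algorithm
  Minimum Number of Entries = 2
  Maximum Number of Entries = 2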

I'm seeing a number of issues when I click start for the entire flow:

Data is only partially populated, sometimes

Occasionally NiFi does not populate all columns in a table. For example, the clients table gets populated with only an id while the rest of the columns are empty. Why would this happen sometimes, yet other times all columns are populated as expected?

More complex flows lead to failures

Things seem to work for a simple flow but start breaking when I add more processors. The simple flow (see the attached simple-flow.png) works fine on its own, but when it is expanded with a few additional processors (see the attached expanded-flow.png), the populate dim_clients_users_map process group fails with an error like this:

Batch entry 0 INSERT INTO app.dim_clients_users_map (id, client_id, user_id, created_at) VALUES ('1','3','20','2016-05-12 18:04:08.000000 +00:00:00') was aborted. Call getNextException to see the cause.

If I copy and paste that statement into my SQL console, I see this error:

[23503] ERROR: insert or update on table "dim_clients_users_map" violates foreign key constraint "dim_clients_users_map_user_id_fkey" Detail: Key (user_id)=(20) is not present in table "dim_users".

Inconsistent results

In this particular instance, when I examined the dim_users table to see whether a key for user_id = 20 existed, I found the table only partially populated: 27 rows containing only id and created_at values, with the other columns empty.
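
For reference, this is roughly how I checked (queries reconstructed from memory):

  SELECT count(*) FROM app.dim_users;         -- 27
  SELECT * FROM app.dim_users WHERE id = 20;  -- 0 rows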

I stopped the flow and ran it again (without changing anything), and everything worked fine -- all tasks completed successfully.

But on a third run, the dim_roles table was empty and the corresponding task reported this error:

Record does not have a value for the Required column 'name'

I don't understand why it would work sometimes but not others.

Should I be using MergeContent?

Am I using MergeContent correctly to defer tasks until prerequisites have completed?

Thanks in advance for any help.

Attachments: 87581-process-group.png, 87580-expanded-flow.png, simple-flow.png