Support Questions
Find answers, ask questions, and share your expertise

Duplicate Key value violates unique constraint error in Apache Ni-Fi pipeline to Timescale(postgreSQL) database

Explorer
I have created an Apache Ni-Fi pipeline to transfer data from a MySQL database in an AWS RDS instance into a Timescale database (PostgreSQL) located in my local Linux Ubuntu instance running on VirtualBox on top of Windows 10.
 

The target table has a composite primary key and has been created as follows:

 

CREATE TABLE periodic_measurements (
time_stamp TIMESTAMP WITH TIME ZONE NOT NULL,
original_db SMALLINT NULL,
original_id BIGINT NULL,
apartment_id INTEGER NOT NULL,
room_id INTEGER NOT NULL,
sensor_id INTEGER NOT NULL,
metric SMALLINT NOT NULL,
mvalue NUMERIC NULL,
PRIMARY KEY (time_stamp, apartment_id, room_id, sensor_id, metric));

Here are some details about the ETL data pipeline:

 

  • The MySQL source database has a timestamp column called period with datatype datetime, which therefore does not contain natively the time zone. However, the implementation uses Django framework which is configured to use time zone (USE_TZ=True).
  • In the first processor of the Ni-Fi pipeline (QueryDatabaseTable) the column period is renamed time_stamp and formatted as follows: date_format(period, '%Y-%m-%d %H:%i:%s').
  • The output of the in Ni-Fi QueryDatabaseTable processor comes in AVRO format and the propriety Use Avro Logical Types is set to true. This means that the time_stamp column should be formatted as AVRO long / timestamp-millis.
  • After a series of processors that use AVRO format (UpdateRecord, QueryRecord), the data goes to PutDatabaseRecord processor to be inserted into Timescale (PostgreSQL).

In postgreSQL the time_stamp column has datatype timestamp with time zone, as seen above.

When running the data pipeline, in the data provenance in Apache Ni-Fi I can see the following:

 

  • Data in the output of QueryDatabaseTable (first processor) Obj avro.schemaæ {"type":"record","name":"periodic_measurements","namespace":"any.data","fields":[{"name":"original_id","type":["null","long"]},{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"valueSum","type":["null","double"]},{"name":"valueDiv","type":["null","double"]}]}

  • { "original_id" : 3341103630, "time_stamp" : "2019-12-08 03:20:00", "apartment_id" : 269, "room_id" : 1204, "sensor_id" : 2562, "metric" : 1, "valueSum" : 12267.1010819471, "valueDiv" : 600.0 },

  • Data in the input of PutDatabaseRecord (last processor) Obj avro.schemaΰ {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":[{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"original_db","type":["null","string"]},{"name":"original_id","type":["null","long"]},{"name":"mvalue","type":["null","double"]}]}

  • { "time_stamp" : "2019-08-21 06:30:00", "apartment_id" : 269, "room_id" : 1196, "sensor_id" : 2554, "metric" : 1, "original_db" : "1", "original_id" : 2347470787, "mvalue" : 23.014902551831668 }

Only part of the data is copied to destination since I get the following kind of errors in Ni-Fi:

 

PutDatabaseRecord[id=017e1008-dfd4-1d38-4b7c-6efa32ed611a] Failed to put Records to database for StandardFlowFileRecord[uuid=5ec613aa-b836-437e-a855-31907519e95b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1649245325204-268, container=default, section=268], offset=472323, length=49990],offset=0,name=6f15f3ef-79f4-43e0-83a9-ee0a2a65d106,size=49990]. Routing to failure.: Batch entry 341 INSERT INTO public.periodic_measurements (time_stamp, apartment_id, room_id, sensor_id, metric, original_db, original_id, mvalue) VALUES ('2019-03-31 04:00:00+03',269,0,4699,2,1,1226523146,94.19718804413162) was aborted: ERROR: duplicate key value violates unique constraint "1551_709_periodic_measurements_pkey" Detail: Key (time_stamp, apartment_id, room_id, sensor_id, metric)=(2019-03-31 04:00:00+03, 269, 0, 4699, 2) already exists. Call getNextException to see other errors in the batch

Note that the duplicate key error occurs at the time of change into daylight saving time (the timezone is UTC+2 without daylight saving time). The most strange thing is that it happens when time is changed one hour forward (not backward).

 

Also, in all the error cases, the composite key has always one field whose value is zero (it was NULL in the source database before the data transformation in the pipeline).

 

When going to the target database, I get the following result:

 

SELECT COUNT(*) FROM periodic_measurements WHERE time_stamp BETWEEN '2019-03-31 03:59:59+03' AND '2019-03-31 04:00:01+03';
 count 
-------
     0
(1 row)

I get the same error also when I use a normal postgreSQL database as destination (with no Timescale hypertable and compression).

 

Note that by using another pipeline (Qlik Replicate), there is no duplicate key error, so the problem is somewhere in Ni-Fi.

 

All this rises the following questions:

 

  • Why in the data provenance the format of the field time_stamp is AVRO string rather than long / timestamp-millis as it should be based on the settings in the property Use Avro Logical Types set to true in QueryDatabaseTable processor?
  • Why in the target database there is no entry with timestamp value 2019-03-31 04:00:00+03 if this value causes a duplicate key error?
  • What is causing the error in Apache Ni-Fi?

Any hints?

 

Resetting the key manually as suggested at postgresql duplicate key violates unique constraint is not an option since the composite key is a natural key formed by timestamp and ids which have a meaning.

 

Used SW versions:

  • Linux Unbuntu: 18.04
  • Apache Ni-Fi: 1.15.2
  • PostgreSQL: 14.2
  • Timescale: 2.6.0

Thanks,

 

Bernardo Di Chiara

0 REPLIES 0
; ;