Member since
02-15-2021
18
Posts
5
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
10691 | 01-13-2022 06:50 AM | |
3100 | 02-17-2021 05:35 AM |
03-20-2024
03:10 AM
1 Kudo
Hi! I have created a DAG file to control few scripts and an Apache Ni-Fi data pipeline with Apache Airflow. The data pipeline starts with a GenerateTableFetch processor which is followed by a ExecuteSQL and few more processors which have the scope of performing certain data transformations. Then, I use a PutDatabaseRecord processor to load the data into a database. Below is the Ni-FI flow. Airflow tells Ni-FI to start the pipeline by setting the GenerateTableFetch processor into RUN_ONCE state by using a similar approach to the one nicely described in this post: https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a The same article proposes a method to let Airflow know that the Ni-Fi pipeline is over. That method is based on a UpdateAttribute processor which sets the variable "last_tms" to the current time every time it runs. This processor gets input from the success relationship of the previous processor (PutDatabaseRecord). The problem with that approach is that my pipeline is meant to transfer a huge amount of data and uses multiple flows. So, waiting that the value of "last_tms" changes is not a solution since that will happen many times during the life of the pipeline. Waiting that this value remains the same for some time would require setting a pause timer and this would require knowledge of the maximum possible interval between the end of two flows in the pipeline. This last approach is not very practical since it has the risk of waiting for too long or of continuing with the DAG tasks before the pipeline is over. I wonder if it would be possible to automatize the method I have been using manually by looking Ni-Fi GUI: checking that there are no more active threads and no more queued data in the Ni-Fi pipeline. How I can retrieve this information from Ni-Fi in Airflow? Any other suggestion to solve the issue would be highly appreciated. I run Apache Airflow version 2.6.2 and Apache Ni-Fi 1.23.2 on Java 11.0.22. Both machines run Linux Ubuntu 22.04.3. Thanks, Bernardo
... View more
Labels:
- Labels:
-
Apache Airflow
-
Apache NiFi
09-25-2023
04:56 AM
1 Kudo
@BerniHacker, I did not even take into consideration the state 🙂 I figured you were trying to execute it for the first time and I assumed from the start that you had nothing saved in your state. Congrats on solving your issue.
... View more
04-12-2022
06:47 AM
I have created an Apache Ni-Fi pipeline to transfer data from a MySQL database in an AWS RDS instance into a Timescale database (PostgreSQL) located in my local Linux Ubuntu instance running on VirtualBox on top of Windows 10. The target table has a composite primary key and has been created as follows: CREATE TABLE periodic_measurements (
time_stamp TIMESTAMP WITH TIME ZONE NOT NULL,
original_db SMALLINT NULL,
original_id BIGINT NULL,
apartment_id INTEGER NOT NULL,
room_id INTEGER NOT NULL,
sensor_id INTEGER NOT NULL,
metric SMALLINT NOT NULL,
mvalue NUMERIC NULL,
PRIMARY KEY (time_stamp, apartment_id, room_id, sensor_id, metric)); Here are some details about the ETL data pipeline: The MySQL source database has a timestamp column called period with datatype datetime, which therefore does not contain natively the time zone. However, the implementation uses Django framework which is configured to use time zone (USE_TZ=True). In the first processor of the Ni-Fi pipeline (QueryDatabaseTable) the column period is renamed time_stamp and formatted as follows: date_format(period, '%Y-%m-%d %H:%i:%s'). The output of the in Ni-Fi QueryDatabaseTable processor comes in AVRO format and the propriety Use Avro Logical Types is set to true. This means that the time_stamp column should be formatted as AVRO long / timestamp-millis. After a series of processors that use AVRO format (UpdateRecord, QueryRecord), the data goes to PutDatabaseRecord processor to be inserted into Timescale (PostgreSQL). In postgreSQL the time_stamp column has datatype timestamp with time zone, as seen above. When running the data pipeline, in the data provenance in Apache Ni-Fi I can see the following: Data in the output of QueryDatabaseTable (first processor) Obj avro.schemaæ {"type":"record","name":"periodic_measurements","namespace":"any.data","fields":[{"name":"original_id","type":["null","long"]},{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"valueSum","type":["null","double"]},{"name":"valueDiv","type":["null","double"]}]} { "original_id" : 3341103630, "time_stamp" : "2019-12-08 03:20:00", "apartment_id" : 269, "room_id" : 1204, "sensor_id" : 2562, "metric" : 1, "valueSum" : 12267.1010819471, "valueDiv" : 600.0 }, Data in the input of PutDatabaseRecord (last processor) Obj avro.schemaΰ {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":[{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"original_db","type":["null","string"]},{"name":"original_id","type":["null","long"]},{"name":"mvalue","type":["null","double"]}]} { "time_stamp" : "2019-08-21 06:30:00", "apartment_id" : 269, "room_id" : 1196, "sensor_id" : 2554, "metric" : 1, "original_db" : "1", "original_id" : 2347470787, "mvalue" : 23.014902551831668 } Only part of the data is copied to destination since I get the following kind of errors in Ni-Fi: PutDatabaseRecord[id=017e1008-dfd4-1d38-4b7c-6efa32ed611a] Failed to put Records to database for StandardFlowFileRecord[uuid=5ec613aa-b836-437e-a855-31907519e95b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1649245325204-268, container=default, section=268], offset=472323, length=49990],offset=0,name=6f15f3ef-79f4-43e0-83a9-ee0a2a65d106,size=49990]. Routing to failure.: Batch entry 341 INSERT INTO public.periodic_measurements (time_stamp, apartment_id, room_id, sensor_id, metric, original_db, original_id, mvalue) VALUES ('2019-03-31 04:00:00+03',269,0,4699,2,1,1226523146,94.19718804413162) was aborted: ERROR: duplicate key value violates unique constraint "1551_709_periodic_measurements_pkey" Detail: Key (time_stamp, apartment_id, room_id, sensor_id, metric)=(2019-03-31 04:00:00+03, 269, 0, 4699, 2) already exists. Call getNextException to see other errors in the batch Note that the duplicate key error occurs at the time of change into daylight saving time (the timezone is UTC+2 without daylight saving time). The most strange thing is that it happens when time is changed one hour forward (not backward). Also, in all the error cases, the composite key has always one field whose value is zero (it was NULL in the source database before the data transformation in the pipeline). When going to the target database, I get the following result: SELECT COUNT(*) FROM periodic_measurements WHERE time_stamp BETWEEN '2019-03-31 03:59:59+03' AND '2019-03-31 04:00:01+03';
count
-------
0
(1 row) I get the same error also when I use a normal postgreSQL database as destination (with no Timescale hypertable and compression). Note that by using another pipeline (Qlik Replicate), there is no duplicate key error, so the problem is somewhere in Ni-Fi. All this rises the following questions: Why in the data provenance the format of the field time_stamp is AVRO string rather than long / timestamp-millis as it should be based on the settings in the property Use Avro Logical Types set to true in QueryDatabaseTable processor? Why in the target database there is no entry with timestamp value 2019-03-31 04:00:00+03 if this value causes a duplicate key error? What is causing the error in Apache Ni-Fi? Any hints? Resetting the key manually as suggested at postgresql duplicate key violates unique constraint is not an option since the composite key is a natural key formed by timestamp and ids which have a meaning. Used SW versions: Linux Unbuntu: 18.04 Apache Ni-Fi: 1.15.2 PostgreSQL: 14.2 Timescale: 2.6.0 Thanks, Bernardo Di Chiara
... View more
Labels:
- Labels:
-
Apache NiFi
03-22-2022
01:36 AM
1 Kudo
Hi André! Thanks for your answer! That was the problem!
... View more
01-13-2022
06:50 AM
I managed to fix the issue. It was related to the version of the jdbc driver. I did the following: In Linux browser go to https://dev.mysql.com/downloads/connector/j/ Select • Ubuntu Linux • 18.04 Download In a CLI terminal go to the Downloads directory $ cd /home/bernardo/Downloads Unpack the deb file $ sudo dpkg -i mysql-connector-java_8.0.26-1ubuntu18.04_all.deb Check where is the connector location in the file system $ dpkg -L mysql-connector-java | fgrep jar
/usr/share/java/mysql-connector-java-8.0.26.jar Clean /usr/share/java from the previous jdbc driver $ sudo rm mysql-connector-java-5.1.45.jar
$ sudo rm -r com
$ sudo rm -r META-INF/
Install the connector: unpack the .jar file $ sudo jar xvf /usr/share/java/mysql-connector-java-8.0.26.jar To find the driver class name, open /usr/share/java/META-INF/services/java.sql.Driver $ cat /usr/share/java/META-INF/services/java.sql.Driver
com.mysql.cj.jdbc.Driver The driver class name is: com.mysql.jdbc.Driver Restart Apache Ni-Fi Modify the configuration of the controller service with: • Database Driver Class Name: com.mysql.cj.jdbc.Driver • Database Driver Location: /usr/share/java/mysql-connector-java-8.0.26.jar
... View more
03-03-2021
10:36 PM
2 Kudos
I have managed to solve the issue by using the RouteText processor. I have added a propriety containing a regular expression that matches the rows I want to omit and I have configured the corresponding relationship to be terminated locally. Still, it would be nice to know how to implement the same functionality by using the ExtractText processor.
... View more
02-17-2021
05:35 AM
@justenji Thanks a lot! The suggestion you proposed works great! However, the source of the problem was not a lack of schema definition. In fact, I managed also to have a working flow with the option Schema Access Strategy = Use String Fields From Header in the CSVReader processor. The warning "None of the fields in the record map to the columns defined by the tablename table." is also obtained when the processor is not able to find the table and this can happen also when the table name is correctly configured in PutDatabaseRecord but there is some issue with user access rights (which ended up to be the actual cause of my error ...).
... View more