Member since: 02-15-2021
Posts: 18
Kudos Received: 5
Solutions: 2
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 10955 | 01-13-2022 06:50 AM |
 | 3187 | 02-17-2021 05:35 AM |
03-20-2024
03:10 AM
1 Kudo
Hi! I have created a DAG file to control a few scripts and an Apache NiFi data pipeline with Apache Airflow. The data pipeline starts with a GenerateTableFetch processor, followed by an ExecuteSQL processor and a few more processors that perform data transformations. Then a PutDatabaseRecord processor loads the data into a database. Below is the NiFi flow.

Airflow tells NiFi to start the pipeline by setting the GenerateTableFetch processor to the RUN_ONCE state, using an approach similar to the one nicely described in this post: https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a

The same article proposes a method to let Airflow know that the NiFi pipeline has finished. That method relies on an UpdateAttribute processor that sets the variable "last_tms" to the current time every time it runs. This processor takes its input from the success relationship of the previous processor (PutDatabaseRecord).

The problem with that approach is that my pipeline is meant to transfer a huge amount of data and uses multiple flows. So waiting for the value of "last_tms" to change is not a solution, since that happens many times during the life of the pipeline. Waiting for the value to stay the same for some time would require a pause timer, which in turn requires knowing the maximum possible interval between the end of two flows in the pipeline. This is not very practical, since it risks either waiting too long or continuing with the DAG tasks before the pipeline has finished.

I wonder whether it would be possible to automate the method I have been using manually in the NiFi GUI: checking that there are no more active threads and no more queued data in the NiFi pipeline. How can I retrieve this information from NiFi in Airflow? Any other suggestion to solve the issue would be highly appreciated.

I run Apache Airflow version 2.6.2 and Apache NiFi 1.23.2 on Java 11.0.22. Both machines run Linux Ubuntu 22.04.3.

Thanks, Bernardo
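The manual check described above (no active threads, no queued data) might be automated by polling the status of the process group through the NiFi REST API. The following is a minimal sketch under several assumptions, not a tested integration: the endpoint path `/nifi-api/flow/process-groups/{id}/status` and the JSON fields `activeThreadCount` and `flowFilesQueued` are taken from the NiFi 1.x REST API as I understand it, while `base_url`, `group_id`, `token` and all function names are hypothetical. TLS certificate handling for a secured NiFi instance is left out.

```python
import json
import time
import urllib.request


def fetch_group_status(base_url, group_id, token=None):
    """Fetch the status of one process group (hypothetical base_url/group_id)."""
    url = f"{base_url}/nifi-api/flow/process-groups/{group_id}/status"
    request = urllib.request.Request(url)
    if token:
        # Assumes bearer-token authentication is in use.
        request.add_header("Authorization", f"Bearer {token}")
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def is_idle(status):
    """True when the group reports no active threads and no queued flow files."""
    snapshot = status["processGroupStatus"]["aggregateSnapshot"]
    return snapshot["activeThreadCount"] == 0 and snapshot["flowFilesQueued"] == 0


def wait_until_idle(fetch, poll_seconds=30.0, required_idle_polls=3,
                    max_polls=1000, sleep=time.sleep):
    """Poll until the group has been idle for several consecutive polls.

    `fetch` is any zero-argument callable returning the status JSON as a dict.
    Returns True once idle, or False if max_polls is reached first.
    """
    idle_streak = 0
    for _ in range(max_polls):
        idle_streak = idle_streak + 1 if is_idle(fetch()) else 0
        if idle_streak >= required_idle_polls:
            return True
        sleep(poll_seconds)
    return False
```

Requiring a few consecutive idle polls is a design choice that guards against a momentary zero, for example right after RUN_ONCE has been triggered but before GenerateTableFetch has emitted anything. In Airflow, `wait_until_idle(lambda: fetch_group_status(...))` could be called from a Python task, or `is_idle` could serve as the condition of a sensor.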
Labels: Apache Airflow, Apache NiFi
09-25-2023
04:53 AM
1 Kudo
@cotopaul Thanks a lot for your answer! By running GenerateTableFetch in DEBUG mode I noticed that it was searching for entries whose id was bigger than the max. The processor had been used with the same source table during the testing phase, and this was the reason why it was not producing any output from that table. I have cleared the state of the processor and now it is working as expected. Have a nice day, Bernardo
09-25-2023
01:40 AM
Hi! I have created an Apache NiFi data pipeline that fetches data from a MySQL table and, after some data transformation, loads the data into a PostgreSQL table. The first processor is a GenerateTableFetch, which is followed by an ExecuteSQL processor. The pipeline works perfectly with source tables containing over 14 million entries (around 2700 MB). However, with tables containing over 420 million entries (around 69000 MB), the GenerateTableFetch processor does not provide any output.

I have used a partition size of 100000 rows, and as Column for Value Partitioning I am using the column id, which is the primary key in the source table. The same column is also used as Maximum-value Columns. The Max Wait Time is set to zero, which also allows long SQL queries. The attached figure shows the other properties of the processor. In the ExecuteSQL processor I have set Max Rows Per Flow File to the same value as the partition size: 100000.

I am running NiFi 1.18.0 (Java 11.0.20.1) on Linux Ubuntu 22.04. Any hint on how I could manage to load such a huge table? Thanks, Bernardo
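To make the partitioning arithmetic concrete, here is a small illustrative model of value-based partitioning on an integer id column. It is a sketch under assumptions, not the SQL that GenerateTableFetch actually emits: the function name `plan_partitions`, the half-open ranges, and the `stored_max` parameter (standing in for a maximum value remembered from an earlier run) are all hypothetical, and ids are assumed to be dense.

```python
def plan_partitions(min_id, max_id, partition_size, stored_max=None):
    """Return (low, high) id ranges, low inclusive and high exclusive.

    stored_max models a maximum value remembered from an earlier run:
    only ids strictly greater than it are planned.
    """
    start = min_id if stored_max is None else max(min_id, stored_max + 1)
    ranges = []
    low = start
    while low <= max_id:
        high = min(low + partition_size, max_id + 1)
        ranges.append((low, high))
        low = high
    return ranges


# 420 million dense ids with a partition size of 100000 give 4200 ranges.
full_run = plan_partitions(1, 420_000_000, 100_000)

# If an earlier run already recorded the table's maximum id, nothing is planned.
stale_run = plan_partitions(1, 420_000_000, 100_000, stored_max=420_000_000)
```

In this model a remembered maximum that already equals the table's highest id yields an empty plan, which is one way a fetch step can produce no output even though the table is full.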
Labels: Apache NiFi
04-12-2022
06:47 AM
I have created an Apache NiFi pipeline to transfer data from a MySQL database in an AWS RDS instance into a Timescale database (PostgreSQL) located in my local Linux Ubuntu instance running on VirtualBox on top of Windows 10. The target table has a composite primary key and was created as follows:

CREATE TABLE periodic_measurements (
time_stamp TIMESTAMP WITH TIME ZONE NOT NULL,
original_db SMALLINT NULL,
original_id BIGINT NULL,
apartment_id INTEGER NOT NULL,
room_id INTEGER NOT NULL,
sensor_id INTEGER NOT NULL,
metric SMALLINT NOT NULL,
mvalue NUMERIC NULL,
PRIMARY KEY (time_stamp, apartment_id, room_id, sensor_id, metric));

Here are some details about the ETL data pipeline:

• The MySQL source database has a timestamp column called period with datatype datetime, which therefore does not natively contain the time zone. However, the implementation uses the Django framework, which is configured to use time zones (USE_TZ=True).
• In the first processor of the NiFi pipeline (QueryDatabaseTable), the column period is renamed time_stamp and formatted as follows: date_format(period, '%Y-%m-%d %H:%i:%s').
• The output of the NiFi QueryDatabaseTable processor comes in AVRO format, and the property Use Avro Logical Types is set to true. This means that the time_stamp column should be formatted as AVRO long / timestamp-millis.
• After a series of processors that use AVRO format (UpdateRecord, QueryRecord), the data goes to a PutDatabaseRecord processor to be inserted into Timescale (PostgreSQL).
• In PostgreSQL the time_stamp column has datatype timestamp with time zone, as seen above.
When running the data pipeline, I can see the following in the data provenance in Apache NiFi:

Data in the output of QueryDatabaseTable (first processor):

avro.schema: {"type":"record","name":"periodic_measurements","namespace":"any.data","fields":[{"name":"original_id","type":["null","long"]},{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"valueSum","type":["null","double"]},{"name":"valueDiv","type":["null","double"]}]}
{ "original_id" : 3341103630, "time_stamp" : "2019-12-08 03:20:00", "apartment_id" : 269, "room_id" : 1204, "sensor_id" : 2562, "metric" : 1, "valueSum" : 12267.1010819471, "valueDiv" : 600.0 },

Data in the input of PutDatabaseRecord (last processor):

avro.schema: {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":[{"name":"time_stamp","type":["null","string"]},{"name":"apartment_id","type":["null","int"]},{"name":"room_id","type":["null","int"]},{"name":"sensor_id","type":["null","int"]},{"name":"metric","type":["null","int"]},{"name":"original_db","type":["null","string"]},{"name":"original_id","type":["null","long"]},{"name":"mvalue","type":["null","double"]}]}
{ "time_stamp" : "2019-08-21 06:30:00", "apartment_id" : 269, "room_id" : 1196, "sensor_id" : 2554, "metric" : 1, "original_db" : "1", "original_id" : 2347470787, "mvalue" : 23.014902551831668 }

Only part of the data is copied to the destination, since I get the following kind of error in NiFi:

PutDatabaseRecord[id=017e1008-dfd4-1d38-4b7c-6efa32ed611a] Failed to put Records to database for StandardFlowFileRecord[uuid=5ec613aa-b836-437e-a855-31907519e95b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1649245325204-268, container=default, section=268], offset=472323, length=49990],offset=0,name=6f15f3ef-79f4-43e0-83a9-ee0a2a65d106,size=49990]. Routing to failure.: Batch entry 341 INSERT INTO public.periodic_measurements (time_stamp, apartment_id, room_id, sensor_id, metric, original_db, original_id, mvalue) VALUES ('2019-03-31 04:00:00+03',269,0,4699,2,1,1226523146,94.19718804413162) was aborted: ERROR: duplicate key value violates unique constraint "1551_709_periodic_measurements_pkey" Detail: Key (time_stamp, apartment_id, room_id, sensor_id, metric)=(2019-03-31 04:00:00+03, 269, 0, 4699, 2) already exists. Call getNextException to see other errors in the batch

Note that the duplicate key error occurs at the time of the change to daylight saving time (the time zone is UTC+2 without daylight saving time). The strangest thing is that it happens when the clock is moved one hour forward (not backward). Also, in all the error cases, the composite key always has one field whose value is zero (it was NULL in the source database before the data transformation in the pipeline).

When querying the target database, I get the following result:

SELECT COUNT(*) FROM periodic_measurements WHERE time_stamp BETWEEN '2019-03-31 03:59:59+03' AND '2019-03-31 04:00:01+03';
count
-------
0
(1 row)

I get the same error also when I use a plain PostgreSQL database as destination (with no Timescale hypertable and compression). Note that with another pipeline (Qlik Replicate) there is no duplicate key error, so the problem is somewhere in NiFi.

All this raises the following questions:

• Why, in the data provenance, is the format of the field time_stamp AVRO string rather than long / timestamp-millis, as it should be given that the property Use Avro Logical Types is set to true in the QueryDatabaseTable processor?
• Why is there no entry in the target database with timestamp value 2019-03-31 04:00:00+03 if this value causes a duplicate key error?
• What is causing the error in Apache NiFi?

Any hints?

Resetting the key manually, as suggested at "postgresql duplicate key violates unique constraint", is not an option, since the composite key is a natural key formed by a timestamp and ids which have a meaning.

Software versions used:
• Linux Ubuntu: 18.04
• Apache NiFi: 1.15.2
• PostgreSQL: 14.2
• Timescale: 2.6.0

Thanks, Bernardo Di Chiara
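One possible explanation, which nothing in this thread confirms, is that the time_stamp strings are interpreted as local wall-clock times. On 2019-03-31 a UTC+2 zone with daylight saving skips the hour from 03:00 to 04:00, so a string such as "2019-03-31 03:00:00" names a time that does not exist. A common convention is to shift such a time forward by the length of the gap, which would make it land on the same instant as "2019-03-31 04:00:00". The sketch below models only that one transition with hand-written rules; the constants, function names and the gap-handling convention are assumptions for illustration, not NiFi or PostgreSQL code.

```python
from datetime import datetime, timedelta, timezone

# Assumed zone rule, valid only around this one transition: UTC+2 standard
# time, jumping to UTC+3 at 03:00 local time on 2019-03-31.
GAP_START = datetime(2019, 3, 31, 3, 0, 0)  # first local time that does not exist
GAP_END = datetime(2019, 3, 31, 4, 0, 0)    # first valid local time after the jump
STANDARD = timezone(timedelta(hours=2))
SUMMER = timezone(timedelta(hours=3))


def parse_as_local(text):
    """Interpret a 'YYYY-MM-DD HH:MM:SS' string as local wall-clock time."""
    naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    if naive < GAP_START:
        return naive.replace(tzinfo=STANDARD)
    if naive < GAP_END:
        # Nonexistent time: read it with the standard offset, then express
        # the resulting instant in summer time (assumed convention).
        return naive.replace(tzinfo=STANDARD).astimezone(SUMMER)
    return naive.replace(tzinfo=SUMMER)


def find_collisions(texts):
    """Return pairs of distinct strings that map to the same instant."""
    seen = {}
    collisions = []
    for text in texts:
        instant = parse_as_local(text)
        if instant in seen:
            collisions.append((seen[instant], text))
        else:
            seen[instant] = text
    return collisions


rows = ["2019-03-31 02:00:00", "2019-03-31 03:00:00", "2019-03-31 04:00:00"]
collisions = find_collisions(rows)
```

If this is what happens, two source rows that differ only by that hour would share one primary key after conversion, and if both sit in the same aborted batch, neither would be stored, which would be consistent with the count of 0 above. As for the AVRO type, date_format() in MySQL returns a string, so a string-typed time_stamp field in the QueryDatabaseTable output is at least not surprising.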
Labels: Apache NiFi
03-22-2022
01:36 AM
1 Kudo
Hi André! Thanks for your answer! That was the problem!
03-21-2022
09:58 AM
I have created a data pipeline with Apache NiFi (version 1.15.2) to transfer data from a MySQL database in an AWS RDS instance into Timescale (PostgreSQL). NiFi runs on Linux Ubuntu 18.04 in VirtualBox on top of Windows 10. The pipeline has been tested successfully using as a target the Timescale database located in my local Linux instance, without SSL.

Then I modified the pipeline to use Timescale Cloud as a target (https://portal.timescale.cloud/login). The database name is: periodic-measurements

In the Timescale Cloud service page I can see these credentials:
• Host: <host-string>
• Port: 10250
• User: tsdbadmin
• Password: <timescale-cloud-service-password>
• Service URI: postgres://tsdbadmin:<timescale-cloud-service-password>@<host-string>:10250/defaultdb?sslmode=require
• SSL mode: require
• Allowed IP addresses: 0.0.0.0/0

I use a PutDatabaseRecord processor to write data to the PostgreSQL database; it uses the DBCPConnectionPool controller service. I have copied the CA certificate obtained from the Timescale Cloud service page into my Linux instance with the following file name: /etc/ca-certificates/ts-cloud-ca.pem

These are the configuration parameters in the DBCPConnectionPool controller service:
• Database Connection URL: jdbc:postgres://<host-string>:10250/periodic-measurements
• Database Driver Class Name: org.postgresql.Driver
• Database Driver Location: /usr/share/java/postgresql/postgresql-42.3.1.jar
• Database User: tsdbadmin
• Password: <timescale-cloud-service-password>
• sslmode: require
• sslrootcert: /etc/ca-certificates/ts-cloud-ca.pem

When enabling the controller service I get the following error:

StandardControllerServiceNode[service=DBCPConnectionPool[id=017e1005-818e-1423-7951-125faf9dff4d], name=Connector to Timescale Cloud, active=true] Failed to invoke @OnEnabled method due to java.sql.SQLException: No suitable driver
causes: org.apache.nifi.processor.exception.ProcessException: No suitable driver for the given Database Connection URL: No suitable driver for the given Database Connection URL

I get the same result if I change the DBCPConnectionPool controller service settings to sslmode: verify-ca

Note that I have managed to connect successfully to the Timescale Cloud database with Grafana from my Linux instance using the same credentials. Any hint what could be wrong? Thanks, Bernardo
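"No suitable driver" generally means that no loaded JDBC driver recognises the prefix of the connection URL. The PostgreSQL JDBC driver accepts URLs that begin with jdbc:postgresql:, and MySQL Connector/J accepts URLs that begin with jdbc:mysql:, so the URL prefix is worth checking before looking at SSL settings. The following is a minimal sketch of such a check; the table `KNOWN_PREFIXES` and the function name are hypothetical helpers, not part of NiFi or of any driver, and the host name in the usage lines is a placeholder.

```python
# Hypothetical lookup table: driver class name -> URL prefix that driver accepts.
KNOWN_PREFIXES = {
    "org.postgresql.Driver": "jdbc:postgresql:",
    "com.mysql.cj.jdbc.Driver": "jdbc:mysql:",
    "com.mysql.jdbc.Driver": "jdbc:mysql:",
}


def url_matches_driver(driver_class, url):
    """Return True if the URL starts with the prefix the driver expects."""
    prefix = KNOWN_PREFIXES.get(driver_class)
    if prefix is None:
        raise ValueError(f"no known URL prefix for driver {driver_class!r}")
    return url.startswith(prefix)


# A "postgres" scheme (without "ql") does not match the PostgreSQL driver.
bad = url_matches_driver("org.postgresql.Driver",
                         "jdbc:postgres://example-host:10250/periodic-measurements")
good = url_matches_driver("org.postgresql.Driver",
                          "jdbc:postgresql://example-host:10250/periodic-measurements")
```

Note that the Service URI shown by a cloud provider (postgres://...) is not a JDBC URL, so copying its scheme into the Database Connection URL field is an easy mistake to make.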
Labels: Apache NiFi
01-13-2022
06:50 AM
I managed to fix the issue. It was related to the version of the JDBC driver. I did the following:

In the Linux browser go to https://dev.mysql.com/downloads/connector/j/
Select:
• Ubuntu Linux
• 18.04
Download.

In a CLI terminal go to the Downloads directory:
$ cd /home/bernardo/Downloads

Install the .deb package:
$ sudo dpkg -i mysql-connector-java_8.0.26-1ubuntu18.04_all.deb

Check where the connector is located in the file system:
$ dpkg -L mysql-connector-java | fgrep jar
/usr/share/java/mysql-connector-java-8.0.26.jar

Clean /usr/share/java of the previous JDBC driver:
$ cd /usr/share/java
$ sudo rm mysql-connector-java-5.1.45.jar
$ sudo rm -r com
$ sudo rm -r META-INF/

Unpack the .jar file:
$ sudo jar xvf /usr/share/java/mysql-connector-java-8.0.26.jar

To find the driver class name, open /usr/share/java/META-INF/services/java.sql.Driver:
$ cat /usr/share/java/META-INF/services/java.sql.Driver
com.mysql.cj.jdbc.Driver

The driver class name is: com.mysql.cj.jdbc.Driver

Restart Apache NiFi.

Modify the configuration of the controller service with:
• Database Driver Class Name: com.mysql.cj.jdbc.Driver
• Database Driver Location: /usr/share/java/mysql-connector-java-8.0.26.jar
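The driver class name can also be read straight from the jar, without unpacking it into /usr/share/java, because a .jar file is a zip archive and the file META-INF/services/java.sql.Driver lists the driver classes it registers. Below is a minimal sketch using Python's standard zipfile module; the function name `driver_classes` is hypothetical.

```python
import zipfile


def driver_classes(jar_path):
    """List the JDBC driver class names registered inside a jar file."""
    with zipfile.ZipFile(jar_path) as jar:
        text = jar.read("META-INF/services/java.sql.Driver").decode("utf-8")
    # Keep non-empty lines and skip comment lines starting with '#'.
    return [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.lstrip().startswith("#")
    ]
```

For example, `driver_classes("/usr/share/java/mysql-connector-java-8.0.26.jar")` would return the same class names that the cat command above prints. If the jar has no such entry, `zipfile` raises a KeyError.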
01-12-2022
01:23 AM
I have corrected a typo in the NiFi controller service configuration:
• Database Connection URL: jdbc:mysql://127.0.0.1:33061/<local_db_name>

This way I solved the controller service problem and I have been able to enable it. However, when I run the QueryDatabaseTable processor, I get the following error:

Cannot create PoolableConnectionFactory - Communications link failure

QueryDatabaseTable[id=017e1003-c2d8-14cf-4e34-feee76411595] Unable to execute SQL select query SELECT * FROM periodic_measurements_test due to java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure The last packet successfully received from the server was 1 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.): javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
↳ causes: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 1 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.
↳ causes: java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure The last packet successfully received from the server was 1 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.)
↳ causes: org.apache.nifi.processor.exception.ProcessException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure The last packet successfully received from the server was 1 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.)

The processor configuration is default, except for:
• Database Connection Pooling Service: < DBCPConnectionPool controller service name >
• Database Type: MySQL
• Table Name: periodic_measurements_test

The problem might be due to the SSH tunnel, but I cannot understand where the problem is.

$ netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:33061 0.0.0.0:* LISTEN 29483/ssh
tcp 0 0 127.0.0.1:3306 0.0.0.0:* LISTEN -
tcp 0 0 127.0.0.1:8443 0.0.0.0:* LISTEN -

And in the Linux terminal I can still access the MySQL database through the CLI:
$ mysql -u <db_user> -p -h 127.0.0.1 -P 33061

The controller service configuration is still:
• Database Connection URL: jdbc:mysql://127.0.0.1:33061/<db_name>
• Database Driver Class Name: com.mysql.jdbc.Driver
• Database Driver Location: /usr/share/java/mysql-connector-java-5.1.45.jar
• Database User: <db_user>
• Password: <db_password>
01-12-2022
01:20 AM
I thought it could be an issue related to the driver version, so I re-installed the JDBC driver. I cleaned the old SQL-related files out of /home/bernardo/Download and /usr/share/java, then did the installation again.

Install the JDBC connector:
$ sudo apt-get install libmysql-java
The .jar file is in /usr/share/java/mysql-connector-java-5.1.45.jar

Unpack the .jar file:
$ sudo jar xvf /usr/share/java/mysql-connector-java-5.1.45.jar

To find the driver class name, open /usr/share/java/META-INF/services/java.sql.Driver:
$ cat /usr/share/java/META-INF/services/java.sql.Driver
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

The driver class name is: com.mysql.jdbc.Driver

This is the updated configuration in the DBCPConnectionPool 1.15.2 controller service in Apache NiFi:
• Database Driver Class Name: com.mysql.jdbc.Driver
• Database Driver Location: /usr/share/java/mysql-connector-java-5.1.45.jar
• Database Connection URL: jbdc:mysql://127.0.0.1:33061/<db_name>
• Database User: <db_user>
• Password: <db_password>

I obtained the same result:
causes: org.apache.nifi.processor.exception.ProcessException: No suitable driver for the given Database Connection URL

Then I checked whether the problem was in the SSH tunnel.
$ netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:33061 0.0.0.0:* LISTEN 4497/ssh
tcp 0 0 127.0.0.1:3306 0.0.0.0:* LISTEN -
tcp6 0 0 ::1:33061 :::* LISTEN 4497/ssh
$ cat /proc/4497/status
Name: ssh
State: S (sleeping)

The SSH process is in sleeping state. In a separate terminal I accessed the remote MySQL database:
$ mysql -u <db_user> -p -h 127.0.0.1 -P 33061

Finally, I tried to connect to the local MySQL database in my local Linux instance using the following NiFi settings:
• Database Driver Class Name: com.mysql.jdbc.Driver
• Database Driver Location: /usr/share/java/mysql-connector-java-5.1.45.jar
• Database Connection URL: jbdc:mysql://127.0.0.1:3306/<local_db_name>
• Database User: <local_db_user>
• Password: <local_db_password>

I obtained the same result:
causes: org.apache.nifi.processor.exception.ProcessException: No suitable driver for the given Database Connection URL

Still, in a separate Linux terminal I am able to access the local MySQL database:
$ mysql -u <local_db_user> -p -h 127.0.0.1 -P 3306
01-05-2022
10:00 AM
The file mysql-connector-java-8.0.26.jar is in /usr/share/java

Also:
$ cat /usr/share/java/META-INF/services/java.sql.Driver
com.mysql.cj.jdbc.Driver

So, I have made the following changes to the DBCPConnectionPool 1.15.2 controller service configuration:
• Database Driver Class Name: com.mysql.cj.jdbc.Driver
• Database Driver Location: /usr/share/java/mysql-connector-java-8.0.26.jar

The other parameters are not changed:
• Database Connection URL: jbdc:mysql://127.0.0.1:33061/<db_name>
• Database User: <user>
• Password: <password>

$ netstat -nplt
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:33061 0.0.0.0:* LISTEN 4873/ssh
tcp6 0 0 ::1:33061 :::* LISTEN 4873/ssh

Still I get an error:
causes: org.apache.nifi.processor.exception.ProcessException: No suitable driver for the given Database Connection URL