Created 04-13-2023 05:54 AM
I am using the ExecuteSQL NiFi processor to read from a PostgreSQL table defined as below:
CREATE TABLE otherdatatype(
column1 int,
column2 varchar(255),
column3 bytea
);
I read the values of that table successfully, as shown below:
column1 : 1
column2 : "hello world"
column3 :
\x4920616d206261636b20616761696e2e2e2e (converted to text "I am back again...")
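(For reference, the bytea value above is simply the hex encoding of that text. A quick Python sketch, independent of NiFi, confirming the conversion:)

```python
# Postgres renders bytea in hex format: a \x prefix followed by hex digits.
literal = r"\x4920616d206261636b20616761696e2e2e2e"

# Strip the \x prefix, decode the hex digits to bytes, then to text.
decoded = bytes.fromhex(literal[2:]).decode("utf-8")
print(decoded)  # I am back again...
```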
and the Avro schema automatically generated by the ExecuteSQL NiFi processor is:
{
"type": "record",
"name": "otherdatatype",
"namespace": "any.data",
"fields": [{
"name": "column1",
"type": ["null", "int"]
}, {
"name": "column2",
"type": ["null", "string"]
}, {
"name": "column3",
"type": ["null", "bytes"]
}]
}
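(A side note on that schema: each field's type is a ["null", type] union, so every column is nullable. Since the schema is plain JSON, it can be sanity-checked with nothing but the Python standard library; a small sketch:)

```python
import json

# The schema generated by ExecuteSQL, verbatim.
schema_json = """
{
  "type": "record",
  "name": "otherdatatype",
  "namespace": "any.data",
  "fields": [
    {"name": "column1", "type": ["null", "int"]},
    {"name": "column2", "type": ["null", "string"]},
    {"name": "column3", "type": ["null", "bytes"]}
  ]
}
"""

schema = json.loads(schema_json)

# Every field is a ["null", <type>] union, i.e. nullable.
for field in schema["fields"]:
    assert field["type"][0] == "null"

# Map each column to its non-null Avro type.
print({f["name"]: f["type"][1] for f in schema["fields"]})
# {'column1': 'int', 'column2': 'string', 'column3': 'bytes'}
```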
Then I try to use the PutDatabaseRecord NiFi processor to insert the values already read into another table, as shown below.
The Record Reader property is set to an AvroReader controller service (AvroReaderInferSchema) that uses the embedded Avro schema:
Created on 04-13-2023 09:01 AM - edited 04-13-2023 09:03 AM
Hi @kspyropoulos,
I would honestly start by asking which NiFi you are currently running (open source or Cloudera), and which version of NiFi it is. I know the questions might sound silly, but each version has (or lacks) certain features.
Next, I would ask whether the table you are inserting into has the same structure (same column types) and whether you are using the correct Avro schema. I reproduced your flow as follows:
CREATE TABLE test_voice (
column1 INT,
column2 varchar(255),
column3 bytea
);
In NiFi (1.19.1) I set up an ExecuteSQL processor, configured as follows: a simple DBCP Connection Pool pointing to my PostgreSQL database, a simple select * from my_table as the SQL select query, and !!! Use Avro Logical Types = true !!!
After executing the flow, the success queue contains a single flowfile with a single row (because I inserted only one row for my test):
insert into test_voice(column1,column2,column3) values(1,'hello','\x4920616d206261636b20616761696e2e2e2e')
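(The bytea literal in that insert can be produced from plain text. A hypothetical Python helper, to show how the hex literal was built; the function name is my own:)

```python
def to_bytea_hex(text: str) -> str:
    """Encode text as a Postgres hex-format bytea literal."""
    # Hex-format bytea literals are \x followed by the hex digits of the bytes.
    return "\\x" + text.encode("utf-8").hex()

print(to_bytea_hex("I am back again..."))
# \x4920616d206261636b20616761696e2e2e2e
```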
In terms of schema, we have:
So far so good. Now we will update the current row in my PostgreSQL database from column1=1 to column1=2, so that we can check whether the insert took place.
update test_voice set column1='2' where column1='1';
Next, using PutDatabaseRecord, we will insert the row back into our database. For PutDatabaseRecord, I configured the following: Record Reader = Avro Reader with Inherit Record Schema, Database Type = PostgreSQL, Statement Type = INSERT, Database Connection Pooling Service = the one used in ExecuteSQL, and Catalog Name, Schema Name and Table Name taken from PostgreSQL. Everything else was left at its default.
Once I executed the flow, the row was inserted into the DB.
So I can tell you that PutDatabaseRecord works just fine. Unfortunately for you, it seems that your problem is located somewhere else .... my money is on either the Avro schema or the table you are trying to insert into 🙂
Created 04-13-2023 07:55 AM
@kspyropoulos Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @SAMSAL @cotopaul @MattWho who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 04-14-2023 04:51 AM
Hi @cotopaul,
Thanks for your prompt reply; it was clear and to the point. I have followed the same steps and it works with Apache NiFi 1.18.0 (open-source/community).
The version on which I observed the issue is Apache NiFi 1.11.4 (open-source/community).
Created 04-14-2023 07:03 AM
I have also tested the same with Apache NiFi 1.15.3 (open-source/community) and it works. It did not work, however, with Apache NiFi 1.13.2 (open-source/community); it still fails with the same error message.