Member since: 01-27-2023
Posts: 229
Kudos Received: 73
Solutions: 45
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 1281 | 02-23-2024 01:14 AM
 | 1589 | 01-26-2024 01:31 AM
 | 1093 | 11-22-2023 12:28 AM
 | 2670 | 11-22-2023 12:10 AM
 | 2703 | 11-06-2023 12:44 AM
04-18-2023
12:52 AM
Hi @nisha2112, Are you certain that your schema is correct? I do not have too much experience with the ConfluentSchemaRegistry, but I think that you might have altered your schema, either when inserting it into the registry or when exporting it out of the registry. What I recommend you do is:
- Retrieve the schema (as-is) and check whether it is correct. If not, you know what to do. If it is correct, proceed to the next point.
- Within your ConvertRecord, modify both your Reader and your Writer to use the Schema Text Property, where you manually define your schema (see the sketch below). This will tell you one of two things: 1) either the data coming into ConvertRecord is not in a correct format, in which case ConvertRecord will fail, or 2) your schema gets extracted incorrectly from your ConfluentSchemaRegistry, in which case the flow will work and you will have no error.
Once you have done the test, you will know where the error is located and you can debug it further. For example, you can try to extract your schema from ConfluentSchemaRegistry and see whether it comes out as expected. Or, if your data is incorrect, you can check whether something changed in your source and modify the data or the schema. There are plenty of possibilities and you have to start somewhere 🙂
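Just to illustrate what the Schema Text value could contain: an Avro schema is plain JSON, so assuming your record only had an id and an event timestamp (the field names here are made up, adapt them to your actual data), it would look something like this:

{
  "type": "record",
  "name": "my_record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "event_time", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}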
04-13-2023
09:01 AM
Hi @kspyropoulos, I would honestly start by asking which NiFi you are currently running (open source or Cloudera), and what version of NiFi it is. I know the questions might sound silly, but each version has (or does not have) certain features. Next, I would ask whether the table you are inserting into has the same structure (same column types) and whether you are using the correct Avro schema. I reproduced your flow as follows:

CREATE TABLE test_voice (
  column1 INT,
  column2 varchar(255),
  column3 bytea
);

In NiFi (1.19.1) I set up an ExecuteSQL processor, configured as follows: a simple DBCP Connection Pool pointing to my PostgreSQL database, a simple select * from my_table as the SQL select query and !!! Use Avro Logical Types = true !!! After executing the flow, in the success queue, I get a single flowfile with a single row (because I inserted only one row for my test):

insert into test_voice(column1,column2,column3) values(1,'hello','\x4920616d206261636b20616761696e2e2e2e');

In terms of schema, everything looks as expected; so far so good. Now, we update the current row in my PostgreSQL database from column1=1 to column1=2, so that we can check whether the insert takes place:

update test_voice set column1='2' where column1='1';

Next, using PutDatabaseRecord, we insert the row into the database. For PutDatabaseRecord, I have configured the following: RecordReader = Avro Reader with Inherit Record Schema, Database Type = PostgreSQL, Statement Type = INSERT, Database Connection Pooling Service = the one used in ExecuteSQL, and Catalog Name, Schema Name and Table Name taken from PostgreSQL. Everything else was left as default. Once I executed the flow, the row was inserted into the DB. So I can tell you that PutDatabaseRecord works just fine. Unfortunately for you, it seems that your problem is located somewhere else... my bet is on either the Avro schema or the table you are trying to insert into 🙂
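As a side note, if you want to double check the bytea round trip on the database side (this is just how I would verify it manually, NiFi does not need it), PostgreSQL's encode() function renders the hex value back as readable text:

-- verify the inserted row; encode(..., 'escape') shows the bytea content as text
SELECT column1, column2, encode(column3, 'escape') AS column3_text
FROM test_voice;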
04-12-2023
07:58 AM
Thank you @MattWho, it worked like a charm. You are a life saver 🙂 I did not even consider the nanoseconds and I did not really know about the EL functions for the Java DateTimeFormatter. Nevertheless, if somebody else encounters a similar issue, here is the link to the documentation --> here. One more question though, if possible. When saving the data into the PostgreSQL database using PutDatabaseRecord (JSON as Reader), the value "2023-04-10 07:43:15.794" gets immediately truncated to "2023-04-10 07:43:15", so basically everything after the point is removed. In PostgreSQL, the column is defined as "timestamp without time zone" with a precision of 6.
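For reference, the column itself should be able to hold those milliseconds; a quick way to sanity check that directly in psql (the table and column names below are made up) would be:

-- a timestamp(6) column keeps fractional seconds up to microseconds
CREATE TABLE ts_check (event_ts timestamp(6) without time zone);
INSERT INTO ts_check VALUES ('2023-04-10 07:43:15.794');
SELECT event_ts FROM ts_check;  -- 2023-04-10 07:43:15.794

So the truncation seems to happen before the value reaches the database.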
04-12-2023
06:51 AM
1 Kudo
Hi @moahmedhassaan, Regarding CDC in NiFi for an Oracle instance, I highly recommend the following article, as it guides you step by step through a partial CDC, covering only UPDATE and INSERT: https://murtazak.medium.com/mimic-an-upsert-in-oracle-using-nifi-bb112dc1d6ab This solution will not work for DELETE, but based on that example you can create your own. It will, however, eat lots of resources if not configured properly.
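I do not have the article open right now, but the usual way such flows mimic an upsert in Oracle is a MERGE statement issued from NiFi; roughly something like this (the table, columns and bind variables are made-up placeholders):

-- hypothetical sketch: update the row when the key already exists, otherwise insert it
MERGE INTO target_table t
USING (SELECT :id AS id, :name AS name FROM dual) s
ON (t.id = s.id)
WHEN MATCHED THEN
  UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN
  INSERT (id, name) VALUES (s.id, s.name);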
04-12-2023
02:55 AM
@MattWho, @steven-matison: I would really appreciate your input, as I am struggling with this and I do not know how to solve it or what to check and try next 😞 I tried to replace ConvertRecord with an UpdateRecord, where I update my column /TimeStamp using the EL ${field.value:toDate("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"):format('yyyy-MM-dd HH:mm:ss.SSS')}. Unfortunately, the result is the same: a new datetime is generated 😞
04-11-2023
08:55 AM
To be really honest, I am not sure how you could remove those double quotes, but maybe you can try escaping the quotes with a backslash and also wrapping the entire command in single quotes. Something like this (I might have added a few extra single quotes by mistake, but you get the point):

'-X POST -H '\''Content-Type:application/json'\'' -d '\''{"name": "Apple AirPods", "data": {"color": "white", "generation": "3rd", "price": 135}}'\'' https://api.restful-api.dev/objects'

In theory, the double quotes around the JSON string are escaped and the entire command is wrapped in single quotes. The single quotes allow the command to be passed to the ExecuteStreamCommand processor as a single argument, which ensures that the quotes are preserved. PS: I did not test this and I am not 100% sure it will work, but at least it is worth a shot until somebody with more experience can guide you further.
04-11-2023
08:20 AM
Hi @Jaimin7, I am not quite sure how your SMTP server is configured, but everywhere I have implemented the PutEmail processor, I needed 5 mandatory properties:
1) SMTP Hostname
2) SMTP Port
3) SMTP Username (even though in NiFi it is not a mandatory field, it was mandatory for the SMTP server to allow the connection)
4) SMTP Password (even though in NiFi it is not a mandatory field, it was mandatory for the SMTP server to allow the connection)
5) From
Having all these fields configured, I was able to send emails from NiFi without any restrictions. Of course, I made sure that the firewall between NiFi and the SMTP server allows such connections 🙂
04-11-2023
03:03 AM
Hi there, I need your experience with a flow I have been struggling with. My flow is as follows: ConsumeKafka --> ConvertRecord --> ConvertJSONToSQL --> PutSQL. I am extracting some data out of some Kafka brokers, which comes in the following format:

{"Column1": "1e39c17d25cb420e8e720aa4b1ae005e","TimeStamp": "2023-04-10T10:43:15.794241429+03:00","Column3": "some_string","Column4": false,"Column5": "some_string","Column6": "some_string","Column7": false,"Column8": "some_string","Column9": "","Column10": 0,"Column11": 0,"Column12": "","Column13": 0,"Column14": 137,"Column15": "","Column16": 0,"Column17": 0,"Column18": "","Column19": 138,"Column20": 1550,"Column21": "some_string","Column22": ""}

Afterwards I am using a ConvertRecord, with a RecordReader = JsonTreeReader (default configuration: Infer Schema and Root Node) and a RecordWriter = JsonRecordSetWriter (default configuration: Inherit Record Schema and Pretty Print JSON = false). The data comes out in JSON format, as expected, without any modifications.

Next, I am going into a ConvertJSONToSQL. Here, I have defined a JDBC Connection Pool, set Statement Type = INSERT and set Table Name = my_table. All other configurations remained the same. The data comes out as attributes and it looks fine:

sql.args.2.type --> 93
sql.args.2.value --> 2023-04-10T10:43:15.794241429+03:00

Now, because I have to add the data into a PostgreSQL database, I am using PutSQL to save it. As the column I am inserting that value into is of type "timestamp without time zone", I have added an UpdateAttribute processor to modify the date. It is defined with a property named "sql.args.2.value" with the following value: "${sql.args.2.value:substringBefore('T')} ${sql.args.2.value:substringAfter('T'):substringBefore('+')}". The value which now comes out is " 2023-04-10 10:43:15.794241429". However, when inserting the data into PostgreSQL, I get a different value than the one present in the attribute: "2023-04-19 15:20:36.429".

To further debug the flow, I added another UpdateAttribute with a property named "a3" having the value ${sql.args.2.value:toDate("yyyy-MM-dd HH:mm:ss.SSS"):toNumber():format("yyyy-MM-dd HH:mm:ss.SSS")}, and another property named "a4" having the value "${sql.args.2.value:toDate("yyyy-MM-dd HH:mm:ss.SSS"):toNumber()}". The results are pretty strange:

a3 --> 2023-04-19 15:20:36.429
a4 --> 1681906836429

Basically the problem appears when translating that date into the datetime type. Does anybody know why it behaves like this? I am certain that this is not a bug, but something I am not using correctly 🙂 Thank you 🙂
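As far as I can tell, PostgreSQL itself has no problem with the trimmed literal: it stores up to microseconds and simply rounds the extra digits, which can be checked straight in psql:

-- sanity check outside of NiFi: PostgreSQL rounds the fractional seconds to microseconds
SELECT '2023-04-10 10:43:15.794241429'::timestamp;
-- returns 2023-04-10 10:43:15.794241

So my feeling is that the shift happens on the NiFi side, when the string is converted to a date.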
Labels:
- Apache NiFi
04-11-2023
01:17 AM
@Jame1979, It would really help if you could post a screenshot of each of the processors, as the problem might be related to some of your configurations. You might be doing something in UpdateAttribute that affects the behavior of FetchS3. From what I can see, you have a terminated thread and a running thread, meaning that something happened there. I had a similar issue and solved it by restarting the NiFi cluster; there was a problem in the back end which translated into threads being generated but not actually performing any action. After the restart, everything went back to normal and I started collecting data using FetchS3 :). You could also set FetchS3 to DEBUG to see if any logs are being generated, which might point you in the right direction.
04-10-2023
01:22 PM
Well, besides the fact that you are calling a different API endpoint than you previously did, yes, it could be that the payload is what is messing up your command. What I would try is to use an UpdateAttribute to generate the curl command as an attribute and see how it gets constructed. I would then take that command and execute it directly on my server, to see whether it is constructed correctly and whether it works. Otherwise, you will see where the error is located and you can try to solve it.