Member since: 01-27-2023
Posts: 229
Kudos Received: 74
Solutions: 45
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1760 | 02-23-2024 01:14 AM |
| | 2296 | 01-26-2024 01:31 AM |
| | 1436 | 11-22-2023 12:28 AM |
| | 3588 | 11-22-2023 12:10 AM |
| | 3671 | 11-06-2023 12:44 AM |
04-20-2023
12:02 AM
hi @Ray82, Assuming you have a way to identify where the ";" needs to go, you can easily use NiFi's Expression Language to add it. More details are available here: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html. If you can identify a pattern, any of the replace functions will swap your whitespace for a semicolon, and a regex based on that pattern will let you add the semicolon between your characters. In terms of processors, you have ReplaceText, UpdateAttribute, UpdateRecord and so on, so you have plenty to choose from 🙂
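As a minimal sketch (assuming every run of whitespace should become a semicolon, which may not match your exact pattern), a ReplaceText processor could be configured like this:

```
Search Value:          \s+
Replacement Value:     ;
Replacement Strategy:  Regex Replace
Evaluation Mode:       Entire text
```

The same idea in Expression Language, for example in UpdateAttribute against a hypothetical attribute named myText, would be ${myText:replaceAll('\s+', ';')}.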
04-19-2023
06:58 AM
hi @VLban, What do your files look like before reaching MergeRecord, and what do they look like after they have gone through MergeRecord? Besides that, what settings did you use in your MergeRecord? For your two requirements, everything depends on how you configure MergeRecord. To generate Parquet files, you set the Parquet writer in your Record Writer property. For large files, you must define the Minimum Bin Size, the Minimum Number of Records and, optionally, the Max Bin Age. The Correlation Attribute Name would also help.
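As a rough sketch of what that could look like (the values are illustrative, not recommendations), a MergeRecord producing large Parquet files might be configured along these lines:

```
Record Reader:               (a reader matching your input format)
Record Writer:               ParquetRecordSetWriter
Merge Strategy:              Bin-Packing Algorithm
Minimum Number of Records:   100000
Minimum Bin Size:            256 MB
Max Bin Age:                 15 min
Correlation Attribute Name:  (optional; keeps related flowfiles in the same bin)
```

Tune the minimums upward until the output files reach the size you want, and use Max Bin Age as a safety valve so that partially filled bins still get flushed.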
04-18-2023
02:41 AM
hi @ushasri, Can you provide some more details about your flow? Without knowing what you are doing in your flow, I can only tell you that you can use NiFi's Expression Language to extract the current time and send it into your stream. The current time can be retrieved, for example, like this: ${now():toNumber():format('yyyy-MM-dd')} Next, you can use an UpdateRecord processor, add the attribute to your newly defined column and send it on for further processing. More about NiFi's Expression Language: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
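As a concrete sketch (assuming a hypothetical target column named /ingest_time that already exists in your schema), UpdateRecord could stamp the current time into every record like this:

```
Replacement Value Strategy:  Literal Value
/ingest_time:                ${now():format('yyyy-MM-dd')}
```

The dynamic property name (/ingest_time) is the RecordPath of the column to populate, and its value is evaluated for each record.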
04-18-2023
02:32 AM
1 Kudo
hi @databoi, It would help if you could also provide your NiFi version, as each version has its own perks and twitches. What you have experienced so far can have plenty of root causes and it is not easy to debug 😞 I assume that this happens on a single node only, all the time, right? Something similar happened to me as well and it was not easy to fix ... or at least it was not for me. My problem was mostly related to how I configured the NiFi cluster. I have been told that there are some best practices when it comes to configuring NiFi, especially on a bare-metal machine:
- Store the repositories (content, flowfile and provenance) on separate drives with high I/O, as NiFi uses these repositories heavily to persist data. (nifi.properties)
- Assign no more than 40% of your node's RAM to your heap configs. (bootstrap.conf)
- Make sure that your open files and max user processes limits are set to a higher value than normal.
- Set the correct number of threads (2-4 times the number of cores of your server).
There were three problems on my side and the solution was as follows:
- I moved the repositories to a different drive (an SSD) with high I/O, so content could be read and written faster.
- I increased the open files and max user processes limits to 50000 and 10000, and I will increase them again in a couple of days.
- My third problem was the disk hardware: the disk was dying and started to malfunction, causing these stop-the-world delays. I replaced it and everything went back to normal.
You should also pay attention to the JVM memory of that particular node. In addition, you could activate debug mode and even generate some dumps for further analysis (./nifi.sh dump > <name of your dump file>). Another thing to check is the processes on the affected node: maybe something is causing NiFi to become a zombie process (or you have some zombie processes) that affects your overall performance. I do hope that something in this message leads you to your root cause. In any case, I strongly recommend taking into consideration other opinions as well, from other community members with far more experience than myself.
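To make the best-practice list above more concrete, here is a sketch of the relevant settings (all paths and values are examples, not recommendations; adjust them to your hardware):

```
# nifi.properties -- each repository on its own fast drive
nifi.flowfile.repository.directory=/data1/flowfile_repository
nifi.content.repository.directory.default=/data2/content_repository
nifi.provenance.repository.directory.default=/data3/provenance_repository

# bootstrap.conf -- heap well below 40% of the node's RAM
java.arg.2=-Xms16g
java.arg.3=-Xmx16g

# /etc/security/limits.conf -- raise limits for the user running NiFi
nifi  soft  nofile  50000
nifi  hard  nofile  50000
nifi  soft  nproc   10000
nifi  hard  nproc   10000
```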
04-18-2023
01:31 AM
hi @MKothari, Are you certain that your messages get transferred successfully? Based on your screenshot, I can see that no FlowFiles came IN during the past 5 minutes, meaning that your flow is not running, even though everything is green and you have no errors in your GUI. I assume that the flowfile format matches whatever you have configured in your Kafka. Have a look within PublishKafka and see whether a strange scheduling setting is causing the number to increase instead of decrease. Another thing you might try: stop both EvaluateJsonPath and PublishKafka, right-click on your matched queue, open List Queue and see whether there are any flowfiles there and, if so, on which node. Next, you can go to that node and check the log file for possible error messages.
04-18-2023
12:52 AM
hi @nisha2112, Are you certain that your schema is correct? I do not have too much experience with the ConfluentSchemaRegistry, but I think that you might have altered your schema, either when inserting it into the registry or when exporting it out of the registry. What I recommend you do is:
- Retrieve the schema (as-is) and check whether it is correct. If not, you know what to do. If it is correct, proceed to the next point.
- Within your ConvertRecord, modify both your Reader and your Writer to use the Schema Text property, where you manually define your schema. This will tell you one of two things: 1) the data coming into ConvertRecord is not in a correct format (ConvertRecord will fail), or 2) your schema gets extracted incorrectly from your ConfluentSchemaRegistry (the flow will work and you will have no error).
Once you have done the test, you will know where the error is located and you can try debugging it further. For example, you can extract your schema from ConfluentSchemaRegistry and see whether it comes out as expected. Or, if your data is incorrect, you can check whether something changed in your source and modify the data or your schema. There are plenty of possibilities and you have to start somewhere 🙂
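As an illustration (the field names are made up; use your actual schema), the Schema Text property of both the Reader and the Writer would hold a plain Avro schema such as:

```
{
  "type": "record",
  "name": "my_record",
  "fields": [
    { "name": "id",   "type": "int" },
    { "name": "name", "type": ["null", "string"], "default": null }
  ]
}
```

Remember to also switch the Schema Access Strategy of the Reader and Writer to use the schema text instead of the registry; otherwise the property is ignored.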
04-13-2023
09:01 AM
Hi @kspyropoulos, I would honestly start by asking which NiFi you are currently running (open source or Cloudera), and which version. I know the questions might sound silly, but each version has (or does not have) certain features. Next, I would ask whether the table you are inserting into has the same structure (same column types) and whether you are using the correct Avro schema. I reproduced your flow as follows:

```sql
CREATE TABLE test_voice (
  column1 INT,
  column2 varchar(255),
  column3 bytea
);
```

In NiFi (1.19.1) I set up an ExecuteSQL processor configured as follows: a simple DBCP Connection Pool pointing to my PostgreSQL database, a simple select * from test_voice as the SQL select query and, very importantly, Use Avro Logical Types = true. After executing the flow, I get a single flowfile with a single row in the success queue (because I inserted only one row for my test):

```sql
insert into test_voice(column1,column2,column3)
values(1,'hello','\x4920616d206261636b20616761696e2e2e2e');
```

(In terms of schema, see the screenshot in the original post.) So far so good. Now we update the current row in the PostgreSQL database from column1=1 to column1=2, so that we can check whether the insert took place:

```sql
update test_voice set column1='2' where column1='1';
```

Next, using PutDatabaseRecord, we insert the row into our database. For PutDatabaseRecord, I configured the following: Record Reader = Avro Reader with Inherit Record Schema, Database Type = PostgreSQL, Statement Type = INSERT, Database Connection Pooling Service = the one used in ExecuteSQL, and Catalog Name, Schema Name and Table Name taken from PostgreSQL. Everything else was left at its default. Once I executed the flow, the row was inserted into the DB. So I can tell you that PutDatabaseRecord works just fine. Unfortunately for you, it seems that your problem is located somewhere else ... my bet is on either the Avro schema or the table you are trying to insert into 🙂
04-12-2023
07:58 AM
Thank you @MattWho, it worked like a charm. You are a life saver 🙂 I did not even consider the nanoseconds and I did not really know about the EL functions for the Java DateTimeFormatter. Nevertheless, if somebody else encounters a similar issue, here is the link to the documentation --> here. One more question though, if possible. When saving the data into the PostgreSQL database using PutDatabaseRecord (JSON as Reader), the value "2023-04-10 07:43:15.794" immediately gets truncated to "2023-04-10 07:43:15" --> basically everything after the point is removed. In PostgreSQL, the column is defined as "timestamp without time zone" with a precision of 6.
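In case it helps anyone reproducing this, a quick way to check whether the milliseconds are really gone from the table or merely hidden by the client display (table and column names here are placeholders) is:

```sql
-- Returns the seconds field including the fractional part, in milliseconds
-- (e.g. 15794 for a stored value of ...07:43:15.794)
SELECT my_timestamp_column,
       EXTRACT(MILLISECONDS FROM my_timestamp_column) AS ms
FROM my_table
ORDER BY my_timestamp_column DESC
LIMIT 5;
```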
04-12-2023
06:51 AM
1 Kudo
hi @moahmedhassaan, Regarding CDC in NiFi for an Oracle instance, I highly recommend the following article, as it guides you step by step through a partial CDC covering only UPDATE and INSERT: https://murtazak.medium.com/mimic-an-upsert-in-oracle-using-nifi-bb112dc1d6ab This solution will not work for DELETE, but based on that example you can create your own. It will, however, eat lots of resources if not configured properly.
04-12-2023
02:55 AM
@MattWho, @steven-matison: I would really appreciate your input, as I am struggling with this and I do not know how to solve it or what to check and try next 😞 I tried to replace ConvertRecord with an UpdateRecord where I update my column /TimeStamp using the EL ${field.value:toDate("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"):format('yyyy-MM-dd HH:mm:ss.SSS')}. Unfortunately, the same result: a new datetime is generated 😞