Member since 01-27-2023
229 Posts
74 Kudos Received
45 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1770 | 02-23-2024 01:14 AM |
| | 2300 | 01-26-2024 01:31 AM |
| | 1441 | 11-22-2023 12:28 AM |
| | 3596 | 11-22-2023 12:10 AM |
| | 3680 | 11-06-2023 12:44 AM |
04-18-2023
02:41 AM
Hi @ushasri, Can you provide some more details about your flow? Without knowing what you are doing in your flow, I can only tell you that you can use NiFi's Expression Language to extract the current time and send it into your stream. For example, the current date can be obtained with: ${now():toNumber():format('yyyy-MM-dd')} Next, you can use an UpdateRecord processor to add the attribute to your newly defined column and send it on for further processing. More about NiFi's Expression Language: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
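For readers who want to check outside NiFi what such an expression is expected to produce, here is a minimal Python sketch of the same idea: take the current time as epoch milliseconds and format it as yyyy-MM-dd. The function names are made up for this illustration, and the sketch uses UTC by default, whereas NiFi would use whatever time zone applies on your instance.

```python
from datetime import datetime, timezone


def format_epoch_millis(epoch_millis: int, tz=timezone.utc) -> str:
    """Format epoch milliseconds as a yyyy-MM-dd date string."""
    return datetime.fromtimestamp(epoch_millis / 1000, tz=tz).strftime("%Y-%m-%d")


def current_date(tz=timezone.utc) -> str:
    """Rough analogue of now() -> toNumber() -> format('yyyy-MM-dd')."""
    now_millis = int(datetime.now(tz).timestamp() * 1000)
    return format_epoch_millis(now_millis, tz)


if __name__ == "__main__":
    print(current_date())
```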
04-18-2023
12:52 AM
Hi @nisha2112, Are you certain that your schema is correct? I do not have too much experience with the ConfluentSchemaRegistry, but I think that you might have altered your schema, either when inserting it into the registry or when exporting it out of the registry. What I recommend you do is:
- Retrieve the schema (as is) and check whether it is correct. If not, you know what to do. If it is correct, proceed to the next point.
- Within your ConvertRecord, modify both your Reader and your Writer to use the Schema Text property, where you manually define your schema. This will tell you one of the following two things:
1) Your data coming into ConvertRecord is not in the correct format. --> ConvertRecord will fail.
2) Your schema gets extracted incorrectly from your ConfluentSchemaRegistry. --> The flow will work and you will have no error.
Once you have done the test, you will know where the error is located and you can debug it further. For example, you can extract your schema from ConfluentSchemaRegistry and see if it gets extracted correctly. Or, if your data is incorrect, you can check whether something changed in your source and then modify either that data or your schema. There are plenty of possibilities and you have to start somewhere 🙂
04-14-2023
07:03 AM
I have also tested the same with Apache NiFi 1.15.3 (open-source/community) and it works. However, when testing with Apache NiFi 1.13.2 (open-source/community), it still fails with the same error message.
04-12-2023
07:58 AM
Thank you @MattWho, it worked like a charm. You are a life saver 🙂 I did not even consider the nanoseconds and I did not really know about the EL functions for the Java DateTimeFormatter. Nevertheless, if somebody else encounters a similar issue, here is the link to the documentation --> here. One more question though, if possible. When saving the data into the PostgreSQL database using PutDatabaseRecord (JSON as Reader), the value "2023-04-10 07:43:15.794" gets immediately truncated to "2023-04-10 07:43:15" --> basically, everything after the decimal point is removed. In PostgreSQL, the column is defined as "timestamp without time zone" with a precision of 6.
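One thing worth ruling out for this kind of truncation is a timestamp pattern without a fractional-seconds part somewhere between the reader and the database. The sketch below is plain Python and does not reproduce PutDatabaseRecord; it only illustrates, under that assumption, how the same value keeps or loses its milliseconds depending on the pattern. The pattern strings are Python's strptime/strftime codes, not the Java patterns NiFi uses, and the function names are made up for the example.

```python
from datetime import datetime

RAW_VALUE = "2023-04-10 07:43:15.794"


def parse_with_millis(value: str) -> datetime:
    """Parse a timestamp string that carries a fractional-seconds part."""
    return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")


def format_without_fraction(ts: datetime) -> str:
    """A pattern with no fractional part silently drops the milliseconds."""
    return ts.strftime("%Y-%m-%d %H:%M:%S")


def format_with_millis(ts: datetime) -> str:
    """Keep three fractional digits by trimming microseconds to milliseconds."""
    return ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


if __name__ == "__main__":
    parsed = parse_with_millis(RAW_VALUE)
    print(format_without_fraction(parsed))
    print(format_with_millis(parsed))
```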
04-12-2023
07:52 AM
2 Kudos
@drewski7 The removal of quotes from the "command arguments" is expected behavior in the ExecuteStreamCommand processor. This processor was introduced to NiFi more than 10 years ago and was originally designed for a more minimal scope of work, including the expectation that FlowFile content would be passed to the script/command being executed. As time passed, the use cases people tried to solve with ExecuteStreamCommand expanded; however, handling those use cases could potentially break users' existing, working dataflows. So rather than change that default behavior, a new property "Command Arguments Strategy" was added, with the original "Command Arguments Property" as the default (legacy method) and a new "Dynamic property arguments" option. This change is part of this Jira and implemented as of Apache NiFi 1.10: https://issues.apache.org/jira/browse/NIFI-3221
In your use case, you'll want to switch to using the "Dynamic property arguments". This will then require you to click on the "+" to add a new dynamic property. The property names MUST use this format: command.argument.<num>
So in your case you might try something like:
command.argument.1 = -X POST -H referer:${Referer} -H 'Content-Type: application/json' -d '{"newTopics": [{"name":"testing123","numPartitions":3,"replicationFactor":3}], "allTopicNames":["testing123"]}' --negotiate -u : -b /tmp/cookiejar.txt -c /tmp/cookiejar.txt http://SMM-HOSTNAME:8585/api/v1/admin/topics
If you found that the provided solution(s) assisted you with your query, please take a moment to log in and click Accept as Solution below each response that helped. Thank you, Matt
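If the single-property form above does not behave as hoped, one variation to try is giving each argument its own dynamic property. This rests on an assumption not stated in the post, namely that each command.argument.<num> property is handed to the command as exactly one argument; please verify that against the processor documentation for your version. The Python sketch below only prepares such a list: it splits the argument string from the post with shell-style quoting rules and numbers the pieces. The helper name is made up for the example.

```python
import json
import shlex

# Argument string copied from the post above; ${Referer} is left as-is
# for NiFi's Expression Language to resolve.
COMMAND_LINE = (
    "-X POST -H referer:${Referer} -H 'Content-Type: application/json' "
    """-d '{"newTopics": [{"name":"testing123","numPartitions":3,"replicationFactor":3}], "allTopicNames":["testing123"]}' """
    "--negotiate -u : -b /tmp/cookiejar.txt -c /tmp/cookiejar.txt "
    "http://SMM-HOSTNAME:8585/api/v1/admin/topics"
)


def to_dynamic_properties(command_line: str) -> dict:
    """Split a command line shell-style and number each token as command.argument.<num>."""
    tokens = shlex.split(command_line)
    return {f"command.argument.{i}": token for i, token in enumerate(tokens, start=1)}


PROPERTIES = to_dynamic_properties(COMMAND_LINE)

if __name__ == "__main__":
    for name, value in PROPERTIES.items():
        print(f"{name} = {value}")
```

The quotes that surround the JSON body and the Content-Type header are consumed by the split, so each property value holds the bare argument.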
04-11-2023
08:20 AM
Hi @Jaimin7, I am not quite sure how your SMTP server is configured, but everywhere I have implemented the PutEmail processor, I needed 5 mandatory properties: 1) SMTP Hostname 2) SMTP Port 3) SMTP Username (even though it is not a mandatory field in NiFi, the SMTP server required it to allow the connection) 4) SMTP Password (even though it is not a mandatory field in NiFi, the SMTP server required it to allow the connection) 5) From. With all these fields configured, I was able to send email from NiFi without any restrictions. Of course, I made sure that the firewall between NiFi and the SMTP server allows such connections 🙂
04-05-2023
12:15 AM
1 Kudo
Hi @saquibsk, Ok, understood. What I can tell you is that what you are trying to achieve is not impossible, but it is not easy either. I believe in the power of a community, but at the same time I believe that the scope of the community is to help you with solutions and advice for your problems, not to do the work for you 🙂 I assume that you have already started a flow, so let's start from there: what you developed, why it is not good and what you are missing from it. From my point of view, there are two options here:
1) You modify all of your processors to write the Bulletin Level at INFO (or DEBUG) and afterwards, using an InvokeHTTP, you access your Bulletin Board through the REST API and extract your information. This is not highly recommended, as you will generate very large log files. Besides that, your certificates must be generated accordingly, otherwise you will get some errors.
2) At each step in your flow, you write a message with LogMessage, which will save your data into nifi-app.log. In LogMessage you can define exactly what you want to write. Afterwards, you can create a separate flow using a TailFile processor and extract everything you want from your nifi-app.log file. Here you will have to extract only the information you require 🙂
Once you have extracted your data, either from your Bulletin Board or from your log file, you can build the SQL statement for inserting the data into the DB.
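To make option 2 more concrete, here is a rough Python sketch of the "extract, then build the SQL statement" part, done outside NiFi. Everything specific in it is an assumption for illustration: the sample log line, the regular expression (modeled on the timestamp, level, [thread], logger, message layout that nifi-app.log lines usually have), and the flow_log table with its columns. It uses an in-memory SQLite database so it runs anywhere; for PostgreSQL the placeholder style would differ depending on the driver.

```python
import re
import sqlite3

# Assumed layout: "<date> <time>,<millis> <LEVEL> [<thread>] <logger> <message>"
LOG_PATTERN = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<logger>\S+) "
    r"(?P<message>.*)$"
)

# Hypothetical table and column names.
CREATE_SQL = "CREATE TABLE flow_log (ts TEXT, level TEXT, thread TEXT, logger TEXT, message TEXT)"
INSERT_SQL = "INSERT INTO flow_log (ts, level, thread, logger, message) VALUES (?, ?, ?, ?, ?)"


def parse_log_line(line: str):
    """Return the parsed fields as a dict, or None if the line does not match."""
    match = LOG_PATTERN.match(line.rstrip("\n"))
    return match.groupdict() if match else None


def store(conn: sqlite3.Connection, record: dict) -> None:
    """Insert one parsed record using a parameterized statement."""
    conn.execute(
        INSERT_SQL,
        (record["ts"], record["level"], record["thread"], record["logger"], record["message"]),
    )


# Hypothetical line, as a LogMessage processor might write it.
SAMPLE_LINE = (
    "2023-03-29 10:02:21,002 INFO [Timer-Driven Process Thread-24] "
    "o.a.n.processors.standard.LogMessage step=merge status=ok"
)

conn = sqlite3.connect(":memory:")
conn.execute(CREATE_SQL)
record = parse_log_line(SAMPLE_LINE)
if record is not None:
    store(conn, record)
```

Using a parameterized statement instead of concatenating the message into the SQL text avoids quoting problems when a log message contains apostrophes.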
04-04-2023
06:57 AM
Bingo! Thanks so much.
04-03-2023
12:57 AM
Good day, everyone. This problem has been resolved. I created a new subfolder called /opt/nifi_server/ and installed NiFi in it. On the first start, it gave me the error "Unable to bind the IP with port 844." I terminated the PID and launched NiFi. Everything is back to normal now.
03-31-2023
02:27 AM
Hi, Thank you for your assistance with this matter. The answers to your questions are as follows:
Is that complete stack trace from the nifi-app.log?
No, the complete stack trace is the following one:
2023-03-29 10:02:21,002 ERROR [Timer-Driven Process Thread-24] o.a.n.p.standard.PartitionRecord PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]
java.lang.IllegalArgumentException: newLimit > capacity: (92 > 83)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.nifi.parquet.record.WriteParquetResult.close(WriteParquetResult.java:69)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor983.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
at com.sun.proxy.$Proxy316.close(Unknown Source)
at org.apache.nifi.processors.standard.PartitionRecord.onTrigger(PartitionRecord.java:274)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
What version of Apache NiFi?
Currently running on Apache NiFi open source 1.19.1.
What version of Java?
Currently running on openjdk version "11.0.17" 2022-10-18 LTS.
Have you tried using ConsumeKafkaRecord processor instead of ConsumeKafka --> MergeContent?
No, I did not, but for a good reason. The files coming out of Kafka require some "data manipulation" before using PartitionRecord, where I have defined the CSVReader and the ParquetRecordSetWriter. If I were to use ConsumeKafkaRecord, I would have to define a CSVReader and the Parquet (or CSV) RecordSetWriter, and the result would be very bad, as the data is not formatted as per the required schema. I will give it a try with ConsumeKafkaRecord using CSVReader and CSVRecordSetWriter, to see if I still encounter the same issue.
Do you have issue only when using the ParquetRecordSetWriter?
Unfortunately, I can only test with Parquet, as this file format is more or less mandatory for the current project. I will try to rebuild the flow with an Avro format, to see if I can reproduce the error or not.
How large are the FlowFiles coming out of the MergeContent processor?
Directly out of Kafka, 1 FlowFile has around 600-700 rows, as text/plain, and the size is 300-600KB. Using MergeContent, I combine a total of 100-150 files, resulting in a total of 50MB.
Have you tried reducing the size of the Content being output from MergeContent processor?
Yes, I have played with several combinations of sizes and most of them resulted either in the same error or in a "too many open files" error.