Member since
01-27-2023
229
Posts
73
Kudos Received
45
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1280 | 02-23-2024 01:14 AM | |
1589 | 01-26-2024 01:31 AM | |
1093 | 11-22-2023 12:28 AM | |
2670 | 11-22-2023 12:10 AM | |
2700 | 11-06-2023 12:44 AM |
04-10-2023
11:38 AM
Well to be honest it is pretty hard to debug something you have no clue about 😄 From your command I see that you are using the value for "referer" as a variable taken out of an attribute. I assume that the value is correct. Now, based on my experience with ExecuteStreamCommand, I would give each parameter after a semi colon.... something like: ( it is worth a shot, otherwise you will need to test each parameter until you find the faulty one) -X;POST;-H;referer:${Referer};-H;'Content-Type: application/json';-d;'{\"newTopics\": [{\"name\":\"testing123\",\"numPartitions\":3,\"replicationFactor\":3}], \"allTopicNames\":[\"testing123\"]}';--negotiate;-u;:;-b;/tmp/cookiejar.txt;-c;/tmp/cookiejar.txt
... View more
04-10-2023
10:08 AM
1 Kudo
Well I am not quite certain what SMM is and what sort of API Calls it accepts, but as far as it goes to the received answer, it could have an endless list of root causes. Error 415 - Unsupported Media type indicates that your server refused the accept that request, due to the payload format. This problem might be due to the content type or the content encoding, as well as the data directly. You could first of all try from a linux machine and see if that curl command truly works. Another option would be to use Postman to test the API command and see the results. Last but not least, try maybe using InvokeHTTP instead of ExecuteStreamCommand, as InvokeHTTP is more tailored for such actions. InvokeHTTP: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.InvokeHTTP/index.html
... View more
04-10-2023
08:15 AM
1 Kudo
@udayabaski, as @steven-matison mentioned, the best solution would be to use the Distributed Map Cache. In order to implement it, you can follow these steps to initialize your Server: https://stackoverflow.com/questions/44590296/how-does-one-setup-a-distributed-map-cache-for-nifi/44591909#44591909 Now, regarding the way you want to incorporate it in your flow, I would suggest the following: - right after UpdateAttribute, you activate the PutDistributedMapCache. Within the processor, you will set the desired attribute at the property Cache Entry Identified. - Before InvokeHTTP, you add a FetchDistributedMapCache with which you extract the value for your key:value pair. All you have to do next is to extract your attribute to further use in your invokeHTTP. It is as simple as that and you do not need any fancy configurations 🙂
... View more
04-05-2023
12:15 AM
1 Kudo
Hi @saquibsk, Ok, undestood. What I can tell you is that what you are trying to achieve is not impossible but is not easy either. I believe in the power of a community but in the same time I believe that the scope of the community is to help you with solutions and advice to your problems and not to do the work for you 🙂 I assume that you already started a flow so lets start from there, what you developed, why it is not good and what you are missing from it. From my point of view, there are two options here: 1) You modify all of your Processors to write the Bulletin Level at INFO (or Debug) and afterwards, using an InvokeHTTP, you can access your Bulletin Board with the REST API and extract your information. This it not highly recommended as you will generate very large logs files. Besides that, your certificates must be generated accordingly, otherwise you will get some errors. 2) At each step in your flow, you write a message to LogMessage, which will save your data into nifi-app.log. Here you can define in LogMessage exactly what you want to write. Afterwards, you can create a separate flow, using a TailFile Processor and extract everything you want from your nifi-app.log File. Here you will have to extract only the information you require 🙂 Once you have extracted your data, either from your Bulletin Board or from your LogFile, you can build the SQL Statement for inserting the data into the DB.
... View more
04-04-2023
06:43 AM
1 Kudo
hi @noncitizen, I have tested the following configuration on my local machine and it seems to be working fine. You can give it a try and let me know if it works: Using GenerateFlowFile, I created a FlowFile having the attribute "absolute.path" defined as "/home/test/testFolder/". Now, you already have a flow which generates you all these information so you can skip this step. In ExecuteStreamCommand, I have defined the following: Command Path = bash Command Arguments = -c;mv ${absolute.path:append('*.log')} ${absolute.path:append('logfiles')} The result is that all the files with the extension .log from within my folder /home/test/testFolder/ have been copied into /home/test/testFolder/logfiles. I need to mention though the following: - the absolute path should end with "/", otherwise, using your expression you will have something like /path/input*.log.
... View more
04-04-2023
06:17 AM
hi @saquibsk, What are you trying to achieve with this post, to be more precisely? You want to learn how to generate something in NiFi or you want somebody to help you write an SQL Query in your Data Warehouse? Please be so kind and provide some more details with regards to your problem.
... View more
03-31-2023
02:27 AM
Hi , Thank you for your assistance with this matter. The answers to your questions is as follows: Is that complete stack trace from the nifi-app.log? No, the complete stack trace is the following one: 2023-03-29 10:02:21,002 ERROR [Timer-Driven Process Thread-24] o.a.n.p.standard.PartitionRecord PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]
java.lang.IllegalArgumentException: newLimit > capacity: (92 > 83)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.nifi.parquet.record.WriteParquetResult.close(WriteParquetResult.java:69)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor983.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
at com.sun.proxy.$Proxy316.close(Unknown Source)
at org.apache.nifi.processors.standard.PartitionRecord.onTrigger(PartitionRecord.java:274)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) What version of Apache NiFi? Currently running on Apache NiFi open source 1.19.1 What version of Java? Currently running on openjdk version "11.0.17" 2022-10-18 LTS Have you tried using ConsumeKafkaRecord processor instead of ConsumeKafka --> MergeContent? No I did not, but for a good reason. The files coming out of Kafka require some "data manipulation" before using PartitionRecord, where I have defined the CSVReader and the ParquetRecordSetWriter. If I were to use ConsumeKafkaRecord, I would have to define a CSV Reader and the Parquet(or CSV)RecordSetWriter and the result will be very bad, as the data is not formatted as per the required schema. I will give it a try with ConsumeKafkaRecord using CSVReader and CSVRecordSetWriter, to see if I still encounter the same issue. Do you have issue only when using the ParquetRecordSetWriter? Unfortunately I can only test with parquet as this file format is somehow mandatory for the current project. I will try to reproduce the flow with an AVRO format, to see if I can reproduce the error or not. How large are the FlowFiles coming out of the MergeContent processor? So directly out of Kafka, 1 FlowFile has around 600-700 rows, as text/plain and the size is 300-600KB. Using MergeContent, I combine a total of 100-150 files, resulting in a total of 50MB. Have you tried reducing the size of the Content being output from MergeContent processor? Yes, I have played with several combinations of sizes and most of them either resulted in the same error or in an "to many open files" error.
... View more
03-30-2023
08:23 AM
hi @Sivagopal, As far as I know, what you are asking is not possible directly from within NiFi. To remove the node from the cluster you have two options: - the manual remove, done from the Menu - Cluster - Nodes --> disconnect Node. ( see https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.1/nifi-configuration-best-practices/content/disconnect-nodes.html) - the NiFi's REST API. For that you will need to use the Controller commands, select the DELETE option and perform the call --> /controller/cluster/nodes/{id}. (see https://nifi.apache.org/docs/nifi-docs/rest-api/index.html --> Controller Section / DELETE) As for the update in authorizers.xml, you will need to write a script which will perform this action, while NiFi is down. As far as I know, these XML Files get initialized when NiFi starts so even though you perform some actions while NiFi is up and running, these changes will not get taken into considerations. Take note that if you are using the embedded zookeeper, you will have to modify the zookeeper.properties file as well. In addition to these files, you have to take a look to state-management.xml as well.
... View more
03-30-2023
06:04 AM
If I may intervene here for a second, I would say that I totally agree with what @MattWho said but in the same time, I think that what you, @wffger2, are trying to achieve might be partially possible.You would still need everything what Matt said, but if you just want to double check and confirm your IDs within your definition flow with the ones from your canvas, you might achieve this goal. So i tested with two versions right now: 1.15.3 and 1.19.1. Now, as you can see in the first screenshot, when I downloaded the flow definition for my process group, I have at line 14 the tag "identifier", which has a different ID as the one from the NiFi Canvas. This was done on NiFi 1.15.3 If I do the same exact thing on NiFi 1.19.1, in the exported definition flow, at line 16 I have a new tag, named instanceIdentifier, which contains the exact ID of the processor, from your NiFi Canvas. In this case, if you just want to have your ID compared to the one from your NiFi Canvas, an upgrade to a newer version might be your solution. But, pointing this again, if you want the connection between the environment, you require everything what Matt just told you.
... View more
03-30-2023
05:00 AM
So the question with you entering the correct password is still valid, even though it is a dumb question. 🙂 Nevertheless, would it be possible for you to add the nifi.properties config file here? You can replace the password properties with something else.
... View more