Member since: 11-22-2017
Posts: 10
Kudos Received: 0
Solutions: 0
03-19-2018
02:32 AM
I just re-ran the above case. This time, for the topics with duplicated messages, I found the following exception in the Kafka logs:
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition MY_TOPIC-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 0 (request epoch), 1 (server epoch)
Have I misconfigured anything on my Kafka server?
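From what I have read about Kafka's transactional producer, every initTransactions() call for a given transactional.id bumps the epoch stored on the broker and fences any producer still on the older epoch, which matches the "0 (request epoch), 1 (server epoch)" above. A minimal Java sketch of that mechanism (placeholders: broker at localhost:9092, transactional.id "demo-tx-id"):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingDemo {

    // Shared transactional producer config; broker address and id are placeholders.
    static Properties txProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("transactional.id", "demo-tx-id"); // both producers deliberately share this id
        return p;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> first = new KafkaProducer<>(txProps());
        first.initTransactions();                // registers epoch 0 for "demo-tx-id"
        first.beginTransaction();
        first.send(new ProducerRecord<>("MY_TOPIC", "k", "from the first producer"));

        // A second producer with the same transactional.id (e.g. a restarted task)
        // bumps the epoch to 1, so the broker now rejects the first producer.
        KafkaProducer<String, String> second = new KafkaProducer<>(txProps());
        second.initTransactions();

        try {
            first.commitTransaction();           // fails: request epoch 0 < server epoch 1
        } catch (ProducerFencedException e) {
            System.out.println("First producer fenced: " + e.getMessage());
        } finally {
            first.close();
        }

        second.beginTransaction();
        second.send(new ProducerRecord<>("MY_TOPIC", "k", "from the second producer"));
        second.commitTransaction();
        second.close();
    }
}
```

If that is right, the exception points to two producer sessions reusing the same transactional id (e.g. after a restart or with overlapping publish tasks) rather than a broker misconfiguration, but I am not sure how PublishKafka assigns its transactional ids.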
03-16-2018
01:19 PM
The above case was run on a standalone NiFi. I can confirm that it is not twice the amount, as I set the failure relationship to be auto-terminated.
03-16-2018
03:09 AM
@Bryan Bende My flow is simple: ListFile > FetchFile > PublishKafka_0_11 (with Transactions=true, Message Demarcator=\n). To test whether messages are duplicated, I:
1. Check the offset of the topic: bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:49092 --topic MY_TOPIC --time -1
2. Run kafka-console-consumer.sh with "--from-beginning" and compare the result with the source file.
My source file contains 9.26 million rows. Using method 1, I got offset=18460643, far more than the row count. Using method 2, I consumed ~18 million messages.
P.S. The topic was newly created when running the flow, and no error occurred on the flow where I found the duplicates. In general, though, I occasionally encounter a ProducerFencedException from Kafka when using the PublishKafka producer. FYI, on the consumer side, if I use the ConsumeKafka processor with 'Honor Transactions=true' I cannot get any flowfiles; only with 'Honor Transactions=false' do I get those 18 million duplicated messages.
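One caveat that may matter here: with transactions enabled, the log end offset reported by GetOffsetShell also counts transaction control markers (and any aborted batches), so it is an upper bound rather than an exact record count; that alone would not explain a full 2x, though. For reference, a rough Java sketch of counting only committed records outside NiFi (which should correspond to what 'Honor Transactions=true' does), assuming the same broker at localhost:49092; the group id is a placeholder and it needs a reasonably recent kafka-clients for poll(Duration):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CountCommitted {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:49092");    // same broker as in the commands above
        p.put("group.id", "count-committed-demo");        // placeholder group id
        p.put("auto.offset.reset", "earliest");
        p.put("enable.auto.commit", "false");
        // Only records from committed transactions are returned; aborted
        // batches and transaction markers are skipped.
        p.put("isolation.level", "read_committed");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long total = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            consumer.subscribe(Collections.singletonList("MY_TOPIC"));
            int emptyPolls = 0;
            while (emptyPolls < 5) {                       // stop after a few empty polls
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                if (records.isEmpty()) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                    total += records.count();
                }
            }
        }
        System.out.println("Committed records visible: " + total);
    }
}
```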
03-15-2018
08:12 AM
I would like to achieve 'exactly-once' semantics, i.e. no duplicated messages when writing records into a Kafka cluster. As far as I know, Kafka has supported 'exactly-once' via its idempotent producer since version 0.11. I have tried enabling the 'Transactions' option in the PublishKafka processor, but message duplication still happens. So I would like to know:
1. Do PublishKafka_0_11 / PublishKafka_1_0 support exactly-once semantics?
2. Besides de-duplicating on the consumer side, is there any way to remove duplicate messages?
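To make the question more concrete, my understanding is that the 'Use Transactions' option maps, at the kafka-clients level, to roughly the sketch below (broker address and transactional.id are placeholders; the processor manages these properties itself). Idempotence only de-duplicates broker-side retries within one producer session, and a committed transaction is atomic, but if the same data is sent again in a new transaction it is appended again as new committed records:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalPublishSketch {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");    // placeholder broker
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "all");
        p.put("enable.idempotence", "true");             // dedupes retries of the same batch in this session
        p.put("transactional.id", "my-tx-id");           // placeholder; enables atomic multi-record commits

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.initTransactions();
            producer.beginTransaction();
            // One transaction per "flowfile": every demarcated line becomes a record.
            for (String line : new String[] {"row 1", "row 2", "row 3"}) {
                producer.send(new ProducerRecord<>("MY_TOPIC", null, line));
            }
            // All-or-nothing: read_committed consumers see these records only after commit.
            producer.commitTransaction();
            // Re-running the same loop (e.g. the flowfile is processed twice) would append
            // the same rows again as new committed records; Kafka cannot know they are
            // logical duplicates.
        }
    }
}
```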
Labels:
- Apache Kafka
- Apache NiFi
03-12-2018
07:03 AM
Thanks all. One more question: how does the PutParquet processor connect to HDFS? Via WebHDFS, HttpFS, or the native HDFS client?
03-09-2018
06:49 AM
I have a special case that requires writing Parquet files to a local directory. If I leave 'Hadoop Configuration Resources' empty, the processor immediately throws exceptions; if I do enter 'Hadoop Configuration Resources', the output path actually points to a path inside the HDFS cluster. So I would like to know: does the PutParquet processor support writing to a local folder?
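One idea I was wondering about (not tested on my side): would a minimal core-site.xml that sets fs.defaultFS to file:/// make the 'Hadoop Configuration Resources' property happy while resolving the directory path against the local disk instead of a namenode? Something like:

```xml
<?xml version="1.0"?>
<!-- Minimal core-site.xml pointing the Hadoop client at the local filesystem.
     Reference this file from 'Hadoop Configuration Resources' so that the
     processor's Directory property resolves against the local disk. -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
</configuration>
```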
Labels:
- Apache Hadoop
- Apache NiFi
02-28-2018
07:20 AM
I have a flow that includes ListFile > FetchFile > (other processors), which works fine in non-cluster mode. Now I want to switch to cluster mode. I know I can set ListFile to run on the primary node only and then use a Remote Process Group before FetchFile and the other processing. But my files only exist on node A, and node B has no access to that folder. Also, since ZooKeeper elects the primary node automatically, exceptions are likely whenever the primary node happens to be node B. So, is there any way to 'force' the ListFile processor to always list files on node A?
Labels:
- Apache NiFi
11-22-2017
08:41 AM
@Shu Thanks for the reply; however, using the UpdateAttribute processor still produces multiple Parquet files. What I want to achieve is: N files under a directory > 1 .parquet file.
11-22-2017
07:07 AM
I have a process group as follows: ListFile > FetchFile > MergeContent > ConvertCSVToAvro > PutParquet. On the first execution everything works fine and all 15 files in the directory are written into the Parquet file. After that, if a new file is added to the directory it is ingested, but the original Parquet file gets overwritten. What I want is to 'append' the contents of the new file to the Parquet file, not 'overwrite' it. Is there any approach/processor to resolve this issue?
Labels:
- Apache NiFi