Created 10-04-2016 02:53 PM
I have created a 4-node NiFi cluster and am now trying to test the performance of the following data flow:
Read data from a Kafka topic and write it to HDFS.
1. I have two brokers on my Hadoop cluster.
2. I have created a Kafka topic with 3 partitions and a replication factor of 1.
3. I have written a custom Kafka processor which posts 100k messages of 3-4 bytes each in a single run (an equivalent standalone producer is sketched after this list).
4. I'm using 4 ConsumeKafka processors to read data from this topic with the same group ID, later passing this data to 4 PutHDFS processors which write the data to 4 different locations in HDFS. (Attached: kafka-to-hdfs-flow.png)
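For reference, a minimal standalone sketch of the kind of producer used for this test. The broker addresses and the topic name "test-topic" are placeholders for my actual setup:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TinyMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list; replace with the two brokers on the Hadoop cluster.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100_000; i++) {
                // 3-byte payloads, approximating the 3-4 byte messages described above.
                producer.send(new ProducerRecord<>("test-topic", String.valueOf(100 + i % 900)));
            }
            producer.flush(); // make sure everything is on the brokers before exiting
        }
    }
}
```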
The idea behind this flow is to achieve parallel processing, but when I run it, it takes a very long time to ingest the data into HDFS.
Questions -
1. Is this a correct use case?
2. What else could I try to achieve maximum performance? I've already gone through this post: https://community.hortonworks.com/articles/16120/how-do-i-distribute-data-across-a-nifi-cluster.html
3. My use case is similar to the pulling-data use case described in the above post, but as there is no NCM in NiFi 1.0, will it work the same way as described?
4. How can I verify whether this flow has ingested all the messages successfully, and how long it took to ingest them? Is there any direct way to verify this in the NiFi UI?
Created 10-04-2016 04:43 PM
This blog has an explanation of how to scale ConsumeKafka across a cluster:
http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka
If you have a 4-node NiFi cluster with 4 ConsumeKafka processors for the same topic and group, then you have 16 consumers for a single topic with 3 partitions, so only 3 of the 16 are actually doing anything.
If you are going to stick with 3 partitions, then you should have 1 ConsumeKafka processor on the canvas, which means 4 consumers in total (one per node). If you want to write to 4 different HDFS directories, you can connect the success relationship of the one ConsumeKafka to 4 different PutHDFS processors.
If you want more parallelism, you will have to increase the number of partitions.
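As a hedged sketch of that last step: newer Kafka clients (0.11+) can raise the partition count programmatically through the AdminClient, as below; on a 0.10-era setup like this one, the kafka-topics.sh --alter tool does the same job. The broker address and topic name are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow "test-topic" to 8 partitions so that 4 nodes x 2 concurrent
            // tasks can each own exactly one partition. Note that existing data
            // is not rebalanced across the new partitions.
            admin.createPartitions(
                    Collections.singletonMap("test-topic", NewPartitions.increaseTo(8)))
                 .all().get();
        }
    }
}
```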
Created 10-05-2016 11:31 AM
Thanks Bryan, this was really helpful. I have a few questions:
> How can I measure the performance of any flow?
I'm posting one million messages to a Kafka topic (8 partitions); my NiFi data flow (the 4-node cluster described above) then picks up all the messages and stores them in HDFS. I can verify the count because I'm currently sending one message per FlowFile, so there should be one million files in HDFS.
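(For reference, a minimal sketch of checking that count programmatically, assuming the four PutHDFS processors write under a single parent directory; the path /data/kafka-test is a placeholder:)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CountIngestedFiles {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml/hdfs-site.xml from the classpath.
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            // Placeholder parent directory above the four PutHDFS output locations;
            // the summary counts files recursively.
            ContentSummary summary = fs.getContentSummary(new Path("/data/kafka-test"));
            System.out.println("Files ingested so far: " + summary.getFileCount());
        }
    }
}
```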
> How/where can I check the ingestion speed? If there is no direct way, would it be a good idea to write a custom processor that captures the start and end time of the process?
Created 10-05-2016 02:26 PM
I think there are two different things here: one is understanding the performance, and the other is validating the data/correctness.
For performance, you should be able to get a good idea by looking at the various statistics in NiFi. There are stats on each processor, on process groups, and on the Summary page from the global menu; they all show things like FlowFiles in/out and bytes in/out.
For validating the data, I don't know of a great way to do this other than checking what ends up in HDFS. Most of the time a dataflow is a continuous stream of data, not a discrete set where you know how many records to expect.
One last thing to mention: do you really want to write 1 million small files to HDFS? You would probably get much better performance by using the batching capability of ConsumeKafka to write a couple of hundred, or even a thousand, messages to a single FlowFile.
Created 10-05-2016 02:45 PM
That's true, writing 1M small files to HDFS doesn't make any sense; I was doing it just to cover a test case.
My second use case is to write the data to a single FlowFile. I can use the "Message Demarcator" property of ConsumeKafka for that. I just wanted to understand what exactly would happen if I set it to 10000 or any other number: does it write that many messages to a single FlowFile?
Created 10-05-2016 03:19 PM
Yup, if you specify the Message Demarcator it will write all of the messages received in a single poll from Kafka to a single FlowFile, so it would be a maximum of "Max Poll Records" messages per FlowFile, but could be fewer if fewer were received from Kafka.
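To make the mechanics concrete outside of NiFi, here is a hedged sketch of what one such poll amounts to with a plain Kafka consumer: every record from a single poll (capped by max.poll.records) joined with a newline demarcator into one payload. The broker, group, and topic names are placeholders, and this is an analogy, not ConsumeKafka's internal code:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DemarcatedPoll {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        props.put("group.id", "nifi-test-group");       // placeholder group
        props.put("max.poll.records", "10000");         // upper bound on messages per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);

            // Join everything from this one poll with a newline demarcator,
            // analogous to ConsumeKafka producing one FlowFile per poll.
            StringBuilder payload = new StringBuilder();
            for (ConsumerRecord<String, String> record : records) {
                payload.append(record.value()).append('\n');
            }
            System.out.println("One batch of " + records.count() + " messages, "
                    + payload.length() + " bytes");
        }
    }
}
```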
Created 10-04-2016 04:53 PM
Let me make sure I understand your flow completely.
- You have 4 ConsumeKafka processors all reading from the same topic?
If this is your intent, you should have a single ConsumeKafka processor with the success relationship drawn off of it 4 times (one to each unique PutHDFS processor). This cuts down on disk I/O, since the consumed data is written to the NiFi content repository only once.
- Then you are trying to write that same data to 4 different HDFS endpoints?
With only 3 partitions on your Kafka topic, you can only have three active consumers at a time. With 4 nodes in your cluster, one of the nodes at any given time will not be consuming any data. Optimally, the number of partitions would be equal to, or a multiple of, the number of nodes in your NiFi cluster. (For example, with 4 partitions you would have 4 nodes running the ConsumeKafka processor with 1 concurrent task; with 8 partitions, 4 nodes with 2 concurrent tasks each.)
It would be interesting to know more about your custom Kafka processor and how it differs from the "Max Poll Records" property in the existing ConsumeKafka processor.
Redistributing data across your cluster is only necessary when dealing with ingest-type processors that are not cluster friendly, such as GetSFTP, ListSFTP, GetFTP, etc. With ConsumeKafka, the most optimized approach is as I described above.
As for your question about how to know whether all messages were consumed from the topic: a Kafka topic is typically a living thing, with more and more messages written to and removed from it, so I'm not sure how NiFi would know when everything has been consumed. NiFi will just continue to poll the topic for new messages; if there is nothing new, NiFi gets nothing. It is the Kafka server that keeps track of which messages were served to a consumer; NiFi does not keep a listing itself.
Data is not passed to the success relationship until it has been consumed completely and successfully.
NiFi provenance could be used to track particular FlowFiles or to list all FlowFiles created by a ConsumeKafka processor, but you would need to know how many messages were on the topic, and NiFi will not know that.
Matt
Created 10-05-2016 11:46 AM
@mclark Thanks for your response. I'm posting 1 million messages of 4-5 bytes each to a Kafka topic in a single run. My intent is to know the time taken to ingest these 1M messages into HDFS.
1. One way I can think of is to do everything manually: note the start time of the process and check again once all the messages have been ingested into HDFS (see the sketch after this list). But this will not work if I set the "Message Demarcator" property, as there would be fewer files in HDFS and I'd never know when NiFi has stopped/completed writing.
2. Another way I can think of is to write a custom processor which captures the start and end time of the process.
3. Is this the correct approach?
4. Is there any benchmarking post/document available which gives some figures for NiFi ingestion speed?
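For approach 1, a minimal sketch of estimating the ingest window from the HDFS file timestamps themselves, instead of watching the clock manually. It assumes a single placeholder output directory /data/kafka-test that nothing else writes to:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class EstimateIngestDuration {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            long earliest = Long.MAX_VALUE;
            long latest = Long.MIN_VALUE;
            // Recursively walk the placeholder output directory written by PutHDFS.
            RemoteIterator<LocatedFileStatus> it =
                    fs.listFiles(new Path("/data/kafka-test"), true);
            while (it.hasNext()) {
                long mtime = it.next().getModificationTime();
                earliest = Math.min(earliest, mtime);
                latest = Math.max(latest, mtime);
            }
            if (earliest == Long.MAX_VALUE) {
                System.out.println("No files found yet.");
            } else {
                System.out.println("Approximate ingest window: " + (latest - earliest) + " ms");
            }
        }
    }
}
```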