Objective
The objective of this article is to present a workflow to capture and republish data in Kafka topics. Note that NiFi is NOT a Kafka replication/backup tool. This is just a workflow-based approach to capture Kafka data and publish it back to a fresh topic if necessary.
Although storage of Kafka data is not mandatory in several cases, it may be essential in others.
Start by creating two processor groups: one for the backup workflow and one for the restore workflow.
Backup is a long-running workflow which archives data to HDFS. A file in HDFS may contain a batch of events or a single event, depending on how frequently messages are published to the source Kafka topic. Hence, the user does not have to stop and start this backup workflow frequently.
Restore is a manually triggered job/workflow in NiFi, and it has to be modeled based on the topic being restored.
Some considerations:
Restore data to a temporary topic. Restoring to the original topic may duplicate the data already present in it.
If you want to restore data to the original topic, do so only after cleaning the original topic (re-create the original topic).
Stop the backup workflow while re-publishing to the original topic, so that NiFi does not back up the restored messages again and duplicate the data in HDFS. An alternative approach would be to move or rename the HDFS backup directory for the topic.
Steps:-
Some Technical Considerations for Backup Workflow:-
1. Backup location is HDFS
2. The Kafka key is to be saved along with the Kafka payload
3. Message metadata, such as partition info and message offset, should be stored along with the message
4. Message format is either text or JSON
The following variables have been used in the processors below to avoid hardcoding values.
kafka.brokers
kafka.kerberos-service-name
kerberos.principal
kerberos.keytab
hadoop.configuration.resources
Destination Directory: /edl_data/backup/kafka/${kafka.topic}
Each file thus created may contain a batch of Kafka messages. Each message has the following format. Note that if the key or the payload is missing, that part is simply left empty.
[topicname-partition-offset]<@>[message key] <@>[message payload]
Example:-
topic1-0-231<@> {"ID":"535"}<@> {"vwcustomer":{"id":"13631918","name":"Mikel","age":51,"date":"2018-10-04T15:16:06Z"}}
This processor has a list of topic(s) whose messages should be consumed and eventually backed up in HDFS.
Topic Name(s): can list one or more topics to be consumed by this processor.
Group ID: the NiFi Kafka consumer group id. If you want to consume all of the messages again, set this to a new consumer group before starting a new backup.
Max Poll Records: Why is this set to 1? To capture the key of each Kafka message. If a batch of Kafka messages is polled, the message key is lost and is not stored as an attribute on the flow file. In order to save the Kafka key, Max Poll Records should be set to 1, so that each message lands in its own flow file carrying a kafka.key attribute.
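For reference, a minimal configuration sketch of this consumer processor (a ConsumeKafka processor; the topic and group id shown are only examples):
Kafka Brokers: ${kafka.brokers}
Topic Name(s): topic1
Group ID: nifi-kafka-backup-1
Offset Reset: earliest
Max Poll Records: 1
Kerberos Service Name: ${kafka.kerberos-service-name}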
This processor captures the Kafka metadata that will later be stored along with the message key and value.
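A minimal sketch of this step, assuming an UpdateAttribute processor builds the metadata string from the attributes written by ConsumeKafka:
kafka.topic.metadata: ${kafka.topic}-${kafka.partition}-${kafka.offset}
This produces the [topicname-partition-offset] prefix shown in the message format above.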
This processor is used to prepend the kafka.topic.metadata and the kafka.key to the beginning of each message.
Note that the delimiter used to separate metadata, key and value is "<@>". If this delimiter is updated, make sure to update it in the ExtractText processor of the Restore Workflow as well.
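Assuming a ReplaceText processor performs the prepend, a minimal sketch looks like this:
Replacement Strategy: Prepend
Evaluation Mode: Entire text
Replacement Value: ${kafka.topic.metadata}<@>${kafka.key}<@>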
If we do NOT use this processor, each Kafka message (metadata<@>key<@>value) would be written to a separate HDFS file. Instead, we want to merge multiple Kafka messages into a single HDFS file where possible, and we use the MergeContent processor for this. Note that content is merged based on the kafka.topic attribute, so messages from the same topic end up in the same file.
For information on how batching occurs and for other properties, check: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache...
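A minimal MergeContent sketch for this step (the entry counts and bin age are illustrative and should be tuned to the topic's volume):
Merge Strategy: Bin-Packing Algorithm
Correlation Attribute Name: kafka.topic
Delimiter Strategy: Text
Demarcator: a newline, so each message is written on its own line
Minimum Number of Entries: 1000
Maximum Number of Entries: 10000
Max Bin Age: 5 min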
Eventually, we back up the files to HDFS. Note that if the kafka.topic destination directory does not exist, PutHDFS will create it.
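A minimal PutHDFS sketch, using the variables listed earlier (Conflict Resolution Strategy is left at its default of fail; adjust it if you expect filename collisions):
Hadoop Configuration Resources: ${hadoop.configuration.resources}
Kerberos Principal: ${kerberos.principal}
Kerberos Keytab: ${kerberos.keytab}
Directory: /edl_data/backup/kafka/${kafka.topic}
Conflict Resolution Strategy: fail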
Some Technical Considerations for Restore Workflow:-
1. Restore messages from the HDFS backup location to a Kafka topic
2. Restore key and payload
3. Order of restored messages: an attempt has been made to preserve order while restoring Kafka messages. Kafka maintains order only within a partition; hence, this restore workflow is for topics with 1 partition.
4. If the order of restored messages is not important, then this workflow can be made significantly simpler.
5. As per the backup workflow, this is for messages in text or JSON format.
ENSURE THAT BACKUP IS STOPPED BEFORE RESTORE IS STARTED. This prevents a backup/restore loop that would duplicate the messages.
These processors fetch the HDFS files from the directory location configured in ListHDFS. The files are not deleted from the source directory.
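A minimal sketch of this part of the flow, assuming the usual ListHDFS followed by FetchHDFS pattern (topic1 is only an example directory):
ListHDFS
Hadoop Configuration Resources: ${hadoop.configuration.resources}
Directory: /edl_data/backup/kafka/topic1
Recurse Subdirectories: false
FetchHDFS
HDFS Filename: ${path}/${filename}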
These processors are used to extract the Kafka offset from the metadata section stored in the backed-up HDFS files.
The extracted offset attribute is used to sequence the messages using the "priority" attribute and the PriorityAttributePrioritizer.
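For example, an ExtractText dynamic property along the following lines could pull the offset into a kafka.offset attribute (a sketch; the regex assumes the metadata<@>key<@>value layout described above and that each flow file carries a single backed-up message at this point):
kafka.offset: ^[^<]+-(\d+)<@>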
UpdateAttribute creates a new 'priority' attribute in every flow file (Kafka message) and assigns the offset value to it.
This 'priority' attribute is used on the following connections to prioritize processing of the messages.
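A minimal sketch of this UpdateAttribute step:
priority: ${kafka.offset}
On each of the following connections, add PriorityAttributePrioritizer under Selected Prioritizers.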
EnforceOrder is used to order flow files based on the "kafka.offset" attribute.
The Wait Timeout is set to 10 seconds. This means that if an offset (the order attribute) is missing from the sequence, the processor waits for a 10 second window; if the missing event arrives within that window, processing continues and the message is routed to the success relationship. If the event does not arrive within the window, the message is routed to the overtook relationship.
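A minimal EnforceOrder sketch for a single-partition topic (the Group Identifier can be any constant because one topic is restored at a time; Initial Order should match the first offset present in the backup):
Group Identifier: topic1
Order Attribute: kafka.offset
Initial Order: 0
Wait Timeout: 10 secs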
Refer:-
https://issues.apache.org/jira/browse/NIFI-3414
https://gist.github.com/ijokarumawak/88fc30a2300845b3c27a79113fc72d41
MergeContent batches the messages back into 1 flow file.
Finally, we extract the metadata, key and value, and publish the Kafka key and value using the PublishKafka processor.
Note that the Kafka topic name should be updated in the PublishKafka processor.
We use the UpdateAttribute processor yet again to order the messages before publishing them.
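A minimal sketch of the publish step, assuming the message key has been extracted into a kafka.key attribute and the flow file content has been reduced to the payload (topic1_restore stands in for the temporary restore topic discussed above):
Kafka Brokers: ${kafka.brokers}
Topic Name: topic1_restore
Kafka Key: ${kafka.key}
Kerberos Service Name: ${kafka.kerberos-service-name}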