Community Articles

Find and share helpful community-sourced technical articles.
Labels (2)
avatar
Contributor

Objective

The objective of this article is to present a workflow to capture and republish data in Kafka topics. Note that NiFi is NOT a Kafka Replication/Backup tool. This is just a workflow based approach to capture kafka data and publish it back to a fresh topic if necessary.

Although storage of Kafka data is not mandatory is several cases, it may be essential in cases such as

  • Archival – Dump Kafka Data in HDFS
  • Corrections/Inspections – In cases where an invalid event may disrupt a downstream system, to correct and restore the messages.

92671-1parentgroups.png

Start by creating two processor groups – One for the backup workflow, second for the restore workflow

Backup Workflow

Backup is a long running workflow which archives data to HDFS. A file in HDFS may contain a batch of events or a single event, based on how frequently messages are getting published to the source kafka topic. Hence, user does not have to stop/start this backup workflow frequently.

Restore Workflow

Restore a manually triggered job/workflow in Nifi and it has to be modeled based on the topic being restored.

Some considerations:

Restore data to a temporary topic. A restore to original topic may duplicate the data in the original topic.

If you want to restore data to original topic, do that only after cleaning the original topic (re-create the original topic)

Stop backup workflow which re-publishing to original topic: To avoid duplication of backed up data in HDFS by Nifi. An alternate approach would be to move or rename the HDFS backup directory for the topic.

Steps:-

  • Stop Backup KafkaConsumer processor.
  • Edit Restore Workflow - ListHDFS processor - to update the HDFS directory to the topic which needs to be restored.
  • Edit Restore Workflow - PublishKafka processor - to update the topic which needs to be restored.
  • Delete and Recreate the restore topic (if it already exists)
  • Start the Restore Workflow.
  • After the restore is complete and verified -
  • Stop the restore workflow
  • Move the backup directory (which has the original backup files)
  • Start the backup process (this should take all the data to a new directory to avoid duplication)

Backup Workflow

Some Technical Consideration for Backup Workflow:-

1.Backup location is HDFS

2.Kafka Key is to be saved along with the Kafka Payload

3.Message Metadata like, partition info, message offset should be stored along with the message

4.Messages format is either text or json

92672-2backup.png

Variables Used in the Backup Workflow

The following variables have been used the following processors to avoid hardcoding values.

kafka.brokers

kafka.kerberos-service-name

kerberos.principal

kerberos.keytab

hadoop.configuration.resources

Destination Directory: /edl_data/backup/kafka/${kafka.topic}


Data Format

Each file thus created may have a batch of Kafka messages. Each message would have the following format. Note that in case there is a missing key or payload, that part would just be left empty.

[topicname-partition-offset]<@>[message key] <@>[message payload]

Example:-

topic1-0-231<@> {"ID":"535"}<@> {"vwcustomer":{"id":"13631918","name":"Mikel","age":51,"date":"2018-10-04T15:16:06Z"}}

ConsumeKafka

92673-3consumekafkai.png

This processor has a list of topic(s), whose messages should be consumed and eventually backed up in HDFS.

Topic Names: Can have a list of topics that would be consumed by this processor.

GroupID: the nifi-kafka-consumer group id. if you want to consume all the messages, please reset this to a new consumer group for a new backup

Max Poll Records: Why is this set to 1? To get the key for each Kafka message. If we poll a batch of kafka messages, the message key is lost and not stored as an attribute for the flow file. In order to save the kafka key, max poll records should be set to 1. This way each message is sent to a flow file having a kafka.key attribute.

UpdateAttribute

92674-4updateattributei.png

This processor captures the kafka-metadata that would be used later to store this information along with the message key and value.

ReplaceText

92675-5replacetext.png

This processor is used to append the kafka.topic.metadata and the kafka.key to the beginning of the message.

Note that the delimiter used is separate metadata, key and value is "<@>". If this delimiter is updated, make sure to update the same in the ExtractText processor of the Restore Workflow.

MergeContent

92676-6mergecontent.png

If we do NOT use this processor, then each kafka message (metadata<@>key<@>value)- would be written to a separate HDFS file. Instead, we want to merge multiple kafka messages to a given file in HDFS, if possible. We use MergeContent processor to this. Note that we are merging content based on the attribute kafka.topic; hence messages with same topic should end up together.

For information on how batching would occur and other properties, Check: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache...

PutHDFS

92677-7puthdfs-nifi.png

Eventually, we backup the files to HDFS. Note that if the kafka.topic destination directory does not exist, PutHDFS will create it.

Restore Kafka Messages

Some Technical Consideration for Restore Workflow:-

1.Restore Messages from HDFS backup location to Kafka Topic

2.Restore Key and payload

3.Order of restored messages: An attempt has been made to preserve order while restoring kafka messages. Kafka maintains order only on a partition. Hence, this Restore workflow is for topics with 1 partition.

4.If order of restored messages is not important, than this workflow can be made significantly simpler.

5.As per the backup workflow, this is for messages in text or json format.

92678-8restore.png

ENSURE THAT BACKUP IS STOPPED BEFORE RESTORE IS STARTED. This prevents a backup/restore loop thus duplicating the messages.

ListHDFS and FetchHDFS

These processors fetch the hdfs files from the configured directory location in ListHDFS. The files will not be deleted in the source directory.

92679-9gethdfs.png

SplitText and ExtractText

These processors are used to extract the Kafka offset from the metadata section stored in the backed up hdfs files.

The offset attribute extracted is used to sequence the messages using “priority” attribute and “PriorityAttributePrioritizer”

92680-10extracttext-nifi.png

Update Attribute, EnforceOrder and MergeContent

Update Attribute creates a new 'priority' attribute in every flowfile (kafka message), and assigns the offset value to it.

92681-10-2extracttext-nifi.png

This 'priority' attribute is used in the following relationships to prioritize processing of the messages.

92682-10-3updateattribute.png

Enforce Order is used to order flow files based on the attribute "kafka.offset".

92683-11enforceorder.png

The wait timeout is set to 10 seconds. This means that if there is a missing offset (order attribute) in the sequence, the processor will wait for a 10 seconds window in which time, if it gets the event, it continues to process and take the message to success relationship. If event does not arrive within the window, the message will be taken to overtook relationship.

Refer:-

https://issues.apache.org/jira/browse/NIFI-3414

https://gist.github.com/ijokarumawak/88fc30a2300845b3c27a79113fc72d41

92684-12enforceorder.png

MergeContent batches the messages back into 1 flow file.

92685-13mergecontent.png

ExtractText, UpdateAttribute, PublishKafka

Finally, we extract the metadata, key and value. Publish kafka key and value using PublishKafka processor.

Note, that kafka topic names should be updated in PublishKafka processor.

92686-14extracttext.png

We use UpdateAttribute processor yet again to order messages before publishing them.

92688-15updateattributei.png

92687-15publish-kafka.png

7,915 Views