A quick glance at NiFi’s 252+ processors shows that it can solve a wide array of use cases out of the box. What is not immediately obvious is the flexibility that its attributes and Expression Language provide, which lets NiFi quickly, easily, and efficiently solve complex use cases that would require significant customization in other solutions.

Consider, for example, sending all of the incoming data to both Kafka and HDFS while also sending 10% to a dev environment and a portion to a partner system based on the content of the data (e.g. CustomerName=ABC). These more complex routing scenarios are easily accommodated using UpdateAttribute, RouteOnAttribute, and RouteOnContent, as sketched below.
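As a rough sketch, both rules could be expressed as RouteOnAttribute dynamic properties, where each property name becomes a relationship. The customer.name attribute below is hypothetical and would first need to be extracted from the content (for example with EvaluateJsonPath), and the 10% sample uses the Expression Language random() function:

    RouteOnAttribute (illustrative dynamic properties)
        Routing Strategy : Route to Property name
        to_dev           : ${random():mod(10):equals(0)}
        to_partner       : ${customer.name:equals('ABC')}

The to_dev expression matches roughly one in ten FlowFiles, while to_partner matches only those whose extracted customer name equals ABC.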

Another example of NiFi’s flexibility is the ability to multiplex data flows. In traditional ETL systems, the schema is tightly coupled to the data as it moves between systems, because transformations occur in transit. In more modern ELT scenarios, the data is often loaded into the destination with minimal transformations before the complex transformation step is kicked off. This has many advantages and allows NiFi to focus on the EL (extract and load) portion of the flow.

When focused on EL, there is far less need for the movement engine to be schema aware, since it is generally focused on simple routing, filtering, format translation, and concatenation. One common scenario is loading data from many Kafka topics into their respective HDFS directories and/or Hive tables with only simple transformations. In traditional systems, this would require one flow per topic, but by parameterizing flows, a single flow can serve all topics.

In the image below you can see the configurations and attributes that make this possible. The ConsumeKafka processor can use a list of topics or a regular expression to consume from many topics at once. Each FlowFile (i.e. a batch of Kafka messages) has an attribute called "kafka.topic" added to identify its source topic.
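As an illustrative configuration (the broker addresses, topic pattern, and group id are hypothetical), ConsumeKafka can subscribe to every topic matching a regular expression, and each FlowFile it emits carries the source topic in the kafka.topic attribute:

    ConsumeKafka
        Kafka Brokers     : broker1:9092,broker2:9092
        Topic Name Format : pattern
        Topic Name(s)     : sensor\..*
        Group ID          : nifi-multiplex

A FlowFile consumed from, say, sensor.temperature arrives with kafka.topic = sensor.temperature, which the downstream processors then use for correlation and directory placement.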

Next, in order to load streaming data into HDFS or Hive, it is recommended to use MergeContent to combine records into large files (e.g. every 1 GB or every 15 minutes). In MergeContent, setting the “Correlation Attribute Name” property to “kafka.topic” ensures that only records from the same Kafka topic are combined (similar to a group-by clause). After the files are merged, the “Directory” property of PutHDFS can be parameterized (e.g. /myDir/${kafka.topic}) so that the data is loaded into the correct directory based on the Kafka topic name.
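A minimal sketch of those two steps, assuming the 1 GB / 15 minute merge policy and the /myDir base path mentioned above:

    MergeContent
        Correlation Attribute Name : kafka.topic
        Minimum Group Size         : 1 GB
        Max Bin Age                : 15 min

    PutHDFS
        Directory : /myDir/${kafka.topic}

Because the Directory property supports the Expression Language, each merged bundle lands in the directory named after the topic it came from, without needing a separate flow per topic.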

[Image: 72819-zlovu.png]

Note that this diagram includes a retry-and-notify-on-failure process group. This type of solution is highly recommended for production flows. More information can be found here.

This example could easily be extended to include file format translation (e.g. ConvertAvroToORC), filtering (e.g. RouteOnContent), or Kafka-topic to HDFS-directory mapping (e.g. UpdateAttribute). It can even trigger downstream processing (e.g. ExecuteSparkInteractive, PutHiveQL, ExecuteStreamCommand, etc.) or periodically update metrics and logging solutions such as Graphite, Druid, or Solr. Of course, this solution also applies to many more data stores than just Kafka and HDFS.
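For the topic-to-directory mapping specifically, one hedged sketch is a single UpdateAttribute dynamic property (the hdfs.dir attribute name and the path layout are hypothetical) that derives the target path from the topic name, which PutHDFS then references:

    UpdateAttribute
        hdfs.dir : ${kafka.topic:toLower():replaceAll('\.', '/')}

    PutHDFS
        Directory : /data/${hdfs.dir}

A topic such as Sensor.Temperature would map to /data/sensor/temperature; more elaborate mappings can be built with UpdateAttribute’s Advanced rules.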

Overall, parameterizing flows in NiFi for multiplexing can reduce complexity for EL use cases and simplify administration. This design is straightforward to implement and uses core NiFi features. It is also easily extended to a variety of use cases.
