Why Hortonworks Schema Registry?
Both Apache NiFi and Hortonworks Streaming Analytics Manager (SAM) are extremely powerful tools in the data-in-motion space. To make them truly enterprise-ready, Hortonworks DataFlow (HDF), a cutting-edge data-in-motion platform, offers a set of foundational enterprise services. Apache Ambari is one example, providing cluster management and provisioning capabilities; Apache Ranger is another, providing a centralized place for users to manage policies and permissions across HDF and HDP components. In HDF 3.0, Hortonworks Schema Registry (SR) became a GA offering as part of these foundational services.
SR offers a centralized place to store and manage reusable schemas. This becomes extremely handy when you have data sharing the same schema, either generated by different producers or delivered to different consumers. Think of data coming from different ATM machines, different trucks, or different IoT devices: instead of having to re-create schemas and attach them to every payload, you can simply attach a schema fingerprint and retrieve the actual schema from your SR when necessary. From an operational efficiency perspective, the schema can sometimes be even heavier than the actual payload when dealing with smaller messages, so this capability allows your organization to significantly reduce overhead.
Hortonworks SR also supports keeping multiple versions of a schema and, furthermore, lets users define compatibility between schema versions. This effectively allows your producers and consumers to evolve at different rates: producers can focus on producing data and consumers on consuming data, without having to worry about schema compatibility issues at the producer/consumer level.
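To make the compatibility idea concrete, here is a minimal, hypothetical Avro example, sketched in Python with the fastavro library (the library, record name, and field names are illustrative assumptions, not the demo's actual schemas). Version 2 adds a new field with a default value, which is a backward-compatible change under Avro's schema resolution rules, so data written with version 1 can still be read with version 2.

```python
from fastavro import parse_schema

# Hypothetical version 1 of a truck geo-event schema (illustrative field names).
geo_event_v1 = {
    "type": "record",
    "name": "TruckGeoEvent",
    "fields": [
        {"name": "truckId", "type": "long"},
        {"name": "latitude", "type": "double"},
        {"name": "longitude", "type": "double"},
    ],
}

# Version 2 adds a field WITH a default, so records written with v1 can still be
# read with v2 (the default fills the gap) -- a backward-compatible change.
geo_event_v2 = {
    "type": "record",
    "name": "TruckGeoEvent",
    "fields": [
        {"name": "truckId", "type": "long"},
        {"name": "latitude", "type": "double"},
        {"name": "longitude", "type": "double"},
        {"name": "driverName", "type": ["null", "string"], "default": None},
    ],
}

# parse_schema validates the definitions; the registry would store both as
# versions of the same schema name and enforce the configured compatibility.
parse_schema(geo_event_v1)
parse_schema(geo_event_v2)
```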
Use Apache NiFi in conjunction with Hortonworks SR
There have already been a couple of great blog posts introducing this topic, such as Apache NiFi - Records and Schema Registries, Record-Oriented Data with NiFi and Real-Time SQL on Event Streams. In this article, user experience and use cases will be our primary focus.
To enable real-time interaction with Hortonworks SR (and in fact not just Hortonworks SR, but any schema registry), a net new capability has been introduced in Apache NiFi 1.2/HDF 3.0: a record based processing mechanism. More details can be found here: record based processors in Apache NiFi 1.2.
As a NiFi developer, I can easily design a processor leveraging the record based processing mechanism by using the Record Reader Interface and Record Writer Interface, shown in Fig.1.
Fig.1: record reader and writer interface
The reader/writer interfaces allow my processor to reference a SR for retrieving schemas (see Fig.2) when: (a) I need to deserialize bytes into in-memory record objects using a reader (read bytes in a specific format, convert them into schema-less records, and store those records in memory), and (b) I need to serialize the in-memory record objects into output bytes using a writer (convert records into a user-specified output format and write them back to disk).
Fig.2: referencing a schema from the Hortonworks SR
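Outside of NiFi, the same read-into-records / write-from-records pattern can be sketched roughly as below (Python with the fastavro library; both the library choice and the schema are illustrative assumptions): bytes in one format are deserialized into plain in-memory records, and those records are then serialized into a different output format, with neither side needing to know about the other.

```python
import csv
import io
from fastavro import parse_schema, schemaless_reader, schemaless_writer

# Illustrative schema; in the article's flow the schema would come from the SR.
RAW_SCHEMA = {
    "type": "record",
    "name": "TruckGeoEvent",
    "fields": [
        {"name": "truckId", "type": "long"},
        {"name": "latitude", "type": "double"},
        {"name": "longitude", "type": "double"},
    ],
}
SCHEMA = parse_schema(RAW_SCHEMA)

def read_record(avro_bytes: bytes) -> dict:
    # "Reader" side: deserialize Avro bytes into an in-memory record (a dict).
    return schemaless_reader(io.BytesIO(avro_bytes), SCHEMA)

def write_records_as_csv(records: list) -> str:
    # "Writer" side: serialize in-memory records into a different output format (CSV).
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=[f["name"] for f in RAW_SCHEMA["fields"]])
    writer.writeheader()
    writer.writerows(records)
    return out.getvalue()

# Round trip: Avro bytes in, CSV text out.
buf = io.BytesIO()
schemaless_writer(buf, SCHEMA, {"truckId": 17, "latitude": 41.88, "longitude": -87.63})
print(write_records_as_csv([read_record(buf.getvalue())]))
```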
There has been tremendous interest and effort in the Apache NiFi community to develop record based processors, primarily because, by leveraging this new processing mechanism, you can avoid splitting a FlowFile's content (e.g. a CSV file with 1,000 rows) into many FlowFiles (splitting the CSV into 1,000 individual FlowFiles) just for the sake of simple event processing or making a routing decision. In our example, you avoid creating 1,000 updates in the FlowFile repository, as well as in the provenance repository. This brings your enterprise-level dataflow performance to the next level.
Some example record based processors available today in Apache NiFi 1.2/HDF 3.0 are shown in Fig.3.
Fig.3: record based processors available in Apache NiFi 1.2/HDF 3.0.
Use Case
Let’s walk through an example use case to understand and visualize why NiFi and HWX SR are better used together. In this use case, assume an upstream application has been producing real-time events, specifically speed events and geo-location events emitted from a truck. A MiNiFi agent has been deployed on the truck to collect the raw events, convert them to Avro format, attach a schema fingerprint to the payload (the first 14 bytes of the payload), and send them to two Kafka topics respectively.
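The article does not show the MiNiFi agent's internals, but the producer-side idea can be sketched roughly in Python, under a few loud assumptions: the kafka-python and fastavro libraries, a made-up topic name, and a made-up fingerprint header layout (the real HWX content-encoded header has its own fixed wire format, which this sketch does not reproduce).

```python
import io
import struct
from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer

GEO_SCHEMA = parse_schema({
    "type": "record",
    "name": "TruckGeoEvent",
    "fields": [
        {"name": "truckId", "type": "long"},
        {"name": "latitude", "type": "double"},
        {"name": "longitude", "type": "double"},
    ],
})

def fingerprint_header(protocol_version: int, schema_id: int, schema_version: int) -> bytes:
    # Illustrative packing only: protocol version, schema id, schema version.
    # It shows the idea of "small reference instead of full schema", not the
    # actual Schema Registry serializer format.
    return struct.pack(">BqI", protocol_version, schema_id, schema_version)

def encode_event(event: dict, schema_id: int, schema_version: int) -> bytes:
    buf = io.BytesIO()
    schemaless_writer(buf, GEO_SCHEMA, event)
    # Payload = tiny schema reference + Avro-encoded record (no embedded schema).
    return fingerprint_header(1, schema_id, schema_version) + buf.getvalue()

producer = KafkaProducer(bootstrap_servers="broker:9092")  # hypothetical broker
payload = encode_event({"truckId": 42, "latitude": 41.88, "longitude": -87.63},
                       schema_id=3, schema_version=1)
producer.send("truck_geo_events", payload)  # hypothetical source topic name
producer.flush()
```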
We are running the following dataflow in NiFi (Fig.4) to consume these events from Kafka, enrich them, and publish the enriched records back to Kafka.
Fig.4: NiFi demo dataflow
Drill into the 'Acquire Events' process group and you’ll see the following processors consuming Kafka events (Fig.5); both are record based processors.
Fig.5: two consume Kafka processors
Open the config panel of the ’Truck Geo Kafka Stream’ processor (Fig.6): we are using an Avro record reader here (to consume Avro events from Kafka) and a CSV record writer on the other side.
Fig.6: ConsumeKafkaRecord, ’Truck Geo kafka Stream’ processor configuration
Open the Avro record reader config (Fig.7): the ‘HWX Content-Encoded Schema Reference’ schema access strategy indicates that this record reader will look for a schema fingerprint in the first 14 bytes of the payload, assuming that the upstream MiNiFi agent has added that info to the Avro payload before ingesting it into the Kafka topic. With the fingerprint, this record reader is able to interact with the Hortonworks SR to retrieve the proper schema object and deserialize the Avro bytes into in-memory record objects.
Fig.7: ‘Avro truck events’ record reader
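Conceptually, the reader does the reverse of the producer sketch earlier: peel off the fingerprint header, resolve it to a schema, and decode the remaining Avro bytes. The Python sketch below keeps the same assumed header layout and uses a hard-coded schema lookup in place of a real Schema Registry call; in NiFi the record reader and the SR controller service handle all of this for you.

```python
import io
import struct
from fastavro import parse_schema, schemaless_reader

# Must match whatever header layout the producer used (illustrative packing,
# not the actual HWX content-encoded wire format).
HEADER = struct.Struct(">BqI")

def fetch_schema(schema_id: int, schema_version: int):
    # Stand-in for a Schema Registry lookup: a real reader would call the
    # registry (REST API or client library) and cache the parsed schema.
    return parse_schema({
        "type": "record",
        "name": "TruckGeoEvent",
        "fields": [
            {"name": "truckId", "type": "long"},
            {"name": "latitude", "type": "double"},
            {"name": "longitude", "type": "double"},
        ],
    })

def decode_event(payload: bytes) -> dict:
    protocol_version, schema_id, schema_version = HEADER.unpack_from(payload)
    schema = fetch_schema(schema_id, schema_version)
    # Everything after the fingerprint header is the schemaless Avro record.
    return schemaless_reader(io.BytesIO(payload[HEADER.size:]), schema)
```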
Next, let’s open the CSV record writer config panel (Fig.8). The ‘HWX schema reference attributes’ schema write strategy tells me that, in addition to converting the in-memory records into CSV format, this CSV record writer will store the schema fingerprint info in attributes and attach those attributes to the FlowFiles produced by this processor.
Fig.8: CSV record writer configuration
Now let’s take a look at an example FlowFile produced by this ’Truck Geo kafka Stream’ processor. The content (in CSV format) is shown in Fig.9, while the attributes are shown in Fig.10. ’schema.identifier’, ’schema.protocol.version’ and ’schema.version’ make up the schema fingerprint and were created by the CSV record writer.
Fig.9: FlowFile content
Fig.10: FlowFile attributes
Going back to the upper-level flow, the ‘GeoCode Enrichment’ processor executes a script to map geo-locations to nearby addresses, so the FlowFile content becomes what is shown in Fig.11.
Fig.11: FlowFile content after the enrichment processor
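The enrichment script itself is not shown in the article; a hypothetical Python stand-in for that kind of logic could look like the sketch below: pick the nearest entry from a small reference table of known addresses and append it to each record. A real flow would typically use a proper geocoding dataset or service instead.

```python
# Hypothetical geo-enrichment logic: nearest known address for each record.
KNOWN_LOCATIONS = [
    {"address": "1 Main St, Springfield", "latitude": 39.80, "longitude": -89.65},
    {"address": "500 Lake Shore Dr, Chicago", "latitude": 41.89, "longitude": -87.61},
]

def nearest_address(latitude: float, longitude: float) -> str:
    # Squared Euclidean distance is good enough for a nearest-neighbour sketch.
    return min(
        KNOWN_LOCATIONS,
        key=lambda loc: (loc["latitude"] - latitude) ** 2 + (loc["longitude"] - longitude) ** 2,
    )["address"]

def enrich(record: dict) -> dict:
    # Add the derived address field alongside the original geo fields.
    return {**record, "address": nearest_address(record["latitude"], record["longitude"])}

print(enrich({"truckId": 42, "latitude": 41.88, "longitude": -87.63}))
```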
The next processor in the pipeline, ‘UpdateAttribute’, attaches a Kafka topic name (truck_events_avro) to each FlowFile, as well as a schema name (truck_events_avro). Both are stored in attributes. See the processor configuration in Fig.12.
Fig.12: UpdateAttribute processor configuration
Now, every FlowFile coming out of this UpdateAttribute processor and ready to be sent to the downstream ‘PublishKafkaRecord’ processor has ‘kafka.topic’ and ’schema.name’ attributes (Fig.13).
Fig.13: FlowFile attributes, after the UpdateAttribute processor
On the other side of the house, the truck speed events have gone through a similar flow and eventually reach the same ‘PublishKafkaRecord' processor (processor config shown in Fig.14). The ‘PublishKafkaRecord’ processor is able to publish truck speed events and geo events to separate Kafka topics by dynamically evaluating the topic name using Expression Language (see the ’Topic Name’ configuration field).
Fig.14: PublishKafkaRecord processor configuration
Open the CSV record reader config (Fig.15): unlike the previous record reader in the ‘ConsumeKafkaRecord’ processor, this reader uses the ’Schema Name’ property as the schema access strategy, indicating that it will look up a matching schema object in the Hortonworks SR based on the schema name retrieved from incoming FlowFiles.
Fig.15: CSV record reader configuration
Notice that when schema name is used as the schema access strategy, even if there are multiple versions under the same schema name, only the latest schema version will be used, as explained in this HCC article. If you prefer to manually specify a schema version, you can choose the ‘HWX schema reference attributes’ schema access strategy and add an UpdateAttribute processor in your flow to update/modify the version number. Again, all the different options are explained in detail in this HCC article.
Last but not least, let’s take a look at the Hortonworks SR UI (Fig.16). From an operational point of view, you can easily visualize all the available schema names, the different versions under each schema name, the compatibility between schema versions, and more.
Fig.16: Hortonworks schema registry UI