
Why Hortonworks Schema Registry?

Both Apache NiFi and Hortonworks Streaming Analytics Manager (SAM) are extremely powerful tools in the data-in-motion space. To make them truly enterprise-ready, Hortonworks DataFlow (HDF), a cutting-edge data-in-motion platform, offers a set of foundational enterprise services. Apache Ambari is one example, providing cluster management and provisioning capabilities; Apache Ranger is another, providing a centralized place for users to manage policies and permissions across HDF and HDP components. In HDF 3.0, Hortonworks Schema Registry (SR) became a GA offering as part of the foundational services.


SR offers a centralized place to store and manage reusable schemas. This becomes extremely handy when you have data sharing the same schema that is either generated by different producers or delivered to different consumers. Think of data coming from different ATM machines, different trucks, or different IoT devices: instead of having to re-create schemas and attach them to your payload, you can simply attach a schema fingerprint and retrieve the actual schema from your SR when necessary. From an operational efficiency perspective, your schema can sometimes be even heavier than your actual payload when you are dealing with small messages, so this capability allows your organization to significantly reduce operational overhead.
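To make the size argument concrete, here is a minimal Python sketch. The schema, the in-memory `REGISTRY` dict, the `encode`/`decode` helpers and the 12-byte header layout are all illustrative assumptions; they are not the actual Hortonworks SR wire format or client API.

```python
import json
import struct

# Hypothetical Avro-style schema for a truck speed event (illustration only).
SPEED_SCHEMA = {
    "type": "record",
    "name": "truck_speed_event",
    "fields": [
        {"name": "truck_id", "type": "int"},
        {"name": "driver_id", "type": "int"},
        {"name": "speed", "type": "int"},
    ],
}

# Stand-in for a schema registry: (schema id, version) -> schema.
REGISTRY = {(1, 1): SPEED_SCHEMA}

# Assumed fingerprint layout: 8-byte schema id + 4-byte version, big-endian.
HEADER = struct.Struct(">qi")


def encode(schema_id, version, body):
    """Prefix the serialized body with a fixed-size schema fingerprint."""
    return HEADER.pack(schema_id, version) + body


def decode(message):
    """Look up the schema named by the fingerprint and return it with the body."""
    schema_id, version = HEADER.unpack_from(message)
    return REGISTRY[(schema_id, version)], message[HEADER.size:]


body = struct.pack(">iii", 42, 7, 65)  # a small payload: three 4-byte integers
message = encode(1, 1, body)
schema, decoded_body = decode(message)

schema_bytes = len(json.dumps(SPEED_SCHEMA).encode("utf-8"))
print(f"fingerprint: {HEADER.size} bytes, payload: {len(body)} bytes, "
      f"full schema: {schema_bytes} bytes")
```

Even for this tiny three-field schema, the JSON schema text is several times larger than the payload, while the fingerprint stays a fixed size no matter how large the schema grows.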


Hortonworks SR also supports multiple schema versions and, furthermore, allows users to define compatibility between schema versions. This effectively allows your producers and consumers to evolve at different rates. Producers can focus on producing data and consumers can focus on consuming data, without having to worry about schema compatibility issues at the producer/consumer level.
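As a rough illustration of what a compatibility rule between versions can look like, the sketch below applies one simplified rule in Python: a new version can still read data written with the old version if every field it adds carries a default value. The function names and schemas are hypothetical, and this is not how Hortonworks SR implements its compatibility checks.

```python
def added_fields_without_default(old_schema, new_schema):
    """Return names of fields that exist only in new_schema and have no default."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [
        f["name"]
        for f in new_schema["fields"]
        if f["name"] not in old_names and "default" not in f
    ]


def is_backward_compatible(old_schema, new_schema):
    """Simplified rule: data written with old_schema stays readable with
    new_schema if every newly added field carries a default value."""
    return not added_fields_without_default(old_schema, new_schema)


v1 = {"type": "record", "name": "truck_geo_event", "fields": [
    {"name": "truck_id", "type": "int"},
    {"name": "latitude", "type": "double"},
    {"name": "longitude", "type": "double"},
]}

# v2 adds a field with a default, so data written with v1 can still be read.
v2 = {**v1, "fields": v1["fields"] + [
    {"name": "route_name", "type": "string", "default": "unknown"},
]}

# v3 adds a field without a default, so a v3 reader cannot fill it in for v1 data.
v3 = {**v1, "fields": v1["fields"] + [
    {"name": "route_id", "type": "int"},
]}

print(is_backward_compatible(v1, v2))
print(is_backward_compatible(v1, v3))
```

A real check would also consider removed fields and type changes; the point here is only that compatibility is a property the registry can evaluate between two versions, so producers and consumers do not have to.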


Use Apache NiFi in conjunction with Hortonworks SR

There have already been a couple of great blog posts introducing this topic, such as Apache NiFi - Records and Schema Registries, Record-Oriented Data with NiFi and Real-Time SQL on Event Streams. In this article, user experience and use cases are our primary focus.


To enable real-time interaction with Hortonworks SR (and, in fact, with any schema registry), a new capability was introduced in Apache NiFi 1.2/HDF 3.0: a record-based processing mechanism. More details can be found here: record based processors in Apache NiFi 1.2.

As a NiFi developer, I can easily design a processor that leverages the record-based processing mechanism by using the Record Reader interface and the Record Writer interface, shown in Fig.1.

38410-fig1.png

Fig.1: record reader and writer interface

The reader/writer interfaces allow my processor to reference an SR for retrieving schemas (see Fig.2) when: (a) I need to deserialize bytes into in-memory record objects using a reader (read bytes in a specific format, convert them into schema-less records, and hold those records in memory), and (b) I need to serialize the in-memory record objects into output bytes using a writer (convert records into a user-specified output format and write them back to disk).

38411-fig2.png

Fig.2: referencing a schema from the Hortonworks SR
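The reader/writer split described above can be sketched in a few lines of Python. This is only an analogy: NiFi's actual interfaces are Java controller services, and the `SCHEMA` mapping, function names and sample data below are made up for illustration.

```python
import csv
import io
import json

# Hypothetical schema: field name -> Python type used to coerce CSV strings.
SCHEMA = {"truck_id": int, "driver_name": str, "speed": int}


def read_csv_records(raw, schema):
    """Reader: deserialize CSV bytes into in-memory records (dicts)."""
    text = io.StringIO(raw.decode("utf-8"))
    for row in csv.DictReader(text):
        yield {name: cast(row[name]) for name, cast in schema.items()}


def write_json_records(records):
    """Writer: serialize in-memory records into JSON bytes."""
    return json.dumps(list(records)).encode("utf-8")


raw_csv = b"truck_id,driver_name,speed\n12,Jamie,62\n14,Alex,88\n"
records = list(read_csv_records(raw_csv, SCHEMA))
output = write_json_records(records)
print(output.decode("utf-8"))
```

Because the processing logic only ever sees records, swapping the reader or the writer changes the input or output format without touching the logic in between.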

There has been tremendous interest and effort in the Apache NiFi community to develop record-based processors, primarily because this new processing mechanism lets you avoid splitting the content of a FlowFile (e.g. a CSV file with 1,000 rows) into many FlowFiles (1,000 individual FlowFiles, one per row) just for the sake of simple event processing or making a routing decision. In our example, you avoid creating 1,000 updates in the FlowFile repository, as well as in the provenance repository, which can significantly improve the performance of an enterprise-scale dataflow.

Some example record based processors that are available today in Apache NiFi 1.2/HDF 3.0 (as shown in Fig.3):

  • ConvertRecord: enables generic format conversion. Converts CSV/Avro/JSON/etc. input formats to any output format, as long as a record writer is available for it.
  • LookupRecord: extracts fields from the in-memory records, then looks up values via a lookup service. The results can be used either for routing purposes or for replacing the original values.
  • QueryRecord: defines one or more queries to be executed against the in-memory records, leveraging the Calcite libraries behind the scenes. The query results can be mapped to different output relationships.
  • PublishKafkaRecord/ConsumeKafkaRecord: refactored Kafka processors that use the reader/writer interfaces.

38413-fig3.png

Fig.3: record based processors available in Apache NiFi 1.2/HDF 3.0.
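To give a feel for the QueryRecord idea (one set of in-memory records, several SQL queries, each mapped to an output relationship), here is a small Python sketch. It uses SQLite as a stand-in for Calcite, and the relationship names, queries and data are assumptions made up for this example. The table name `FLOWFILE` mirrors how QueryRecord refers to the incoming records.

```python
import sqlite3

# In-memory records, as a record reader might produce them (made-up data).
records = [
    {"truck_id": 12, "driver_name": "Jamie", "speed": 62},
    {"truck_id": 14, "driver_name": "Alex", "speed": 88},
    {"truck_id": 15, "driver_name": "Sam", "speed": 95},
]

# Hypothetical relationship name -> SQL query, similar in spirit to the
# user-defined properties on QueryRecord.
queries = {
    "speeding": "SELECT * FROM FLOWFILE WHERE speed > 80",
    "normal": "SELECT * FROM FLOWFILE WHERE speed <= 80",
}


def query_records(records, queries):
    """Run every query against the records and group results by relationship."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute(
        "CREATE TABLE FLOWFILE (truck_id INTEGER, driver_name TEXT, speed INTEGER)"
    )
    conn.executemany(
        "INSERT INTO FLOWFILE VALUES (:truck_id, :driver_name, :speed)", records
    )
    results = {
        name: [dict(row) for row in conn.execute(sql)]
        for name, sql in queries.items()
    }
    conn.close()
    return results


routed = query_records(records, queries)
for relationship, rows in routed.items():
    print(relationship, sorted(row["truck_id"] for row in rows))
```

All three records are routed in a single pass over one piece of content, which is the same reason record-based processors avoid splitting a FlowFile into one FlowFile per row.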

Use Case

Let’s walk through an example use case to understand and visualize why NiFi and Hortonworks SR work better together. In this use case, assume an upstream application has been producing real-time events, specifically speed events and geo-location events emitted from a truck. A MiNiFi agent has been deployed on the truck to collect the raw events, convert them to Avro format, attach a schema fingerprint to the payload (first 14 bytes of the payload) and send them to two separate Kafka topics.

We are running the following dataflow in NiFi (Fig.4) to:

  • Consume the serialized Avro events from Kafka
  • Extract schema fingerprint from the Avro payload
  • Retrieve matching schema objects from the Hortonworks SR to fully parse the Avro payload
  • Convert Avro payload to CSV format for easy interpretation/enrichment
  • Enrich the CSV events
  • Convert the geo events and speed events back into Avro format, and publish them to separate Kafka topics for downstream processing

38414-fig4.png

Fig.4: NiFi demo dataflow

If you drill into the 'Acquire Events' Process Group, you’ll see the following two processors that consume Kafka events (Fig.5). Both are record-based processors.

38415-fig5.png

Fig.5: two consume Kafka processors

Open the config panel of the ’Truck Geo Kafka Stream’ processor (Fig.6). We are using an Avro record reader here (to consume Avro events from Kafka), and a CSV record writer on the other side.

38417-fig6.png

Fig.6: ConsumeKafkaRecord, ’Truck Geo Kafka Stream’ processor configuration

Open the Avro record reader config (Fig.7). The ‘HWX Content-Encoded Schema Reference’ schema access strategy indicates that this record reader will look for a schema fingerprint in the first 14 bytes of the payload, assuming that the upstream MiNiFi agent has added that info to the Avro payload before publishing it to the Kafka topic. With the fingerprint, this record reader is able to interact with a Hortonworks SR to retrieve the proper schema object, and deserialize the Avro bytes into in-memory record objects.

38418-fig7.png

Fig.7: ‘Avro truck events’ record reader

Next, let’s open the CSV record writer config panel (Fig.8). The ‘HWX schema reference attributes’ schema write strategy tells me that, in addition to converting the in-memory records into CSV format, this CSV record writer will store the schema fingerprint info in attributes, and attach those attributes to the FlowFiles produced by this processor.

38420-fig8.png

Fig.8: CSV record writer configuration

Now let’s take a look at an example FlowFile produced by this ’Truck Geo Kafka Stream’ processor. The content (in CSV format) is shown in Fig.9, while the attributes are shown in Fig.10. Together, ’schema.identifier’, ’schema.protocol.version’ and ’schema.version’ make up the schema fingerprint, and they are created by the CSV record writer.

38421-fig9.png

Fig.9: FlowFile content

38423-fig10.png

Fig.10: FlowFile attributes

Going back to the upper-level flow, the ‘GeoCode Enrichment’ processor executes a script to map geo-locations to nearby addresses, so the FlowFile content becomes what is shown in Fig.11.

38424-fig11.png

Fig.11: FlowFile content after the enrichment processor

The next processor in the flow pipeline, ‘UpdateAttribute’, attaches a Kafka topic name (truck_events_avro) to each FlowFile and also specifies a schema name (truck_events_avro). Both are stored in attributes. See the processor configuration in Fig.12.

38425-fig12.png

Fig.12: UpdateAttribute processor configuration

Now, every FlowFile coming out of this UpdateAttribute processor, ready to be sent to the downstream ‘PublishKafkaRecord’ processor, has both the ‘kafka.topic’ and ’schema.name’ attributes (Fig.13).

38426-fig13.png

Fig.13: FlowFile attributes, after the UpdateAttribute processor

On the other side of the house, the truck speed events have gone through a similar flow, and eventually reach the ‘PublishKafkaRecord’ processor at the same time (processor config shown in Fig.14). The ‘PublishKafkaRecord’ processor is able to publish truck speed events and geo events to separate Kafka topics by dynamically evaluating the topic name using Expression Language (see the ’Topic Name’ configuration field).

38427-fig14.png

Fig.14: PublishKafkaRecord processor configuration
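The dynamic topic routing can be pictured with the Python sketch below. It handles only plain `${attribute}` references, which is a tiny subset of what NiFi Expression Language supports. The `${kafka.topic}` value for the 'Topic Name' field and the speed topic name are assumptions made for illustration.

```python
import re

# Matches simple attribute references such as ${kafka.topic}.
ATTRIBUTE_REF = re.compile(r"\$\{([^}]+)\}")


def evaluate(expression, attributes):
    """Replace each ${name} with the matching FlowFile attribute value."""
    return ATTRIBUTE_REF.sub(lambda m: attributes[m.group(1)], expression)


# Attribute sets as they might look after the UpdateAttribute step.
geo_flowfile = {
    "kafka.topic": "truck_events_avro",
    "schema.name": "truck_events_avro",
}
# The speed topic and schema names below are made-up placeholders.
speed_flowfile = {
    "kafka.topic": "truck_speed_events_avro",
    "schema.name": "truck_speed_events_avro",
}

topic_property = "${kafka.topic}"  # assumed value of the 'Topic Name' field
for attributes in (geo_flowfile, speed_flowfile):
    print(evaluate(topic_property, attributes))
```

One processor instance can therefore serve both event streams, because the topic is resolved per FlowFile rather than fixed in the configuration.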

Open the CSV record reader config (Fig.15). Unlike the previous record reader in the ‘ConsumeKafkaRecord’ processor, this reader uses ’Schema Name property’ as the schema access strategy, indicating that it will look up a matching schema object in the Hortonworks SR based on the schema name retrieved from incoming FlowFiles.

38428-fig15.png

Fig.15: CSV record reader configuration

Notice that when schema name is used as the schema access strategy, only the latest schema version will be used, even if there are multiple versions under the same schema name, as explained in this HCC article. If you prefer to manually specify a schema version, you can choose the ‘HWX schema reference attributes’ schema access strategy, and add an UpdateAttribute processor to your flow to update/modify the version number. Again, all the different options are explained in detail in this HCC article.
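The difference between the two lookup strategies can be sketched as follows. The `SCHEMAS` list stands in for the registry contents, and the identifiers, schema texts and function names are hypothetical. Only the attribute names (’schema.name’, ’schema.identifier’, ’schema.version’) come from the flow described above.

```python
# Stand-in for registry contents; identifiers and schema texts are made up.
SCHEMAS = [
    {"id": 1, "name": "truck_events_avro", "version": 1, "text": "v1 schema text"},
    {"id": 1, "name": "truck_events_avro", "version": 2, "text": "v2 schema text"},
    {"id": 2, "name": "truck_speed_events_avro", "version": 1, "text": "v1 schema text"},
]


def latest_by_name(attributes):
    """Schema-name strategy: pick the highest version under 'schema.name'."""
    candidates = [s for s in SCHEMAS if s["name"] == attributes["schema.name"]]
    return max(candidates, key=lambda s: s["version"])


def by_reference(attributes):
    """Reference-attribute strategy: pick the exact identifier and version."""
    wanted = (int(attributes["schema.identifier"]), int(attributes["schema.version"]))
    return next(s for s in SCHEMAS if (s["id"], s["version"]) == wanted)


print(latest_by_name({"schema.name": "truck_events_avro"})["version"])
print(by_reference({"schema.identifier": "1", "schema.version": "1"})["version"])
```

With the name-based lookup, registering a new version changes what the reader uses on the next lookup; with the reference attributes, the flow stays pinned to a version until an UpdateAttribute step changes it.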

Last but not least, let’s take a look at the Hortonworks SR UI (Fig.16). From an operational point of view, you can easily see all the available schema names, the different versions under each schema name, and the compatibility setting between schema versions.

38430-fig16.png

Fig.16: Hortonworks schema registry UI

Last update:
‎08-17-2019 11:26 AM