Prior to release 1.2, Apache NiFi handled format conversions through dedicated ConvertXToY processors, including but not limited to conversions between CSV, JSON, and Avro. Starting from 1.2, NiFi supports a new concept: an abstraction of "record", along with the notion of a pluggable "Schema Registry". Any FlowFile can contain one or more record objects of a given format and schema. NiFi offers a series of controller services that provide a "RecordReader" interface, with immediate support for Avro, JSON Tree, JSON Path, Grok, and CSV as source formats. In the same 1.2 release, NiFi supports a series of controller services that provide a "RecordWriter" interface, with immediate support for Avro, JSON, Text, and CSV.
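
To make the abstraction concrete, here is a minimal sketch of the idea in Python. This is only an analogy (NiFi's actual RecordReader/RecordWriter are Java controller-service interfaces, and the function names here are made up): one format-agnostic, in-memory record representation sits between a pluggable reader for the source format and a pluggable writer for the target format.

    import csv, io, json

    def read_csv_records(data, field_names):
        # Deserialize CSV bytes into format-agnostic record objects (dicts).
        return list(csv.DictReader(io.StringIO(data.decode("utf-8")), fieldnames=field_names))

    def write_json_records(records):
        # Serialize the same in-memory records into a different format (JSON lines).
        return "\n".join(json.dumps(r) for r in records).encode("utf-8")

    records = read_csv_records(b"1,normal\n2,speeding\n", ["truck_id", "event_type"])
    print(write_json_records(records).decode("utf-8"))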

Each of the reader and writer objects supports interaction with another controller service that references a schema registry for retrieving schemas when necessary. An Avro schema registry and an HWX schema registry will be immediately available in Apache NiFi 1.2. To use the Avro schema registry, a user needs to provide the actual schema when configuring the "AvroSchemaRegistry" controller service, and the schema information will be written to the flow.xml file. The alternative is to use the HWX schema registry, which provides a centralized place to store reusable schemas, as well as compatibility information between different schema versions.
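
For reference, the schema text provided to the "AvroSchemaRegistry" controller service is a plain Avro schema in JSON form; a minimal, hypothetical example (record name and fields made up) might look like this:

    {
      "type": "record",
      "name": "TruckEvent",
      "fields": [
        { "name": "truck_id",   "type": "int" },
        { "name": "event_type", "type": "string" },
        { "name": "speed",      "type": ["null", "int"], "default": null }
      ]
    }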

To further understand the record-based operation mechanism, think of a CSV file with 100 rows and 5 fields. A record-oriented processor can read the CSV bytes row by row (leveraging a Record Reader controller service, or CS), deserialize them into 100 record objects, and hold those record objects in memory. The same processor can then leverage a Record Writer CS to serialize the in-memory record objects into the specified output format. On top of all that, a "QueryRecord" processor allows users to define free-form SQL queries to be executed against the in-memory record objects, and to map the SQL results to one or more output relationships.
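
For instance, each user-added (dynamic) property on QueryRecord creates an output relationship named after the property, and the property value holds the SQL that runs against the FlowFile's records, which are exposed as a table named FLOWFILE. A hypothetical example (the "speed" field is made up for illustration):

    Property name : fast_trucks      (becomes the "fast_trucks" relationship)
    Property value: SELECT * FROM FLOWFILE WHERE speed > 85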

Now let's take a closer look at the use case below, in which I am trying to configure a record-based ConsumeKafka processor.

[Screenshot: configuring the record-based ConsumeKafka processor]

Drilling into the "Avro Truck Events" record reader, you can see the following schema access strategies out of the box:

[Screenshot: the record reader's available schema access strategies]

Use 'Schema Name' Property

The user needs to point the CS to a valid schema registry and provide a referenceable schema name in the 'Schema Name' property. Notice that, with this option, the schema version is not configurable, meaning the record reader CS will always automatically grab the latest schema version from the schema registry.
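
A hypothetical configuration of this strategy could look as follows (the schema name is made up, and the registry points at an HWX Schema Registry controller service):

    Schema Access Strategy : Use 'Schema Name' Property
    Schema Registry        : HortonworksSchemaRegistry
    Schema Name            : truck-events
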
Use 'Schema Text' Property

With this option selected, the user needs to manually type the schema into the 'Schema Text' property. Notice that this option is not used in conjunction with a schema registry. In a future NiFi release, with 'dependent properties' enabled (https://issues.apache.org/jira/browse/NIFI-1121), the 'Schema Registry' property could be hidden on the UI when this option is selected.
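
As a side note, outside of this Kafka use case the 'Schema Text' property supports NiFi Expression Language, so instead of hard-coding the schema it can reference one carried by the flow, e.g. in an attribute populated upstream (assuming such an attribute exists):

    Schema Text: ${avro.schema}
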
HWX Schema Reference Attributes

With this option selected, we assume that the identifier and version of the schema are stored as attributes on each incoming FlowFile. To be exact, we are talking about the following attributes: 1. a schema version attribute, 2. a schema ID attribute, and 3. a protocol version attribute. For a processor that does not take any incoming FlowFiles, such as the ConsumeKafkaRecord in our use case, this option is not very useful.
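
For the record, the attribute names NiFi looks for with this strategy should be the following (worth verifying against the controller service documentation for your exact NiFi version):

    schema.identifier          ID of the schema in the registry
    schema.version             version of the schema
    schema.protocol.version    HWX serialization protocol version
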
HWX Content-Encoded Schema Reference

With this option selected, the record reader CS will look for the identifier and version of the schema in the actual payload, assuming the leading bytes of the payload encode this information (a 1-byte protocol version, followed by an 8-byte schema identifier and a 4-byte schema version, for 13 bytes in total).
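
As a quick illustration, here is a small Python sketch (not NiFi code) that packs and unpacks such a reference, assuming the 13-byte big-endian layout described above:

    import struct

    # 1-byte protocol version, 8-byte schema ID, 4-byte schema version = 13 bytes.
    payload = struct.pack(">bqi", 1, 42, 3) + b"<avro content>"
    protocol, schema_id, schema_version = struct.unpack(">bqi", payload[:13])
    print(protocol, schema_id, schema_version)  # -> 1 42 3
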
Use Embedded Avro Schema

With this option selected, the record reader CS will look for the actual schema (not a fingerprint) in the payload itself; this option only works for the Avro format. Last but not least, when the actual schema of your payload does not match the schema retrieved from the schema registry, the following logic applies (a small sketch follows the list):
  1. If your payload contains more fields than the schema specifies, the extra fields are dropped, and all of the schema's fields will still be parsed successfully.
  2. If your payload contains fewer fields than the schema specifies, the missing fields will end up with NULL values after the payload is parsed.
  3. If the data type of a field does not match the data type specified in the schema, the record reader will try to coerce the field according to the schema, and will throw an error if it fails to do so.
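
To make the mismatch logic concrete, here is a toy Python model of schema-driven parsing (it mirrors the three cases above; the schema and rows are hypothetical, and this is in no way NiFi's actual implementation):

    # Toy model of schema-driven record parsing; not NiFi code.
    SCHEMA = {"truck_id": int, "event_type": str, "speed": int}

    def parse(row):
        record = {}
        for name, typ in SCHEMA.items():
            if name in row:
                record[name] = typ(row[name])  # case 3: coerce, raises if it fails
            else:
                record[name] = None            # case 2: missing field becomes NULL
        return record                          # case 1: extra fields are dropped

    print(parse({"truck_id": "7", "event_type": "speeding", "speed": "91", "extra": "x"}))
    # -> {'truck_id': 7, 'event_type': 'speeding', 'speed': 91}
    print(parse({"truck_id": "8"}))
    # -> {'truck_id': 8, 'event_type': None, 'speed': None}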