Member since
12-14-2015
13
Posts
45
Kudos Received
4
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1225 | 03-09-2017 03:07 PM | |
2548 | 08-10-2016 03:24 AM | |
4703 | 08-03-2016 04:29 PM | |
2342 | 01-22-2016 11:47 PM |
12-14-2017
05:51 PM
2 Kudos
In Apache NIFI, flow configuration allows users to reference environmental specific or system variables using Expression Language (EL). Prior to NIFI 1.4 release, a file-based variable registry was available. However, the management of those variables can be challenging due to the lack of a user interface. Additionally, those variables in the file based variable registry cannot be updated at NIFI runtime, hence breaking the continuous processing whenever NIFI needs to be restarted to update variables. In Apache NIFI 1.4, a UI driven variable registry was made available. This would help simplify the configuration management of flows in flow migration use cases across environments (works great in conjunction with the Flow Registry that is being discussed/worked on in the community). More importantly, similar to how controller services are handled by the NIFI framework, variables are now persisted in the flow definition file (flow.xml). There is a ‘variable' section nested under each process group section as the variables are now scoped at PG level. From an end user perspective, you can, therefore, update/change/delete variables without having to restart your NIFI instance. The changes/updates would be effective immediately once successfully saved to the flow.xml file. User-experience-wise, you can open the variable window at a PG level (either right click on the canvas, or right-click a select PG). All the PG level variables can be referenced by any processors in that PG, and inherited by lower level PGs. Furthermore, variables defined in a descendant group would override the value in a parent group. (e.g. 'user=hliu' is defined at root level. In a lower level PG, if the key ‘user’ is not redefined, value ‘hliu’ would be used when the key is being referenced. otherwise, the value defined in the closest PG would be used instead of the root level value) From an authorization perspective, NIFI admin user does NOT need to define separate policies for the variables. The variable policies are aligned with component policies by default. For those who do not have access to view a PG, they cannot view the variables (or they simply cannot open the variable window). For those who have PG level view permission but no write permission, they cannot modify the variables.
... View more
Labels:
08-25-2017
07:55 PM
8 Kudos
Why Hortonworks Schema Registry? Both Apache NiFi and Hortonworks Streaming Analytics Manager (SAM) are extremely powerful tools in the data-in-motion space. To make them truly enterprise ready, Hortonworks DataFlow (HDF), which is a cutting edge data-in-motion platform, offers a set of enterprise foundational services. Apache Ambari is one example, which provides cluster management and provisioning capabilities; Apache Ranger is another example, which provides a centralized place for users to manage policies/permissions across HDF and HDP components. In HDF 3.0, Hortonworks Schema Registry (SR) became a GA offering as part of the foundational services. SR offers a centralized place to store and manage reusable schemas. This becomes extremely handy when you have data sharing the same schema either generated by different producers, or being delivered to different consumers. Think of data coming from different ATM machines, different trucks, different producers, or being delivered to different consumers. Think of data coming from different ATM machines, different trucks, different IOT devices, instead of having to re-recreate schemas and attach schemas to your payload, you can simply attach a schema fingerprint, and retrieve the actual schema from your SR when necessary. From an operational efficiency perspective, your schema could be even heavier than your actual payload sometimes when dealing with smaller messages. This capability allows your organization to significantly reduce operational overhead.schemas and attach schemas to your payload, you can simply attach a schema fingerprint, and retrieve the actual schema from your SR when necessary. From an operational efficiency perspective, your schema could be even heavier than your actual payload sometimes when dealing with smaller messages. This capability allows your organization to significantly reduce operational overhead. Hortonworks SR also supports having different schema versions and furthermore, allowing users to define compatibility between schema versions. This would effectively allow your producers and consumers to evolve at a different rate. Producers can therefore be focusing on producing data, and consumers can focus on consuming data, without having to worry about schema compatibility issue at the producer/consumer level. Use Apache NiFi in conjunction with Hortonworks SR There have already been a couple of great blog posts introducing this topic, such as Apache NiFi - Records and Schema Registries, Record-Oriented Data with NiFi and Real-Time SQL on Event Streams. In this article, user experience and use cases would be our primary focal points. To enable real-time interaction with Hortonworks SR, and in fact not limited to only Hortonworks SR but any schema registries, a net new capability has been introduced in Apache NIFI 1.2/HDF 3.0: record based processing mechanism. More details can be found here: record based processors in Apache NiFi 1.2. As a NiFi developer, I can easily design a processor leveraging the record based processing mechanism by using the Record Reader Interface and Record Writer Interface, shown in Fig.1. Fig.1: record reader and writer interface The reader/writer interfaces would allow my processor to reference a SR for retrieving schemas (see Fig.2) when: (a). I need to deserialize bytes into in-memory record objects using a reader (read and convert bytes with a specific format into schema-less records, and store those records in-memory), and (b). I need to serialize the in-moery record objects into output bytes using a writer (convert records into a user specified output format and write it back to disk) Fig.2: referencing a schema from the Hortonworks SR There have been tremendous interests and efforts in the Apache NiFi community to develop record based processors, primarily because by leveraging this new processing mechanism, you can avoid splitting a FlowFile content (e.g. a CSV file with 1,000 rows) into many FlowFiles (split the CSV to 1,000 individual FlowFiles) just for the sake of simple event processing, or making a routing decision. In our example, you can avoid creating 1,000 updates in the flow file repository, as well as in the provenance repository. This will bring your enterprise level dataflow performance to the next level. Some example record based processors that are available today in Apache NiFi 1.2/HDF 3.0 (as shown in Fig.3): ConvertRecord: enable generic format conversion. Convert CSV/Avro/JSON/etc. input formats to any output formats as long as there is a record writer available. LookupRecord: extract fields from the in-memory records, then lookup values via a LookUp service. results can be used either for routing purposes, or replacing the original values QueryRecord: define one or more queries to be executed against the in-memory records, leveraging Calcite libraries behind the scene. The query results can be mapped to different output relationships PublishKafkaRecord/ConsumeKafkaRecord: refactored kafka processors using the reader/writer interface Fig.3: record based processors available in Apache NiFi 1.2/HDF 3.0. Use Case Let’s walk thru an example use case to understand and visualize why NiFi and HWX SR are better to be used together. In this use case, assume an upstream application has been producing real-time events, specifically speed events and geo-location events emitted from a truck. A MiNiFi agent has been deployed on the truck, to collect the raw events, covert them to Avro formats, attach schema fingerprint to the payload (first 14 bytes of the payload) and send them to two kafka topics respectively. We are running the following dataflow in NiFi (Fig.4) to consume the serialized Avro events from Kafka Extract schema fingerprint from the Avro payload Retrieve matching schema objects from the Hortonworks SR to fully parse the Avro payload Convert Avro payload to CSV format for easy interpretation/enrichment Enrich the CSV events Convert the geo-events and speed events into Avro format, and ingest to separate Kafka topics for downstream processing Fig.4: NiFi demo dataflow Drill in to the 'Acquire Events' Process Group, you’ll see the following processors to consume Kafka events (Fig.5), both are record based processors. Fig.5: two consume Kafka processors Open config panel of the ’Truck Geo Kafka Stream’ processor (Fig.6), we are using an Avro record reader here (to consume Avro events from Kafka), and a CSV record writer on the other side. Fig.6: ConsumeKafkaRecord, ’Truck Geo kafka Stream’ processor configuration Open the Avro record reader config (Fig.7), the ‘HWX Content-Encoded Schema Reference’ schema access strategy indicates that this record reader will be looking for a schema fingerprint in the first 14 bytes of the payload, assuming that the upstream MiNiMi agent has added that info to the Avro payload before ingesting to the kafka topic. With the fingerprint, this record reader will be able to interact with an Hortonworks SR to retrieve a proper schema object, and deserialize the Avro bytes into in-memory record objects. Fig.7: ‘Avro truck events’ record reader Next, let’s open the CSV record writer config panel (Fig.8). The ‘HWX schema reference attributes’ schema write strategy tells me that, in addition to convert the in-memory records into CSV formats, this CSV record writer will store the schema fingerprint info in attributes, and attach those attributes to the FlowFiles produced by this processor. Fig.8: CSV record writer configuration Now let’s take a look at an example FlowFile produced by this ’Truck Geo kafka Stream’ processor. Content (in CSV format) is shown in Fig.9 while the attributes are shown in Fig.10. ’schema.identifier’, ’schema.protocol.version’ and ’schema.version’ are the schema fingerprint, and created by the CSV record writer. Fig.9: FlowFile content Fig.10: FlowFile attributes Going back to the upper level flow, the ‘GeoCode Enrichment’ processor executes a script to map geo-locations to nearby addresses, and thus the FlowFile content becomes (Fig.11) Fig.11: FlowFile content after the enrichment processor The next processor in the flow pipeline, ‘UpdateAttribute’, will be attaching a kafka topic name (truck_events_avro) to each FlowFile, as well as specifying a schema name (truck_events_avro). Both are stored in attributes. See the processor configuration in Fig.12. Fig.12: UpdateAttribute processor configuration Now, for each FlowFile coming out of this UpdateAttribute processor, and ready to be sent to the downstream ‘PublishKafkaRecord’ processor, they all have ‘kafka.topic’ and ’schema.name’ attributes (Fig.13). Fig.13: FlowFile attributes, after the UpdateAttribute processor On the other side of the house, the truck speed events have gone thru a similar flow, and eventually reach to the ‘PublishKafkaRecord' processor at the same time. (processor config shown in Fig.14). The ‘PublishKafkaRecord’ processor will be able to ingest truck speed events and geo events to separate Kafka topics by dynamically evaluating the topic name using Expression Language (see the ’Topic Name’ configuration field) Fig.14: PublishKafkaRecord processor configuration Open the CSV record reader config (Fig.15), different than the previous record reader in the ‘ConsumeKafkaRecord’ processor, this reader uses ’Schema Name property’ as the schema access strategy, indicating that this reader will lookup a matching schema object in the Hortonworks SR, based on the schema name retrieved from incoming FlowFiles. Fig.15: PublishKafkaRecord processor configuration Notice that when schema name is used as the schema access strategy, even if there are multiple versions under the same schema name, only the latest schema version will be used, as explained in this HCC article. If you prefer to manually specify a schema version, you can chose to use ‘HWX schema reference attributes’ schema access strategy, and add an UpdateAttribute processor in your flow to update/modify the version number. Again, all the different options are explained in details in this HCC article. Last but not least, let’s take a look at the Hortonworks SR UI (Fig.16). From an operational point of view, you can easily visualize all the available schema names, different versions under each schema name, and compatibility between schema versions, etc. Fig.16: Hortonworks schema registry UI
... View more
Labels:
07-31-2017
07:08 PM
2 Kudos
as of today, NIFI docker container is not supported on Kubernetes by Hortonworks. But we love all the multi-tenancy resource manager & execution engines equally and will potentially support them in the future.
... View more
05-10-2017
03:04 PM
7 Kudos
Prior to release 1.2, Apache NiFi supports the notion of pluggable "Schema Registry”, and ConvertXToY processors including but not limited to handling format conversions between CSV, JSON and Avro. Starting from 1.2, NiFi supports a new concept - an abstraction of “record”. Any FlowFile can contain one or more record objects of a given format and schema. NiFi then offers a series of controller services which provide a ‘RecordReader' interface and will have immediate support for Avro, JSON Tree, JSON Path, Grok, and CSV as provided source formats. in the same 1.2 release, NiFi will support a series of controller services which provide a “RecordWriter" interface and will have immediate support for Avro, JSON, Text, CSV. Each of the Reader and Writer objects will support interaction with another Controller Service which will be referencing a schema registry for retrieving schemas when necessary. An Avro schema registry and an HWX schema registry will be immediately available in Apache NiFi 1.2. To use the Avro schema registry, a user needs to provide the actual schema when configuring the “AvroSchemaRegistry” controller service, and the schema information will be written to the flow.xml file. Another alternative is to use the HWX schema registry, which provides a centralized place to store reusable schemas, as well as storing the compatibility info between different schema versions. To further understand the record based operation mechanism, think of a CSV file with 100 rows and 5 fields, a record orientated processor will be able to read the CSV bytes row by row (leveraging a Record Reader CS), deserialize the CSV bytes into 100 record objects, and store those record objects in memory. and the same processor can potentially leverage a Record Writer CS to serialize the in-memory record objects into the specified output format. On top of all that, a “QueryRecord” processor allows users to define free-form SQL queries to be executed against the in-memory record objects, and be able to map SQL results to one or more output relationships. Now let’s take a closer look at a use case below, in which I am trying to configure a record based ConsumeKafka processor. Drill into the “Avro Truck Events” record reader, you can see the following schema access strategies out of the box:
use ‘schema name’ property User needs to point the CS to a legit schema registry, and provide a referenceable schema name in the ’schema name’ property. Notice that, with this option, schema version is not configurable, meaning that the record reader CS will always automatically grab the latest schema version from the schema registry.
use ‘schema text’ property
with this option selected, user needs to manually type in the schema in the ’Schema Text’ property. Notice that this is not used in conjunction with a schema registry. In a future NiFi release, with ‘dependent properties’ enabled (https://issues.apache.org/jira/browse/NIFI-1121), the ’Schema Registry’ property can be hidden on the UI when this option is selected.
HWX Schema Reference Attributes
with this option selected, we assume that the identifier and version of the schema will be stored as attributes on any incoming flow files. To be exact, we are talking about the following attributes: 1. schema version attribute, 2. schema ID attribute and 3. version protocol attribute. For a processor that doesn’t take any incoming flow files, such as the ConsumeKafkaRecord in our use case, such option would not be very useful.
HWX Content-Encoded Schema Reference with this option selected, the record reader CS will look for the identifier and version of the schema in the actual payload, assuming the first 14 bytes of the payload indicate this information.
Use Embedded Avro Schema with this option selected, the record reader CS will look for the actual schema (not the fingerprint) in the payload, and this option only works for Avro format. Last but not least, when the actual schema of your payload is not matching with the schema retrieved from the schema registry, the following logic applies: if your payload contains more fields than what you specified in the schema, you will end up getting ’NULL’ value in those fields after the payload is being parsed. If your payload contains less fields than what you specified in the schema, all fields will be successfully parsed. If the data type of a field is not matching with the data type specified in the schema, the record reader will try to transform the field according to the schema, and will throw out an error message if failed to do so.
... View more
Labels:
03-09-2017
03:44 PM
5 Kudos
Considering data is often times being "sent into NIFI" from other processes, and "sent out of NIFI" to other processes, data safety is a legitimate concern. When data is being exchanged between processes, we have the following possible delivery semantics: 1. at most once delivery 2. at least once delivery 3. exactly once delivery First of all, speaking of the communication between multiple NIFI/MINIFi instances via site-to-site protocol, because of the 2 phase commit provided at the framework level, at least once delivery is guaranteed. And not only that, the chance of "exactly once" delivery is maximized. "Exactly once delivery" isn't guaranteed only if your system breaks between the two commits (literally between those two lines of code), or if there was any error in NIFI when committing the state of things. As an example, after data is being delivered to the receiving side, but before your sending side commits, network connection is somehow lost. In that case, sending side will try to send the data again when network connectivity is regained. you end up with duplications on the receiving side. The way to handle that is to leverage DetectDuplicate processor, put it right after the S2S port. In a very similar manner, when NIFI S2S libraries are embedded into other systems like Storm or Spark, given the nature of S2S communication protocol, at least once delivery is guaranteed. Now, at NIFI processor level, it is really a case by case scenario, depending on how data transport is handled by that processor. For example, for external systems that support 2-phase commit like Kafka, similar to S2S, we can guarantee at least once delivery while maximizing the chance of exactly once delivery, if the processor is properly written. If the other system doesn't support 2-phase commit like Syslog protocol, there is nothing we can do at NIFI framework level given that the external system is out of our control. NIFI can only guarantee at most once delivery.
... View more
Labels:
03-09-2017
03:07 PM
1 Kudo
running NIFI in a clustered mode on windows is not supported by Hortonworks, but there has been some community effort around this topic, here is an example: https://github.com/kohsuke/winsw
... View more
08-16-2016
11:01 PM
very good information, thanks for sharing Matt. Can't wait to see your next one for GenerateTableFetch:)
... View more
08-10-2016
03:24 AM
10 Kudos
HDF, as a data-in-motion platform, contains NIFI as one component, for data ingestion & flow management, and Storm as another component for complex event processing. NIFI and Storm are complementary to each other. NIFI is not yet another processing framework, but to deliver data to those frameworks such as Storm. Both NIFI and Storm can handle processing jobs, but we
tend to leverage NiFi for Simple Event Processing, whereas Storm for Complex Event Processing. In terms of Simple Event
Processing, usually you
are able to operate on a single piece of data by itself (or joining with an
Enrichment Dataset). Complex Event Processing typically normally refers to processing data from
multiple streams, think of a JOIN operation, or rolling window aggregation type of analytics. You can embed NIFI-external-modules into Storm for integration purposes, in order to leverage both components to achieve your ultimate streaming analytics goals. As an example, you can either embed a "NiFiSpout" into Storm to pull data from NIFI, or "NiFiBolt" to send data back to NIFI
... View more
08-03-2016
08:46 PM
ConsumeKafka is one of those processors whose states are stored and managed externally, outside of NIFI. Offsets are therefore maintained by kafka, instead of zookeeper. Having said that, there is a plan to enhance state management for these processors. For more information, please refer to this JIRA ticket: https://issues.apache.org/jira/browse/NIFI-2078
... View more
08-03-2016
04:29 PM
5 Kudos
To add some color to Bryan's response: Kafka (Get/Put) can work with
Apache Kafka 0.8.x and 0.9.x broker in non-kerberized mode, without SSL HDP 2.3 and 2.4 Kafka broker in both non-kerberized and kerberized modes, without SSL Kafka (Consume/Publish) can work with Apache Kafka 0.9.x and 0.10.x broker in both non-kerberized and kerberized modes, with and without SSL HDP 2.4 Kafka 0.9.x broker and HDP 2.5 Kafka 0.10.x broker in both non-kerberized and kerberized modes, with and without SSL
... View more