Created on 09-28-2017 04:11 PM - edited 08-17-2019 10:59 AM
This is part 1 of a series of articles on Data Enrichment with NiFi.
Enrichment is a common use case when working on data ingestion or flow management. Enrichment consists of fetching data from an external source (database, file, API, etc.) to add more detail, context, or information to the data being ingested. Data often contains references (e.g. IDs) rather than the actual information. These references can be used to query another data source (known as a reference table in the relational world) to retrieve other attributes of an entity (location, name, etc.). Often, enrichment is done in batch using a join operation. However, enriching data streams in real time is more interesting.
Previous versions of NiFi did not support enrichment natively. There were a few workarounds, but they were neither performant nor well integrated. This is because NiFi is a data flow tool: in data flow logic, each flow file is an independent item that can be processed independently. Data enrichment involves correlating and joining at least two data sources, which is not NiFi's sweet spot.
Starting with NiFi 1.3, data enrichment is natively supported through new processors (LookupAttribute and LookupRecord) and new lookup services. This article explains how these new features can be used.
Let's take the example of real-time retail data ingestion coming from stores in different cities. Data arrives in JSON with the following format:
{ "created_at" : "Thu Sep 28 08:08:09 CEST 2017", "id_store" : 4, "event_type" : "store capacity", "id_transaction" : "1009331737896598289", "id_product" : 889, "value_product" : 45 }
This JSON tells us that we have 45 units of product 889 in store 4. Details of store 4, such as its city, address, and capacity, are available in another data source. Let's say that we want to do some geographical analysis and show, in a real-time dashboard, all stores that have products which will soon be out of stock. To do so, we need information on store locations. This can be achieved through data enrichment.
Let's use a GenerateFlowFile processor with the configuration below to simulate data coming from 5 different stores (1 to 5).
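Here is a sketch of what the Custom Text property of GenerateFlowFile can look like, using NiFi Expression Language to randomize the values. The exact expressions are illustrative, not necessarily those of the original template:

{
  "created_at" : "${now():format('EEE MMM dd HH:mm:ss zzz yyyy')}",
  "id_store" : ${random():mod(5):plus(1)},
  "event_type" : "store capacity",
  "id_transaction" : "${random()}",
  "id_product" : ${random():mod(1000)},
  "value_product" : ${random():mod(100)}
}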
The LookupRecord processor uses lookup services (controller services) for data enrichment. You can see a lookup service as a key-value service that LookupRecord queries to get the value associated with a key. Currently, there are six available lookup services:
PropertiesFileLookupService, SimpleCsvFileLookupService, and IPLookupService are file-based lookup services: your reference data should sit in a file (CSV, XML, etc.) that NiFi will use to match a value to a key. ScriptedLookupService uses a script (Python, Ruby, Groovy, etc.) to generate the value corresponding to a key. SimpleKeyValueLookupService stores the key-value pairs directly in NiFi. It is very convenient when your reference data is not too big and doesn't change much, which is the case in our scenario: we don't add a new store every day. Other interesting lookup services are coming in future versions of NiFi, including MongoDB (NIFI-4345) and HBase (NIFI-4346).
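In our scenario, the SimpleKeyValueLookupService is configured with one dynamic property per store, mapping a store ID to its city. Store 4 is Toulouse, as we'll see in the results; the other cities are placeholders for illustration:

1 = Paris
2 = Lyon
3 = Marseille
4 = Toulouse
5 = Bordeaux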
To start the enrichment, add a LookupRecord processor to the flow and configure the following properties:
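The Record Reader and Record Writer below are standard NiFi JSON controller services; their use here is my assumption, as the original configuration is shown in a screenshot:

Record Reader  = JsonTreeReader
Record Writer  = JsonRecordSetWriter
Lookup Service = SimpleKeyValueLookupService

The Record Reader is configured with the following Avro schema: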
{ "namespace": "nifi", "name": "store_event", "type": "record", "fields": [ { "name": "created_at", "type":"string" }, { "name": "id_store", "type":"int" }, { "name": "event_type", "type":"string" }, { "name": "id_transaction", "type":"string" }, { "name": "id_product", "type":"int" }, { "name": "value_product", "type":"int" } ] }
This tells the LookupRecord processor to parse the received JSON data with the provided schema. We don't use a schema registry here. I won't go into the details of record-oriented processors or schema registries; if you are not familiar with these concepts, start by reading this article here. The Record Writer uses the following schema:
{ "namespace": "nifi", "name": "store_event", "type": "record", "fields": [ { "name": "created_at", "type":"string" }, { "name": "id_store", "type":"int" }, { "name": "event_type", "type":"string" }, { "name": "id_transaction", "type":"string" }, { "name": "id_product", "type":"int" }, { "name": "value_product", "type":"int" }, { "name": "city", "type":"string" } ]}
Note that the writer schema is slightly different from the reader schema: I added a field called 'city' that the processor will populate.
Finalize the configuration of the lookup processor. You need to add a custom property named "key" and set it to the RecordPath of the field that will be used for the lookup. Here, it's the store ID, so key = /id_store. Result RecordPath tells the processor where to store the retrieved value. Finally, the route to 'matched' or 'unmatched' strategy tells the processor what to do after the lookup.
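In summary, the lookup-specific settings look like this; the Result RecordPath value is deduced from the 'city' field added to the writer schema:

key               = /id_store
Result RecordPath = /city
Routing Strategy  = Route to 'matched' or 'unmatched'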
Connect the LookupRecord processor to the next processor and start the flow. For demonstration, I'll be merging the enriched JSON events and pushing them to Solr to build my dashboard.
To verify that our enrichment is working, let's look at the content of the flow files using the data provenance feature.
First of all, you can notice that LookupRecord adds an attribute called avro.schema. This is due to the schema write strategy that we are using. It's not useful here; I just wanted to highlight it. By using a schema registry, we could add only the name of the schema instead of the complete schema.
Now let's look at the content of a flow file. As you can see, a new field "city" has been added to my JSON. Here the city is Toulouse, since the store ID is 4. It's worth noting that it's possible to write the data in another format (Avro, for instance) to get enrichment and conversion in one step.
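For the sample event shown earlier, the enriched flow file content looks like this (reconstructed from the original event and the lookup result):

{
  "created_at" : "Thu Sep 28 08:08:09 CEST 2017",
  "id_store" : 4,
  "event_type" : "store capacity",
  "id_transaction" : "1009331737896598289",
  "id_product" : 889,
  "value_product" : 45,
  "city" : "Toulouse"
}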
Data enrichment is a common use case in data ingestion and flow processing. With the lookup processors and services, we can now easily enrich data in NiFi. The existing lookup services are convenient when reference data doesn't change often, since reference data is added manually or read from a file. In future NiFi releases, new database lookup services will be available (e.g. MongoDB and HBase).
Created on 11-02-2017 02:35 PM
Is there a template to review to walk thru this example?
Created on 11-02-2017 02:45 PM
Thanks Abdel!