05-01-2018
08:35 PM
4 Kudos
DataWorks Summit (DWS) is the industry's premier Big Data community event in Europe and the US. The latest DWS was held in Berlin, Germany, on April 18th and 19th. This was the sixth edition in Europe, and this year there were over 1,200 attendees from 51 countries, 77 breakout sessions in 8 tracks, 8 Birds-of-a-Feather sessions and 7 Meetups. I had the opportunity to attend as a speaker this year, where I gave a talk on "Best practices and lessons learnt from running Apache NiFi". It was a joint talk with the Big Data squad team from Renault, a French car manufacturer. The presentation recording will be available on the DWS website. In the meantime, I'll share with you the three key takeaways from our talk.
NiFi is an accelerator for your Big Data projects
If you have worked on any data project, you already know how hard it is to get data into your platform so that "the real work" can start. This is particularly important in Big Data projects, where companies aim to ingest a variety of data sources ranging from databases to files to IoT data. Having NiFi as a single ingestion platform that gives you out-of-the-box tools to ingest several data sources in a secure and governed manner is a real differentiator. NiFi accelerates data availability in the data lake, and hence accelerates your Big Data projects and business value extraction. The following numbers from Renault projects are worth a thousand words.
NiFi enables new use cases
NiFi is not only an ingestion tool; it's a data logistics platform. This means that NiFi enables easy collection, curation, analysis and action on any data anywhere (edge, cloud, data center) with built-in end-to-end security and provenance. This unique set of features makes NiFi the best choice for implementing new data-centric use cases that require geographically distributed architectures and high SLAs (availability, security and performance). In our talk, two exciting use cases were shared: connected plants and packaging traceability.
NiFi flow design is like software development
When I pitch NiFi to my customers, I can see them get excited quickly. They start brainstorming instantly and ask if NiFi can do this or that. In this situation, I usually fire up a NiFi instance on my Mac and start dragging and dropping a few processors to simulate their use case. This is a powerful feature that fosters interactions between the team members in the room and leads to very interesting business and technical discussions. When people see the power of NiFi and all that we can achieve in such a short timeframe, a new set of questions arises (especially from the few skeptics in the room :)). Can I automate this task? Can I monitor my data flows? Can I integrate NiFi flow design with my development process? Can I "industrialize" my use case? All these questions are legitimate when we see how powerful and easy to use NiFi is. The good news is that "yes" is the answer to all of them. However, it's important to put the right process in place to avoid ending up with a POC that becomes production (who has never lived through this situation?)
The way I like to answer these questions is to show how much NiFi flow design is like software development. When developers want to tackle a problem, they start designing a solution by asking: "what's the best way to implement this?" The word "best" here covers aspects like complexity, scalability and maintainability. The same logic applies to NiFi flow design: you have several ways to implement your use case, and they are not equivalent. Once a solution is found, you use the NiFi UI as your IDE to implement it. Your flow is a set of processors, just like your code or your algorithm is a set of instructions. You have "if/then/else" statements with routing processors; you have "for" or "while" loops with UpdateAttribute and self-relations; you have mathematical and logical operators with processors and the Expression Language; and so on. When you build your flow, you divide it into process groups, similar to the functions you use to organize your code. This makes your applications easier to understand, maintain and debug. You use templates for repetitive things, just as you build and use libraries across your projects. From this main consideration, you can derive several best practices. Some of them are generic software development practices, and some are specific to NiFi as "a programming language". I share some good principles in the following slide:
Final thoughts
NiFi is a powerful tool that gives you business and technical agility. To master its power, it is important to define and enforce best practices. Many of these best practices can be borrowed directly from software engineering; others are specific to NiFi. We have shared some of these ideas in the deck available on the DWS webpage. Some of the ideas explained in the presentation have been discussed by other NiFi enthusiasts, such as the excellent "Monitoring NiFi" series by Pierre [1]. Various Flow Development Lifecycle (FDLC) [2] topics have also been covered by folks like Dan and Tim for NiPyAPI [3][4] (see the sketch below), Bryan for the flow registry [5] and Pierre for the NiFi CLI [6]. Other topics, like NiFi design patterns, require a dedicated post that I'll write in the future. Article initially shared on https://medium.com/@abdelkrim.hadjidj/best-practices-for-using-apache-nifi-in-real-world-projects-3-takeaways-1fe6912101db
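For the curious, here is a minimal sketch of what FDLC automation with NiPyAPI [3][4] can look like. This is an illustration under assumptions, not material from the talk: the endpoint URL and the 'IngestStores' process group name are hypothetical.

import nipyapi

# Point the client at a (hypothetical) local NiFi instance.
nipyapi.config.nifi_config.host = 'http://localhost:8080/nifi-api'

# Find a process group by name and start everything inside it.
pg = nipyapi.canvas.get_process_group('IngestStores')
nipyapi.canvas.schedule_process_group(pg.id, scheduled=True)

# List what is deployed on the canvas -- handy in a CI/CD pipeline to check
# that the running flow matches what was versioned.
for group in nipyapi.canvas.list_all_process_groups():
    print(group.component.name, group.component.id)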
12-09-2017
05:25 PM
Hi @mayki wogno I didn't test it, but you should be able to do it. At least the RecordReader supports it: https://community.hortonworks.com/questions/113959/use-nifi-to-change-the-format-of-numeric-date-and.html
11-23-2017
10:55 AM
Hi @Andrew Chisholm Thank you for your feedback. I can confirm it; I ran into the same problem and forgot to mention it when writing this article. I created a Jira to track it: https://issues.apache.org/jira/browse/NIFI-4634
11-06-2017
02:02 PM
7 Kudos
Introduction
This is part 3 of a series of articles on Data Enrichment with NiFi:
Part 1: Data flow enrichment with LookupRecord and SimpleKV Lookup Service is available here
Part 2: Data flow enrichment with LookupAttribute and SimpleKV Lookup Service is available here
Part 3: Data flow enrichment with LookupRecord and MongoDB Lookup Service is available here
Enrichment is a common use case when working on data ingestion or flow management. Enrichment means getting data from an external source (database, file, API, etc.) to add more details, context or information to the data being ingested. In Parts 1 and 2 of this series, I showed how to use LookupRecord and LookupAttribute to enrich the content/metadata of a flow file with the Simple Key/Value Lookup Service. Using this lookup service let us implement an enrichment scenario without deploying any external system. This is perfect for scenarios where the reference data is not too big and doesn't evolve much. However, managing entries in the SimpleKV service can become cumbersome if our reference data is dynamic or large.
Fortunately, NiFi 1.4 introduced an interesting new lookup service with NIFI-4345: MongoDBLookupService. This lookup service can be used in NiFi to enrich data by querying a MongoDB store in realtime. With this service, your reference data can live in MongoDB and can be updated by external applications. In this article, I describe how we can use this new service to implement the use case described in Part 1.
Scenario
We will be using the same retail scenario described in Part 1 of this series. However, our stores' reference data will be hosted in MongoDB rather than in NiFi's SimpleKV lookup service.
For this example, I'll be using a MongoDB hosted on MLab (DBaaS). I created a database "bigdata" and added a collection "stores", in which I inserted 5 documents.
Each Mongo document contains information on a store, as described below:
{
"id_store" : 1,
"address_city" : "Paris",
"address" : "177 Boulevard Haussmann, 75008 Paris",
"manager" : "Jean Ricca",
"capacity" : 464600
The complete database looks like this:
Implementation
We will be using the exact same flow and processors used in Part 1. The only difference is using a MongoDBLookupService instead of a SimpleKVLookupService with LookupRecord. The configuration of the LookupRecord processor looks like this:
Now let's see how to configure this service to query my MongoDB database and get the city of each store. As you can see, I'll query MongoDB by the id_store that I read from each flow file.
Data enrichment
If not already done, add a MongoDBLookupService and configure it as follows:
Mongo URI: the URI used to access your MongoDB database, in the format mongodb://user:password@hostname:port
Mongo Database Name: the name of your database. It's bigdata in my case.
Mongo Collection Name: the name of the collection to query for enrichment. It's stores in my case.
SSL Context Service and Client Auth: use your preferred security options.
Lookup Value Field: the name of the field you want the lookup service to return. For me, it's address_city, since I am looking to enrich my events with the city of each store. If you don't specify which field you want, the whole Mongo document is returned. This is useful if you want to enrich your flow with several attributes.
Results
To verify that our enrichment is working, let's look at the content of the flow files using the data provenance feature in our global flow. As you can see, the attribute city has been added to the content of my flow file. The city Paris has been added to store 1, which corresponds to my data in MongoDB. What happened here is that the lookup service extracted the id_store, which is 1, from my flow file, generated a query to Mongo to get the address_city field of the store having id_store 1, and added the result to the field city in my newly generated flow files. Note that if the query returns several results from Mongo, only the first document is used. By setting an empty Lookup Value Field, I can retrieve the complete document corresponding to the query { "id_store" : "1" }
Conclusion
Lookup services in NiFi are a powerful feature for realtime data enrichment. Using the Simple Key/Value lookup service is straightforward for non-dynamic scenarios; in addition, it doesn't require an external data source. For more complex scenarios, NiFi has started supporting lookups from external data sources such as MongoDB (available in NiFi 1.4) and HBase (NIFI-4346, available in NiFi 1.5).
11-02-2017
02:40 PM
@Wesley Bohannon Please find attached the template enrichlookuprecord.xml
10-10-2017
08:53 PM
5 Kudos
Introduction
Parquet is a popular file format used with several tools, such as Spark. NiFi can be used to easily convert data from different formats such as Avro, CSV or JSON to Parquet. This article explains how to convert data from JSON to Parquet using the PutParquet processor.
Implementation
Define a schema for the source data
In this article, I'll be using a JSON data source with the following structure:
{
"created_at" : "Tue Oct 10 21:47:12 CEST 2017",
"id_store" : 4,
"event_type" : "store capacity",
"id_transaction" : "6594277248900858122",
"id_product" : 319,
"value_product" : 1507664832649
}
Since we will be using a record-based processor, we need to define a schema for our data. We will define it as an Avro schema, but it can be used with other data formats as well; Avro here is only "a common language" that helps us describe a schema. The Avro schema for my data is the following:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type": "string" },
{ "name": "id_store", "type": "int" },
{ "name": "event_type", "type": "string" },
{ "name": "id_transaction", "type": "string" },
{ "name": "id_product", "type": "int" },
{ "name": "value_product", "type": "int" }
]
}
Generate data for testing
For testing, I'll generate random dummy data using a GenerateFlowFile processor with the following configuration.
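For readers who prefer to see the shape of the test data as code, here is a small Python sketch, not part of the original flow, that emits one random event matching the schema above; the value ranges are illustrative assumptions.

import json
import random
import time

# One dummy store event, mimicking what GenerateFlowFile produces here.
event = {
    "created_at": time.strftime("%a %b %d %H:%M:%S %Z %Y"),
    "id_store": random.randint(1, 5),
    "event_type": "store capacity",
    "id_transaction": str(random.getrandbits(63)),
    "id_product": random.randint(1, 1000),
    "value_product": random.randint(1, 2000),
}
print(json.dumps(event))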
Convert JSON to Parquet
Now let's use a PutParquet processor to convert our data. PutParquet is a special record-based processor because of the specifics of the Parquet format. Since Parquet's API is based on the Hadoop Path object, and not on InputStreams/OutputStreams, NiFi doesn't generate a Parquet flow file directly. Instead, NiFi takes data in record format (in memory) and writes it as Parquet to an HDFS cluster. For this reason, we need to configure PutParquet with a Hadoop cluster, as we usually do for a PutHDFS.
Hadoop Configuration Resources: a local path for core-site.xml and hdfs-site.xml files from our Hadoop cluster. You can use Ambari to easily download these files from your HDP cluster.
RecordReader: a JSONTreeReader that will be used to read our source data and convert it to record format in memory. This record reader should be configured with the same schema and schema access strategy as PutParquet.
Directory: an HDFS directory where Parquet files will be written
Schema Access Strategy: where to get the schema that will be used for the written data. For the sake of simplicity, I'll use the schema text property to define the schema. You can use a schema registry for a more governed solution.
Schema text: the Avro schema that we defined in the previous section
Other parameters: this processor has several parameters to help tune the Parquet conversion. I'll keep the default values, since the details of the Parquet format are out of the scope of this article.
Complete flow
Let's connect the different processors and start data generation/conversion.
Results
As discussed before, PutParquet writes Parquet data directly into HDFS. Let's check /tmp/nifi to see the generated data. Note that the data coming out of this processor is the original JSON data. If the resulting Parquet files are needed in the remainder of the flow, NiFi should pull them from HDFS using List/FetchHDFS.
Now let's try to read the data in HDFS to check that we have all the information in the right format. There are several ways to do it; what I like to do is start a Spark shell and read the content of my file. Spark has very good built-in support for Parquet.
Start a spark-shell session and run the following code:
val myParquet = sqlContext.read.parquet("/tmp/nifi/748458744258259")
myParquet.show()
As you can see in the screenshot below, we got the same schema and data from our initial JSON data.
If you want to convert data other than JSON, you can use the same process with another RecordReader, such as the Avro or CSV record reader.
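If Spark is not at hand, a quick local check also works. The sketch below assumes the Parquet file was first copied out of HDFS (e.g. with hdfs dfs -get /tmp/nifi/748458744258259) and that pyarrow and pandas are installed; the file name is the one from my run.

import pyarrow.parquet as pq

# Read the Parquet file fetched from /tmp/nifi and inspect it.
table = pq.read_table("748458744258259")
print(table.schema)       # should list the six fields of our Avro schema
print(table.to_pandas())  # the same records as the generated JSON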
10-07-2017
08:02 AM
7 Kudos
Introduction
This is part 2 of a series of articles on Data Enrichment with NiFi:
Part 1: Data flow enrichment with LookupRecord and SimpleKV Lookup Service is available here
Part 2: Data flow enrichment with LookupAttribute and SimpleKV Lookup Service is available here
Part 3: Data flow enrichment with LookupRecord and MongoDB Lookup Service is available here
Enrichment is a common use case when working on data ingestion or flow management. Enrichment means getting data from an external source (database, file, API, etc.) to add more details, context or information to the data being ingested. In Part 1, I showed how to use LookupRecord to enrich the content of a flow file. This is a powerful feature of NiFi based on the record-oriented paradigm. In some scenarios, however, we want to enrich the flow file by adding the result of the lookup as an attribute rather than to the content of the flow file. For this, we can use LookupAttribute with a lookup service.
Scenario
We will be using the same retail scenario as the previous article. However, we will be adding the city of the store as an attribute to each flow file. This information can then be used inside NiFi, for instance for data routing. Let's see how we can use LookupAttribute to do this.
Implementation
We will be using the same GenerateFlowFile processor to generate data, as well as the same SimpleKeyValueLookupService. In order to add the city of a store as an attribute, we will use a LookupAttribute processor with the following configuration:
The LookupAttribute processor will use the value of the attribute id_store as a key and query the lookup service. The returned value will be added as the 'city' attribute. To make this work, the flow files must have an id_store attribute before entering the lookup processor. Currently, this information exists only in the content. We can use an EvaluateJsonPath processor to promote this information from the content to an attribute. The final flow looks like the following:
Results
To verify that our enrichment is working, let's look at the attributes after the EvaluateJsonPath and then after the LookupAttribute:
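To make the mechanics concrete, here is a plain-Python illustration, not NiFi code, of what the EvaluateJsonPath and LookupAttribute steps do; the store-to-city map mirrors the SimpleKeyValueLookupService entries from Part 1 (only the cities named in these articles are shown).

import json

# Reference data, as stored in the SimpleKeyValueLookupService.
CITY_BY_STORE = {"1": "Paris", "2": "Lyon", "4": "Toulouse"}  # stores 3 and 5 omitted

flowfile_content = '{"id_store": 4, "event_type": "store capacity"}'
attributes = {}

# EvaluateJsonPath: copy $.id_store from the content into an attribute.
attributes["id_store"] = str(json.loads(flowfile_content)["id_store"])

# LookupAttribute: use that attribute as the key, add the result as 'city'.
attributes["city"] = CITY_BY_STORE.get(attributes["id_store"], "unmatched")
print(attributes)  # {'id_store': '4', 'city': 'Toulouse'}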
09-28-2017
04:11 PM
16 Kudos
Introduction
This is part 1 of a series of articles on Data Enrichment with NiFi:
Part 1: Data flow enrichment with LookupRecord and SimpleKV Lookup Service is available here
Part 2: Data flow enrichment with LookupAttribute and SimpleKV Lookup Service is available here
Part 3: Data flow enrichment with LookupRecord and MongoDB Lookup Service is available here
Enrichment is a common use case when working on data ingestion or flow management. Enrichment means getting data from an external source (database, file, API, etc.) to add more details, context or information to the data being ingested. It's common that data contains references (e.g. IDs) rather than the actual information. These references can be used to query another data source (known as a reference table in the relational world) to get other attributes of an entity (location, name, etc.). Often, the enrichment is done in batch using a join operation. However, doing the enrichment on data streams in realtime is more interesting.
Motivation
Enrichment was not natively supported in previous versions of NiFi. There were a few workarounds, but they were neither performant nor well integrated. This is because NiFi is a data flow tool: in data flow logic, each flow file is an independent item that can be processed independently. Data enrichment involves correlating and joining at least two data sources, which is not the sweet spot of NiFi.
Starting from NiFi 1.3, it's possible to do data enrichment with new processors (LookupAttribute and LookupRecord) and new lookup services. This article explains how these new features can be used.
Scenario
Let's take an example of real-time retail data ingestion coming from stores in different cities. Data is coming in JSON form with the following schema:
{
"created_at" : "Thu Sep 28 08:08:09 CEST 2017",
"id_store" : 4,
"event_type" : "store capacity",
"id_transaction" : "1009331737896598289",
"id_product" : 889,
"value_product" : 45
}
This JSON tells us that we have 45 units of product 889 in store 4. Details of store 4, such as city, address, capacity, etc., are available in another data source. Let's say that we want to do some geographical analysis and show, in a realtime dashboard, all stores that have products which will soon be out of stock. To do so, we need information on the store locations. This can be achieved through data enrichment.
Implementation
Data generation
Let's use a GenerateFlowFile processor with the below configuration to simulate data coming from five different stores (1 to 5).
Data enrichment
The LookupRecord processor uses lookup services for data enrichment. You can think of a lookup service as a key-value service that LookupRecord queries to get the value associated with a key. Currently, there are six available lookup services. PropertiesFileLookupService, SimpleCsvFileLookupService and IPLookupService are file-based lookup services: your reference data should sit in a file (CSV, XML, etc.) that NiFi uses to match a value to a key. ScriptedLookupService uses a script (Python, Ruby, Groovy, etc.) to generate the value corresponding to a key. The SimpleKeyValueLookupService stores the key-value pairs directly in NiFi. It is very convenient if your reference data is not too big and doesn't evolve much, which is the case in our scenario: indeed, we don't add a new store every day. Other interesting lookup services are coming in upcoming versions of NiFi, including MongoDB (NIFI-4345) and HBase (NIFI-4346).
To start the enrichment, add a LookupRecord processor to the flow and configure the following properties:
Record Reader: create a new JSONTreeReader and configure it. Use schema text as the "Schema Access Strategy" and use the following Avro schema:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type":"string" },
{ "name": "id_store", "type":"int" },
{ "name": "event_type", "type":"string" },
{ "name": "id_transaction", "type":"string" },
{ "name": "id_product", "type":"int" },
{ "name": "value_product", "type":"int" }
]
}
This tells the LookupRecord processor to deserialize the received JSON data with the provided schema. We don't use any schema registry here, and I won't go into the details of record-oriented processors or schema registries. If you are not familiar with these concepts, start by reading this article here.
Record Writer: create a new JsonRecordSetWriter and configure it. Set the different attributes as follows and use this schema for the Schema Text property:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type":"string" },
{ "name": "id_store", "type":"int" },
{ "name": "event_type", "type":"string" },
{ "name": "id_transaction", "type":"string" },
{ "name": "id_product", "type":"int" },
{ "name": "value_product", "type":"int" },
{ "name": "city", "type":"string" }
]
}
Note that the writer schema is slightly different from the reader schema: I added a field called 'city' that the processor will populate.
Lookup Service: create a new SimpleKeyValueLookupService and populate it with your reference data. Here, I added the city of each one of my stores: store 1 is in Paris, store 2 is in Lyon, and so on.
Finalize the configuration of the lookup processor. You need to add a custom property "key" and set it to the RecordPath of the field that will be used for the lookup. Here, it's the store ID, so key = /id_store. The Result RecordPath property tells the processor where to store the retrieved value. Finally, the 'matched'/'unmatched' routing strategy tells the processor what to do after the lookup. Connect the LookupRecord processor to the next processor and start the flow. For demonstration, I'll be merging the enriched JSON events and pushing them to Solr to build my dashboard.
Results
To verify that our enrichment is working, let's see the content of the flow files using the data provenance feature. First of all, you can notice that LookupRecord is adding an attribute called avro.schema. This is due to the write strategy that we are using. It's not useful here, but I just wanted to highlight it: by using a schema registry, we could add only the name of the schema. Now let's look at the content of a flow file. As you can see, a new field "city" has been added to my JSON. Here the city is Toulouse, since my store ID is 4. It's worth noting that it's possible to write the file in another format (Avro, for instance) to get enrichment and conversion in one step.
Conclusion
Data enrichment is a common use case for ingestion and flow processing. With the lookup processors and services, we can now easily enrich data in NiFi. The existing lookup services are convenient if reference data doesn't change often; indeed, reference data is added manually or used through a file. In future NiFi releases, new database lookup services will be available (e.g. MongoDB and HBase).
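As a closing illustration, here is a short Python sketch of the LookupRecord semantics configured above, with key = /id_store, Result RecordPath = /city, and matched/unmatched routing. This is an analogy, not NiFi code, and only the cities named in this article appear in the map.

# Reference data from the SimpleKeyValueLookupService (stores 3 and 5 omitted).
CITY_BY_STORE = {"1": "Paris", "2": "Lyon", "4": "Toulouse"}

def lookup_record(record):
    """Enrich one record in place and return the relationship it routes to."""
    city = CITY_BY_STORE.get(str(record["id_store"]))
    if city is None:
        return "unmatched"
    record["city"] = city  # written at Result RecordPath /city
    return "matched"

record = {"id_store": 4, "id_product": 889, "value_product": 45}
print(lookup_record(record), record)
# -> matched {'id_store': 4, 'id_product': 889, 'value_product': 45, 'city': 'Toulouse'}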
03-13-2017
11:26 AM
3 Kudos
Introduction
NiFi Site-to-Site (S2S) is a communication protocol used to exchange data between NiFi instances or clusters. This protocol is useful for use cases where we have geographically distributed clusters that need to communicate. Examples include:
IoT: collect data from edge nodes (MiNiFi) and send it to NiFi for aggregation/storage/analysis
Connected cars: collect data locally by city or country with a local HDF cluster, and send it back to a global HDF cluster in the core data center
Replication: synchronization between two HDP clusters (on-prem/cloud or primary/DR)
S2S provides several benefits such as scalability, security, load balancing and high availability. More information can be found here.
Context
NiFi can be secured by enabling SSL and requiring users/nodes to authenticate with certificates. However, in some scenarios, customers have secured and unsecured NiFi clusters that need to communicate. The objective of this tutorial is to show two approaches to achieve this. Discussions on having secured and unsecured NiFi clusters in the same application are outside the scope of this tutorial.
Prerequisites
Let's assume that we have already installed an unsecured HDF cluster (Cluster2) that needs to send data to a secured cluster (Cluster1). Cluster1 is a three-node NiFi cluster with SSL: hdfcluster0, hdfcluster1 and hdfcluster2. We can see the HTTPS in the URLs as well as the connected user 'ahadjidj'. Cluster2 is also a three-node NiFi cluster, but without SSL enabled: hdfcluster20, hdfcluster21 and hdfcluster22.
Option 1: the lazy option
The easiest way to get data from Cluster2 to Cluster1 is to use a pull method. In this approach, Cluster1 uses a Remote Process Group (RPG) to pull data from Cluster2. We will configure the RPG to use HTTP, and no special configuration is required. However, data will go unencrypted over the network. Let's see how to implement this.
Step 1: configure Cluster2 to generate data
The easiest way to generate data in Cluster2 is to use a GenerateFlowFile processor. Set the File Size to something different from 0 and the Run Schedule to 60 sec. Add an output port to the canvas and call it 'fromCluster2'. Connect and start the two processors. At this level, we can see data being generated and queued before the output port.
Step 2: configure Cluster1 to pull data
Add an RPG and configure it with the HTTP addresses of the three Cluster2 nodes. Use HTTP as the Transport Protocol and enable the transmission. Add a PutFile processor to grab the data. Connect the RPG to the PutFile and choose the 'fromCluster2' output when asked. Right-click on the RPG and activate the toggle next to 'fromCluster2'. We should see flow files coming from the RPG and buffering before the PutFile processor.
Option 2: the secure option
The first approach was easy to configure, but data was sent unencrypted over the wire. If we want to leverage SSL and send data encrypted even between the two clusters, we need to generate and use certificates for each node in Cluster2. The only difference here is that we don't activate SSL.
Step 1: generate and add Cluster2 certs
I suppose that you already know how to generate certificates for the CA and the nodes and add them to the truststore/keystore; otherwise, there are several HCC articles that explain how to do it. We need to configure Cluster2 with its certificates:
Upload each node's certificate to that node and add it to the keystore (e.g. keystore.pfx). Also set the keystore type and password.
Upload the CA (Certificate Authority) certificate to each node and add it to the truststore (e.g. truststore.jks). Also set the truststore type and password.
Step 2: configure Cluster2 to push data to Cluster1
In Cluster1, add an input port (toCluster1) and connect it to a PutFile processor. Use a GenerateFlowFile to generate data in Cluster2 and an RPG to push data to Cluster1. Here we will use HTTPS addresses when configuring the RPG. Cluster2 should now be able to send data to Cluster1 via the toCluster1 input port. However, the RPG shows a Forbidden error.
Step 3: add policies to authorize Cluster2 to use the S2S protocol
The previous error is triggered because the nodes belonging to Cluster2 are not authorized to access Cluster1 resources. To solve the problem, let's do the following configurations:
1) Go to the users menu in Cluster1 and add a user for each node from Cluster2
2) Go to the policies menu in Cluster1, and add each node from Cluster2 to the "retrieve site-to-site details" policy. At this point, the RPG in Cluster2 is working, but the input port is not visible yet.
3) The last step is editing the input port policy in Cluster1 to authorize nodes from Cluster2 to send data through S2S. Select the toCluster1 input port and click on the key to edit its policies. Add the Cluster2 nodes to the list.
4) Now, go back to Cluster2 and connect the GenerateFlowFile with the RPG. The input port should be visible and data starts flowing "securely" 🙂
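To check the "retrieve site-to-site details" policy from step 2 without the UI, a small Python probe like the one below can help. It is a sketch under assumptions: the API port (9091), the PEM certificate file names, and the exact hostnames are placeholders for your actual Cluster1/Cluster2 setup.

import requests

# Call Cluster1's S2S details endpoint, authenticating as a Cluster2 node.
resp = requests.get(
    "https://hdfcluster0:9091/nifi-api/site-to-site",
    cert=("hdfcluster20-cert.pem", "hdfcluster20-key.pem"),  # node cert + key
    verify="ca-cert.pem",  # CA that signed both clusters' certificates
)

# 200 with a JSON peers listing means the policy is in place;
# 403 Forbidden means the node is still missing from the policy.
print(resp.status_code)
print(resp.text)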
12-27-2016
05:15 PM
Hi @Smr Srid This is already available in HDF 2.1. You can install it (doc) or upgrade your existing cluster (doc)