Member since 01-11-2016
355 Posts
230 Kudos Received
74 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 8334 | 06-19-2018 08:52 AM |
| | 3242 | 06-13-2018 07:54 AM |
| | 3707 | 06-02-2018 06:27 PM |
| | 4014 | 05-01-2018 12:28 PM |
| | 5578 | 04-24-2018 11:38 AM |
10-07-2017
02:20 PM
1 Kudo
@Vinod Chhabria I found another way to implement this using the UpdateAttribute processor in NiFi 1.3, which supports state in this version. I didn't test it in NiFi 1.2, so I can't tell whether this works in previous versions. Here's the overall solution:
In the upper stream, you have data coming from Cassandra with the new value. I don't have this, so I simulated data coming in JSON format from GenerateFlowFile. This processor is triggered once an hour to generate a new key; for you, it will be the data you get from Cassandra. For data coming from this stream, you basically add two attributes: type = 'update_key' and key = 'the key that you get from Cassandra'. In my case, I did it with two processors, UpdateAttribute and EvaluateJsonPath, configured as follows:
On the bottom stream, you get your data to encrypt. All you need to do is add an attribute type = 'data' (this is optional). I do it with an UpdateAttribute.
Now both streams go to an UpdateAttribute processor that adds an attribute encryptionkey and stores it in state. We initialize it with an empty value. As you can see, I store this attribute's value in the state. See the configuration below:
Now, what I want to do is update this key in state only when I have a flow file from the upper stream (i.e., type = update_key). To do this, click on Advanced in the bottom-left corner of the UpdateAttribute configuration and add the following configuration:
With this condition, the encryption key is updated only once per hour, when new data comes from Cassandra. After the UpdateAttribute, you can route on the type attribute to drop the messages coming from Cassandra (update_key) and encrypt the others. Can you try this and let me know if it works?
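The key-refresh behavior described above can be modeled outside NiFi as a small state machine. The sketch below is a plain-Python simulation, not NiFi code: flow files are represented as attribute dictionaries, and the names `KeyStateHolder` and `route_for_encryption` are hypothetical. It assumes the state starts with an empty key, that only `type = update_key` flow files overwrite it, and that those flow files are dropped after the state update.

```python
from typing import Dict, List

Attributes = Dict[str, str]


class KeyStateHolder:
    """Simulates a stateful UpdateAttribute that keeps 'encryptionkey' in its state."""

    def __init__(self) -> None:
        # State is initialized with an empty key, as in the flow described above.
        self.state: Attributes = {"encryptionkey": ""}

    def process(self, attrs: Attributes) -> Attributes:
        out = dict(attrs)  # copy so the incoming flow file is not mutated
        # Advanced rule: only flow files from the key stream update the stored key.
        if out.get("type") == "update_key":
            self.state["encryptionkey"] = out["key"]
        # Every flow file leaves with the key currently held in state.
        out["encryptionkey"] = self.state["encryptionkey"]
        return out


def route_for_encryption(flowfiles: List[Attributes], holder: KeyStateHolder) -> List[Attributes]:
    """Returns the flow files to encrypt; key updates only refresh the state and are dropped."""
    to_encrypt: List[Attributes] = []
    for ff in flowfiles:
        tagged = holder.process(ff)
        if tagged.get("type") != "update_key":
            to_encrypt.append(tagged)
    return to_encrypt


if __name__ == "__main__":
    holder = KeyStateHolder()
    arrivals = [
        {"type": "data", "id": "0"},          # arrives before any key is known
        {"type": "update_key", "key": "k1"},  # hourly key from the upper stream
        {"type": "data", "id": "1"},
        {"type": "update_key", "key": "k2"},
        {"type": "data", "id": "2"},
    ]
    for ff in route_for_encryption(arrivals, holder):
        print(ff["id"], repr(ff["encryptionkey"]))
```

In this sketch, a data flow file that arrives before the first key update leaves with an empty key, so a real flow may want to check for that before encrypting.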
10-07-2017
08:02 AM
7 Kudos
Introduction
This is part 2 of a series of articles on data enrichment with NiFi:
Part 1: Data flow enrichment with LookupRecord and SimpleKV Lookup Service is available here
Part 2: Data flow enrichment with LookupAttribute and SimpleKV Lookup Service is available here
Part 3: Data flow enrichment with LookupRecord and MongoDB Lookup Service is available here
Enrichment is a common use case when working on data ingestion or flow management. Enrichment means getting data from an external source (database, file, API, etc.) to add more details, context, or information to the data being ingested. In Part 1, I showed how to use LookupRecord to enrich the content of a flow file. This is a powerful feature of NiFi based on the record-oriented paradigm. In some scenarios, we want to enrich the flow file by adding the result of the lookup as an attribute rather than to the content of the flow file. For this, we can use LookupAttribute with a lookup service.
Scenario
We will be using the same retail scenario as in the previous article. However, this time we will add the city of the store as an attribute to each flow file. This information can then be used inside NiFi, for instance for data routing. Let's see how we can use LookupAttribute to do it.
Implementation
We will be using the same GenerateFlowFile processor to generate data, as well as the same SimpleKeyValueLookupService. To add the city of a store as an attribute, we use a LookupAttribute with the following configuration:
The LookupAttribute processor uses the value of the attribute id_store as a key to query the lookup service. The returned value is added as the 'city' attribute. To make this work, flow files must have an 'id_store' attribute before entering the lookup processor. Currently, this information is only in the content, so we use an EvaluateJsonPath to extract it from the content into an attribute. The final flow looks like this:
Results
To verify that our enrichment is working, let's look at the attributes after the EvaluateJsonPath and then after the LookupAttribute:
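The two steps of this flow (content to attribute, then attribute lookup) can be sketched in plain Python. This is an illustration under assumptions, not NiFi's API: `evaluate_json_path`, `lookup_attribute`, and the `STORE_CITY` mapping are hypothetical stand-ins for EvaluateJsonPath, LookupAttribute, and the SimpleKeyValueLookupService, and the mapping only contains the cities named in these articles.

```python
import json
from typing import Dict, Mapping, Tuple

Attributes = Dict[str, str]

# Hypothetical reference data standing in for the SimpleKeyValueLookupService
# (only the cities named in these articles; stores 3 and 5 are left out).
STORE_CITY: Dict[str, str] = {"1": "Paris", "2": "Lyon", "4": "Toulouse"}


def evaluate_json_path(content: str, attrs: Attributes, field: str) -> Attributes:
    """Copies a top-level JSON field from the content into an attribute (like $.id_store)."""
    out = dict(attrs)
    out[field] = str(json.loads(content)[field])  # attributes are always strings
    return out


def lookup_attribute(attrs: Attributes, service: Mapping[str, str],
                     key_attr: str, result_attr: str) -> Tuple[str, Attributes]:
    """Uses the value of key_attr as the lookup key and stores the result in result_attr."""
    out = dict(attrs)
    value = service.get(out.get(key_attr, ""))
    if value is None:
        return "unmatched", out
    out[result_attr] = value
    return "matched", out


if __name__ == "__main__":
    event = '{"id_store": 4, "id_product": 889, "value_product": 45}'
    attrs = evaluate_json_path(event, {}, "id_store")
    relationship, attrs = lookup_attribute(attrs, STORE_CITY, "id_store", "city")
    print(relationship, attrs)  # matched {'id_store': '4', 'city': 'Toulouse'}
```

The sketch makes the ordering constraint explicit: the lookup can only succeed once the id_store attribute has been extracted from the content.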
10-07-2017
07:36 AM
1 Kudo
@Hans Feldmann The best way to explain this is with an example. Let's say that you want to add a city attribute using a SimpleKeyValueLookupService, and you will do the lookup by an attribute id_store (look at this article to understand the logic behind this use case).
You need to configure LookupAttribute as follows:
This means that the processor uses the value of the attribute id_store as a key to query the lookup service. The returned value is added as a city attribute. Here's the result:
Of course, the flow file must have the id_store attribute before going into LookupAttribute. And here's the content of the SimpleKeyValueLookupService. I hope this helps.
10-06-2017
05:43 AM
Indeed, as I said, it's a workaround to get this done. What about LookupAttribute and SimpleKeyValueLookupService? That was my initial thought, but I don't know if it answers your need. You can get the key as an attribute, but you need to manually update the encryption key in the NiFi UI.
10-05-2017
06:03 AM
@Mike Bailey Have you tried ListHDFS/FetchHDFS to implement your logic? EvaluateXPath parses the XML file and gets you the name of the directory. You use this to list all files in that directory and then FetchHDFS to get the data. The state you were referring to is there to avoid getting the same file several times, so that only new files are picked up from a directory. I expect this to be the desired behavior.
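The state mentioned above can be illustrated with a simplified model of a stateful lister: it remembers the newest modification timestamp it has already emitted and only lists files that are newer. This is a sketch under assumptions, not ListHDFS's actual implementation; the `IncrementalLister` class and the sample paths are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class IncrementalLister:
    """Simplified stateful lister: remembers the newest timestamp already emitted."""

    last_timestamp: int = -1

    def list_new(self, listing: List[Tuple[str, int]]) -> List[str]:
        """Takes (path, modification_time) pairs and returns only unseen files, oldest first."""
        new = sorted((mtime, path) for path, mtime in listing if mtime > self.last_timestamp)
        if new:
            # Keep the newest timestamp as state so the next run skips these files.
            self.last_timestamp = new[-1][0]
        return [path for _, path in new]


if __name__ == "__main__":
    lister = IncrementalLister()
    print(lister.list_new([("/data/in/a.xml", 100), ("/data/in/b.xml", 200)]))
    print(lister.list_new([("/data/in/a.xml", 100), ("/data/in/b.xml", 200),
                           ("/data/in/c.xml", 300)]))
```

The second call returns only the new file. A file that shows up later with a timestamp equal to the stored one would be skipped by this simplified version; a production lister has to handle that edge case more carefully.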
10-05-2017
05:46 AM
Hi @Vinod Chhabria What you are looking for is the variable registry, which is not yet implemented: https://cwiki.apache.org/confluence/display/NIFI/Variable+Registry
One workaround is to use SimpleKeyValueLookupService. With this service, you can add a key-value pair that represents your encryption key. Then you can enrich your flow files with LookupAttribute and add the encryption key as an attribute for later encryption. An example of how to use it is available here. The issue is that you need to manually update the key value in the NiFi UI. Maybe you could have another NiFi flow that queries Cassandra periodically and updates the key-value lookup service through the NiFi API, but I am not sure if this is possible. Another idea, which I haven't tested, is to use a ScriptedLookupService with a script that queries Cassandra and gets the required data. There's a MongoDB lookup service available in NiFi 1.4, and one for HBase is coming. You may want to create a Jira for a Cassandra lookup service.
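The ScriptedLookupService idea could look roughly like the sketch below: a lookup that asks an external source for the value and caches it, so that not every flow file triggers a query. This is plain Python, not the NiFi scripting API; `fetch` is a hypothetical stand-in for a Cassandra query, and the time-based cache (`ttl_seconds`) is a design choice of this sketch rather than something the post prescribes. The clock is injected so the refresh behavior is easy to test.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class CachedLookup:
    """Key-value lookup backed by an external source, with a time-based cache."""

    def __init__(self, fetch: Callable[[str], Optional[str]], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch  # hypothetical stand-in for a Cassandra query
        self._ttl = ttl_seconds
        self._clock = clock
        self._cache: Dict[str, Tuple[float, Optional[str]]] = {}

    def lookup(self, key: str) -> Optional[str]:
        now = self._clock()
        cached = self._cache.get(key)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # still fresh: no external query
        value = self._fetch(key)
        self._cache[key] = (now, value)
        return value


if __name__ == "__main__":
    source = {"encryption_key": "k1"}  # simulated Cassandra table
    fake_now = [0.0]
    lookup = CachedLookup(source.get, ttl_seconds=3600, clock=lambda: fake_now[0])
    print(lookup.lookup("encryption_key"))  # k1 (fetched)
    source["encryption_key"] = "k2"         # key rotated in the source
    fake_now[0] = 10.0
    print(lookup.lookup("encryption_key"))  # k1 (served from cache)
    fake_now[0] = 3600.0
    print(lookup.lookup("encryption_key"))  # k2 (cache expired, refetched)
```

The TTL bounds how stale the key can get, which avoids the manual update in the NiFi UI at the cost of one external query per key per TTL period.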
10-03-2017
01:55 PM
2 Kudos
Hi @Aviram Voda Spark 2.2 is not yet supported with HDP 2.6.x, which is why it's not available through Ambari (install/upgrade). It has to go through our QE before it can be added in a future release.
09-28-2017
04:11 PM
15 Kudos
Introduction
This is part 1 of a series of articles on data enrichment with NiFi:
Part 1: Data flow enrichment with LookupRecord and SimpleKV Lookup Service is available here
Part 2: Data flow enrichment with LookupAttribute and SimpleKV Lookup Service is available here
Part 3: Data flow enrichment with LookupRecord and MongoDB Lookup Service is available here
Enrichment is a common use case when working on data ingestion or flow management. Enrichment means getting data from an external source (database, file, API, etc.) to add more details, context, or information to the data being ingested. Data often contains references (e.g., IDs) rather than the actual information. These references can be used to query another data source (known as a reference table in the relational world) to get other attributes of an entity (location, name, etc.). Enrichment is often done in batch using a join operation. However, doing the enrichment on data streams in real time is more interesting.
Motivation
Previous versions of NiFi did not support enrichment natively. There were a few workarounds, but they were neither performant nor well integrated. This is because NiFi is a data flow tool: in data flow logic, each flow file is an independent item that can be processed independently. Data enrichment involves correlating and joining at least two data sources, which is not NiFi's sweet spot.
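To make the batch versus stream contrast concrete, here is a small sketch on the retail example used in this article: a batch join over a set of collected events versus per-event enrichment as events arrive. The helper names, the sample events, and the reference table are hypothetical (the table only uses the cities named in this article). Both functions produce the same enriched records, but only the streaming version can emit a result as soon as an event arrives.

```python
from typing import Any, Dict, Iterable, Iterator, List

Event = Dict[str, Any]

# Hypothetical reference table (store id -> city), using the cities named in this article.
STORES: List[Event] = [
    {"id_store": 1, "city": "Paris"},
    {"id_store": 2, "city": "Lyon"},
    {"id_store": 4, "city": "Toulouse"},
]


def batch_join(events: List[Event], stores: List[Event]) -> List[Event]:
    """Batch style: index the reference table, then inner-join the whole collected batch."""
    by_id = {s["id_store"]: s["city"] for s in stores}
    return [{**e, "city": by_id[e["id_store"]]} for e in events if e["id_store"] in by_id]


def enrich_stream(events: Iterable[Event], lookup: Dict[Any, str]) -> Iterator[Event]:
    """Streaming style: enrich each event on its own, as soon as it arrives."""
    for e in events:
        city = lookup.get(e["id_store"])
        if city is not None:
            yield {**e, "city": city}


if __name__ == "__main__":
    events = [
        {"id_store": 4, "id_product": 889, "value_product": 45},
        {"id_store": 1, "id_product": 12, "value_product": 3},
    ]
    lookup = {s["id_store"]: s["city"] for s in STORES}
    print(batch_join(events, STORES) == list(enrich_stream(events, lookup)))  # True
```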
Starting from NiFi 1.3, it's possible to do data enrichment with new processors (LookupAttribute and LookupRecord) and new lookup services. This article explains how these new features can be used.
Scenario
Let's take an example of real-time retail data ingestion from stores in different cities. Data arrives in JSON format with the following structure:
{
"created_at" : "Thu Sep 28 08:08:09 CEST 2017",
"id_store" : 4,
"event_type" : "store capacity",
"id_transaction" : "1009331737896598289",
"id_product" : 889,
"value_product" : 45
}
This JSON tells us that we have 45 units of product 889 in store 4. Details of store 4, such as city, address, and capacity, are available in another data source. Let's say that we want to do some geographical analysis and show, in a real-time dashboard, all stores that have products which will soon be out of stock. To do so, we need information on the store locations. This can be achieved through data enrichment.
Implementation
Data generation
Let's use a GenerateFlowFile with the configuration below to simulate data coming from 5 different stores (1 to 5).
Data enrichment
The LookupRecord processor uses lookup services for data enrichment. You can see a lookup service as a key-value service that LookupRecord queries to get the value associated with a key. Currently, there are 6 available lookup services: PropertiesFileLookupService, SimpleCsvFileLookupService and IPLookupService are file-based lookup services.
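A file-based lookup service can be pictured as a dictionary loaded from the reference file. The sketch below builds such a mapping from CSV text with Python's standard `csv` module; the column names (`id_store`, `city`) and the helpers `load_csv_lookup` and `lookup` are hypothetical, and NiFi's own services are set up through their configuration properties rather than through code like this.

```python
import csv
import io
from typing import Dict, Optional


def load_csv_lookup(csv_text: str, key_column: str, value_column: str) -> Dict[str, str]:
    """Builds a key -> value mapping from CSV text that has a header row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[key_column]: row[value_column] for row in reader}


def lookup(table: Dict[str, str], key: str) -> Optional[str]:
    """Returns the value for key, or None when the key is unknown (an unmatched lookup)."""
    return table.get(key)


if __name__ == "__main__":
    reference = "id_store,city\n1,Paris\n2,Lyon\n4,Toulouse\n"
    stores = load_csv_lookup(reference, "id_store", "city")
    print(lookup(stores, "4"))  # Toulouse
    print(lookup(stores, "9"))  # None
```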
Your reference data should be in a file (CSV, XML, etc.) that NiFi uses to match a value to a key. ScriptedLookupService uses a script (Python, Ruby, Groovy, etc.) to generate a value corresponding to a key. The SimpleKeyValueLookupService stores the key-value pairs directly in NiFi. It is very convenient if your reference data is not too big and doesn't change much. This is the case in our scenario: we don't add a new store every day. Other interesting lookup services are coming in new versions of NiFi, including MongoDB (NIFI-4345) and HBase (NIFI-4346).
To start the enrichment, add a LookupRecord processor to the flow and configure the following properties:
Record Reader: create a new JsonTreeReader and configure it. Set the Schema Access Strategy to use the Schema Text property, and use the following Avro schema:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type":"string" },
{ "name": "id_store", "type":"int" },
{ "name": "event_type", "type":"string" },
{ "name": "id_transaction", "type":"string" },
{ "name": "id_product", "type":"int" },
{ "name": "value_product", "type":"int" }
]
}
This tells the LookupRecord processor to read the incoming JSON data using the provided schema. We don't use a schema registry here. I won't go into the details of record-oriented processors or schema registries; if you are not familiar with these concepts, start by reading this article here.
Record Writer: create a new JsonRecordSetWriter and configure it. Set its properties as follows and use this schema for the Schema Text property:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type":"string" },
{ "name": "id_store", "type":"int" },
{ "name": "event_type", "type":"string" },
{ "name": "id_transaction", "type":"string" },
{ "name": "id_product", "type":"int" },
{ "name": "value_product", "type":"int" },
{ "name": "city", "type":"string" }
]
}
Note that the writer schema is slightly different from the reader schema: I added a field called 'city' that the processor will populate.
Lookup Service: create a new SimpleKeyValueLookupService and populate it with your reference data. Here, I added the city of each of my stores: store 1 is in Paris, store 2 is in Lyon, and so on.
Finalize the configuration of the lookup processor. You need to add a custom property "key" and set it to the RecordPath of the field that will be used for the lookup. Here, it's the store ID, so key = /id_store.
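With key = /id_store, LookupRecord reads each record, evaluates the path to get the key, queries the lookup service, and writes the result into the record. The sketch below mimics that loop in plain Python for top-level paths only; `lookup_record`, `get_top_level`, the `result_path` parameter, and the in-memory service are hypothetical stand-ins rather than NiFi's API, and the matched/unmatched split mirrors the routing option of the processor.

```python
import json
from typing import Any, Dict, List, Mapping, Tuple

Record = Dict[str, Any]


def get_top_level(record: Record, path: str) -> Any:
    """Resolves a top-level path such as '/id_store' (nested paths are not supported here)."""
    return record.get(path.lstrip("/"))


def lookup_record(records: List[Record], service: Mapping[str, str],
                  key_path: str, result_path: str) -> Tuple[List[Record], List[Record]]:
    """Enriches records and splits them into (matched, unmatched)."""
    matched: List[Record] = []
    unmatched: List[Record] = []
    for record in records:
        key = get_top_level(record, key_path)
        value = service.get(str(key)) if key is not None else None
        if value is None:
            unmatched.append(record)
            continue
        enriched = dict(record)
        enriched[result_path.lstrip("/")] = value  # e.g. adds the 'city' field
        matched.append(enriched)
    return matched, unmatched


if __name__ == "__main__":
    service = {"1": "Paris", "2": "Lyon", "4": "Toulouse"}  # hypothetical key-value pairs
    events = [json.loads('{"id_store": 4, "id_product": 889, "value_product": 45}'),
              json.loads('{"id_store": 7, "id_product": 12, "value_product": 3}')]
    matched, unmatched = lookup_record(events, service, "/id_store", "/city")
    print(json.dumps(matched))  # [{"id_store": 4, "id_product": 889, "value_product": 45, "city": "Toulouse"}]
    print(len(unmatched))       # 1
```

Store 7 stands for a store ID with no entry in the reference data, so it ends up in the unmatched list with its content unchanged.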
Result RecordPath tells the processor where to store the retrieved value. Finally, the routing strategy (route to 'matched' or 'unmatched') tells the processor what to do after the lookup.
Connect the LookupRecord to the next processor and start the flow. For demonstration, I'll be merging the enriched JSON events and pushing them to Solr to build my dashboard.
Results
To verify that our enrichment is working, let's look at the content of flow files using the data provenance feature. First of all, you can notice that LookupRecord adds an attribute called avro.schema. This is due to the write strategy that we are using. It's not useful here, but I wanted to highlight it. By using a schema registry, we could add only the name of the schema. Let's look at the content of a flow file now. As you can see, a new field "city" has been added to my JSON. Here the city is Toulouse, since my store ID is 4. It's worth noting that it's possible to write the output in another format (Avro, for instance) to do enrichment and conversion in one step.
Conclusion
Data enrichment is a common use case for ingestion and flow processing. With lookup processors and services, we can now easily enrich data in NiFi. The existing lookup services are convenient if reference data doesn't change often, since the reference data is added manually or provided through a file. In future NiFi releases, new database lookup services will be available (e.g., MongoDB and HBase).
08-07-2017
03:08 PM
Hi @Mustafa Kemal MAYUK There's a MySQL CDC processor in NiFi (CaptureChangeMySQL), but currently no native CDC solution for PostgreSQL. You can capture updates by maintaining a timestamp field that is set on every update and using it with QueryDatabaseTable. Deletes are harder to capture: you can use logical deletes, with a delayed hard delete that gives NiFi time to pick them up first. Another option is to use an external CDC solution like Attunity. I hope this helps.
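The timestamp approach relies on the same idea as QueryDatabaseTable's maximum-value column: remember the largest value seen so far and fetch only rows above it. The sketch below reproduces that idea with Python's built-in `sqlite3` module standing in for PostgreSQL; the `orders` table, its `updated_at` and `is_deleted` columns, and the `fetch_changes` helper are hypothetical. A row flagged with `is_deleted = 1` shows how a logical delete is captured like any other update before a later hard delete.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, str, int, int]


def fetch_changes(conn: sqlite3.Connection, last_max: int) -> Tuple[List[Row], int]:
    """Returns rows changed after last_max plus the new maximum to keep as state."""
    rows = conn.execute(
        "SELECT id, status, is_deleted, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_max,),
    ).fetchall()
    new_max = rows[-1][3] if rows else last_max
    return rows, new_max


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, "
                 "is_deleted INTEGER DEFAULT 0, updated_at INTEGER)")
    conn.executemany("INSERT INTO orders (id, status, updated_at) VALUES (?, ?, ?)",
                     [(1, "new", 100), (2, "new", 110)])
    rows, state = fetch_changes(conn, -1)  # first run: both rows
    # An update and a logical delete both bump updated_at, so the next run sees them.
    conn.execute("UPDATE orders SET status = 'shipped', updated_at = 120 WHERE id = 1")
    conn.execute("UPDATE orders SET is_deleted = 1, updated_at = 130 WHERE id = 2")
    rows, state = fetch_changes(conn, state)
    print(rows, state)  # [(1, 'shipped', 0, 120), (2, 'new', 1, 130)] 130
```

The approach only works if every writer updates the timestamp column; a hard delete that skips the logical-delete step leaves no row for the query to find.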
04-30-2017
05:15 PM
1 Kudo
Hi @Tech Gig Let's say you have a cluster of 3 brokers. In case 1 (--broker-list hadoop1.test.com:6667), a client that connects to the cluster for the first time tries to reach the broker you specified. If this broker is not reachable (host is down, network issue, etc.), the connection fails, even if other brokers could do the job and provide the required information. When you provide a list of brokers, the client tries each broker in turn until it gets the first successful response, so you get better high availability in this case.
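The failover behavior described above can be modeled with a simple loop over the configured brokers. This is a simplified illustration of the behavior described in this answer, not the Kafka client's actual bootstrap code; `connect_first_available`, the `try_connect` callback, and the extra host names are hypothetical.

```python
from typing import Callable, List


def connect_first_available(brokers: List[str], try_connect: Callable[[str], bool]) -> str:
    """Tries each broker in order and returns the first one that answers."""
    failed: List[str] = []
    for broker in brokers:
        if try_connect(broker):
            return broker
        failed.append(broker)
    raise ConnectionError(f"no broker reachable: {', '.join(failed)}")


if __name__ == "__main__":
    down = {"hadoop1.test.com:6667"}  # simulate the first broker being unreachable

    def is_reachable(broker: str) -> bool:
        return broker not in down

    single = ["hadoop1.test.com:6667"]
    several = ["hadoop1.test.com:6667", "hadoop2.test.com:6667", "hadoop3.test.com:6667"]
    print(connect_first_available(several, is_reachable))  # hadoop2.test.com:6667
    try:
        connect_first_available(single, is_reachable)
    except ConnectionError as exc:
        print(exc)  # no broker reachable: hadoop1.test.com:6667
```

With a single broker, one unreachable host is enough to fail the connection; with a list, the same outage only costs one extra attempt.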