We have a use case for which we urgently need a solution. We recently upgraded to HDF 3.0.1 (powered by Apache NiFi).
We have a streaming flow that consumes events from a Kafka topic and stores the raw data in Hadoop, so our flow starts with ConsumeKafka. We need to encrypt part of the data before we store it. Our encryption key is stored in a separate Cassandra database, and because of internal restrictions we cannot store the key in a property file. We need a mechanism to retrieve the encryption key from Cassandra periodically and "cache" it, or otherwise make it available to our real-time flow for encrypting data. We do not want to query Cassandra for every event. We are fine with fetching the key once per node if required.
We previously looked at the distributed cache in an earlier HDF release, but we realized it is not actually distributed: the cache is tied to a single node, and the client can point to only one node to retrieve from. If that node goes down, the cache cannot be accessed from any node and we have to switch to another node manually. We have not checked the behavior in HDF 3.0.1; if this has changed and the cache is usable, please let us know. Basically, we need a way to load the key into memory once and then, as part of the flow, read it from memory to encrypt the data of every event.
What you are looking for is the Variable Registry, which is not yet implemented: https://cwiki.apache.org/confluence/display/NIFI/Variable+Registry
One workaround is to use SimpleKeyValueLookupService. With this service, you can add a key-value pair that holds your encryption key. Then you can enrich your flow files with LookupAttribute, adding the encryption key as an attribute for the later encryption step. An example of how to use it is available here.
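To make the mechanics concrete, here is a plain-Java model of what LookupAttribute does with a SimpleKeyValueLookupService. It is not NiFi code: the entry name `encryption_key` and the attribute name `encryptionkey` are hypothetical. Each LookupAttribute dynamic property is modeled as "attribute name to add" mapped to "key to look up", and the looked-up value is copied into the flow file attributes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model (NOT the NiFi API) of LookupAttribute backed by a
// SimpleKeyValueLookupService. All names are illustrative.
final class KeyValueLookupModel {
    // Entries you would add as user-defined properties on the lookup service.
    private final Map<String, String> entries;

    KeyValueLookupModel(Map<String, String> entries) {
        this.entries = new HashMap<>(entries);
    }

    // Exact-match lookup of a single key; empty when the key is absent.
    Optional<String> lookup(String key) {
        return Optional.ofNullable(entries.get(key));
    }

    // Models LookupAttribute: each dynamic property maps an attribute name
    // (property name) to a lookup key (property value). Values that are found
    // are added to a copy of the flow file's attributes; the input is untouched.
    Map<String, String> enrich(Map<String, String> attributes,
                               Map<String, String> dynamicProperties) {
        Map<String, String> result = new HashMap<>(attributes);
        for (Map.Entry<String, String> p : dynamicProperties.entrySet()) {
            lookup(p.getValue()).ifPresent(v -> result.put(p.getKey(), v));
        }
        return result;
    }
}
```

The key stays in the attributes and never touches the content, which is why this combination fits the use case; the drawback, as noted next, is that the service's entries are static.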
The issue here is that you need to update the key value manually in the NiFi UI. Maybe you could have another NiFi flow that queries Cassandra periodically and updates the key-value lookup service through the NiFi API, but I am not sure this is possible.
Another idea, which I haven't tested, is to use a ScriptedLookupService with a script that queries Cassandra and returns the required data.
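Whatever the script does, it would need to cache the key, or it would query Cassandra for every event. A ScriptedLookupService script would normally be written in one of its supported scripting languages (for example Groovy or Jython); the Java below only illustrates the caching logic such a script could follow. The `lookup(Map<String, Object>)` shape loosely mirrors NiFi lookup services but is not the real interface, and the `fetcher` (standing in for the Cassandra query) and the refresh interval are assumptions.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Sketch: caches a single value (the encryption key) and calls the expensive
// fetcher only when the cached copy is older than ttlMillis.
final class CachingKeyLookup {
    private final Supplier<String> fetcher; // hypothetical stand-in for a Cassandra query
    private final long ttlMillis;           // refresh interval, e.g. one hour
    private final LongSupplier clock;       // injectable so the TTL can be tested

    private String cachedKey;
    private long fetchedAt;
    private boolean loaded;

    CachingKeyLookup(Supplier<String> fetcher, long ttlMillis, LongSupplier clock) {
        this.fetcher = Objects.requireNonNull(fetcher);
        this.ttlMillis = ttlMillis;
        this.clock = Objects.requireNonNull(clock);
    }

    CachingKeyLookup(Supplier<String> fetcher, long ttlMillis) {
        this(fetcher, ttlMillis, System::currentTimeMillis);
    }

    // Called once per event. The coordinates are ignored because there is only
    // one key. synchronized so concurrent tasks on a node share one refresh.
    synchronized Optional<String> lookup(Map<String, Object> coordinates) {
        long now = clock.getAsLong();
        if (!loaded || now - fetchedAt >= ttlMillis) {
            cachedKey = fetcher.get();
            fetchedAt = now;
            loaded = true;
        }
        return Optional.ofNullable(cachedKey);
    }
}
```

Each node runs its own instance of a controller service, so this pattern gives roughly one fetch per node per interval. If the fetch throws, the exception propagates here; a production version might prefer to keep serving the previous key.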
A MongoDB lookup service is available in NiFi 1.4, and one for HBase is coming. You might want to create a Jira for a Cassandra lookup service.
Thanks for the response. Here are the challenges and questions we have with this solution. The LookupAttribute processor does not support ScriptedLookupService, so we have to use the LookupRecord processor for the lookup. This means: I take my event content, parse it as a record, look up the encryption key, and add the returned key to the record (LookupRecord can only enrich the record, not set an attribute); then I store the key as an attribute, remove the key from the content, and finally encrypt. Adding the key to the content feels like a hack. Is there any way to add the result of the lookup as an attribute rather than modifying the content?
Also, the lookup is not working for me: it matches, but I get null. The difference is that I am trying to look up using a string value. I even tried adding the property key with quotes, but it did not work.
Indeed, as I said, it's a workaround. What about LookupAttribute with SimpleKeyValueLookupService? That was my initial thought, but I don't know whether it meets your needs: you get the key as an attribute, but you have to update the encryption key manually in the NiFi UI.
I found another way to implement this using the UpdateAttribute processor in NiFi 1.3, where UpdateAttribute can store state. I didn't test in NiFi 1.2, so I can't tell whether this works in earlier versions.
Here's the overall solution:
In the upper stream, data comes from Cassandra with the new key value. I don't have Cassandra, so I simulated JSON data with GenerateFlowFile, scheduled to run once per hour to generate a new key. In your case, this stream would get the key from Cassandra.
For data coming from this stream, you basically add two attributes: type = 'update_key' and key = 'the key that you get from Cassandra'. In my case, I did this with two processors: UpdateAttribute sets type, and EvaluateJsonPath extracts the key from the JSON content into the key attribute.
The bottom stream carries the data to encrypt. All you need to do here is add an attribute type = 'data' (this is optional); I do it with an UpdateAttribute processor.
Now both streams go to an UpdateAttribute processor that adds an attribute encryptionkey and stores it in state, initialized with an empty value. The processor's Store State property is set to store state locally, and the encryptionkey property reads its value back from state with ${getStateValue('encryptionkey')}.
Next, we want to update this key in state only when the flow file comes from the upper stream (i.e. type = update_key). To do this, click Advanced in the bottom-left corner of the UpdateAttribute configuration dialog and add a rule with the condition ${type:equals('update_key')} and an action that sets encryptionkey to ${key}.
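The behavior of this stateful UpdateAttribute step can be modeled in plain Java to check the logic. This is not NiFi code; it assumes the configuration of an empty initial state, a default encryptionkey value read from state, and a rule that copies ${key} into encryptionkey when type is update_key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (NOT NiFi code) of the stateful UpdateAttribute step.
final class StatefulKeyModel {
    // Assumed initial stateful value: empty string.
    private String storedKey = "";

    Map<String, String> process(Map<String, String> attributes) {
        Map<String, String> out = new HashMap<>(attributes);
        if ("update_key".equals(attributes.get("type"))) {
            // Assumed Advanced rule: encryptionkey = ${key}; the new value is
            // kept in state. A missing key attribute yields an empty value.
            storedKey = attributes.getOrDefault("key", "");
        }
        // Assumed default property: encryptionkey = ${getStateValue('encryptionkey')}
        out.put("encryptionkey", storedKey);
        return out;
    }
}
```

Because state is stored locally, each node keeps its own copy, so the key-update flow files need to reach every node (for example, the upstream processor should not be scheduled on the primary node only). Data flow files that arrive before the first update carry an empty encryptionkey.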
With this rule, the encryption key is updated only once per hour, when new data comes from Cassandra. After the UpdateAttribute, you can route on type (for example with RouteOnAttribute) to drop the messages coming from Cassandra (update_key) and encrypt the others.
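The routing and encryption step could look roughly like the following Java sketch. The thread does not specify the cipher or the key format, so both are assumptions here: the encryptionkey attribute is taken to hold a Base64-encoded AES key, and the part of the data to protect is a single string field encrypted with AES-GCM.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the step after UpdateAttribute. Assumes a Base64-encoded AES key
// in the encryptionkey attribute; the field to encrypt is illustrative.
final class EncryptStep {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int IV_LEN = 12;     // standard GCM nonce length
    private static final int TAG_BITS = 128;  // GCM authentication tag size

    // Models the RouteOnAttribute decision: key updates are dropped.
    static boolean shouldEncrypt(Map<String, String> attributes) {
        return !"update_key".equals(attributes.get("type"));
    }

    // Encrypts one value; output is Base64(iv || ciphertext+tag).
    static String encryptField(String plaintext, String base64Key) {
        try {
            byte[] iv = new byte[IV_LEN];
            RANDOM.nextBytes(iv); // fresh nonce per value
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, aesKey(base64Key), new GCMParameterSpec(TAG_BITS, iv));
            byte[] ct = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[IV_LEN + ct.length];
            System.arraycopy(iv, 0, out, 0, IV_LEN);
            System.arraycopy(ct, 0, out, IV_LEN, ct.length);
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    // Inverse of encryptField, mainly to verify the round trip.
    static String decryptField(String encoded, String base64Key) {
        try {
            byte[] in = Base64.getDecoder().decode(encoded);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, aesKey(base64Key), new GCMParameterSpec(TAG_BITS, in, 0, IV_LEN));
            byte[] pt = cipher.doFinal(in, IV_LEN, in.length - IV_LEN);
            return new String(pt, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }

    // SecretKeySpec rejects an empty key with IllegalArgumentException.
    private static SecretKeySpec aesKey(String base64Key) {
        return new SecretKeySpec(Base64.getDecoder().decode(base64Key), "AES");
    }
}
```

A data flow file that arrives before the first key update has an empty encryptionkey, which this sketch rejects with an exception; in a flow, such flow files would be better routed to failure or held back until a key is available.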
Can you try this and let me know if it works?