I am new to Apache NiFi and have a couple of questions about the use case below.
Read the CSV rows as flowfiles and parse col-1 and col-2 into flowfile attributes. Compare these attributes with those of previous flowfiles (a previous occurrence). If the current flowfile's values match those of a previous flowfile, add an attribute named "occurrence" with the value "yes" to the current flowfile; otherwise set it to "no".
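To pin down the intended result independently of NiFi, here is a minimal plain-Python sketch. It assumes that "previous flowfile" means any earlier row (not only the immediately preceding one), that col-1 and col-2 are the first two columns, and that there is no header row. The function name tag_occurrences and the sample rows are hypothetical, since the real CSV data is not shown here.

```python
import csv
import io


def tag_occurrences(csv_text):
    """Return one dict per CSV row with col-1, col-2 and an 'occurrence' flag.

    'occurrence' is "yes" when the same (col-1, col-2) pair appeared in an
    earlier row, otherwise "no". Assumes no header row and at least two columns.
    """
    seen = set()  # (col-1, col-2) pairs from earlier rows
    tagged = []
    for row in csv.reader(io.StringIO(csv_text)):
        if not row:
            continue  # skip blank lines
        key = (row[0], row[1])
        tagged.append({
            "col-1": row[0],
            "col-2": row[1],
            "occurrence": "yes" if key in seen else "no",
        })
        seen.add(key)
    return tagged


# Hypothetical sample data
sample = "a,1\nb,2\na,1\na,2\n"
for r in tag_occurrences(sample):
    print(r["col-1"], r["col-2"], r["occurrence"])
```

For this sample only the third row is tagged "yes", because it repeats the (a, 1) pair from the first row; (a, 2) is a new pair and stays "no".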
My current approach:
- Get the CSV file.
- Split the rows into individual flowfiles.
- Extract the col-1 and col-2 values into flowfile attributes.
- Put the new attributes (the col-1 and col-2 values) into the map cache (PutDistributedMapCache).
- Read the cached attributes back using FetchDistributedMapCache.
- Loop through the map cache and compare the entries with the col-1 and col-2 attributes of the flowfile. If they match, add an "occurrence" attribute with the value "yes" to the flowfile; otherwise set it to "no".
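Because the map cache is a key-value store, the comparison in the last step can probably be a single keyed lookup per flowfile rather than a loop over the cache. Also, if PutDistributedMapCache stores the current flowfile's values before they are fetched, each flowfile would likely find its own entry; checking and storing in one step avoids that. The sketch below models this in plain Python under those assumptions: InMemoryMapCache, put_if_absent, cache_key and mark_occurrence are hypothetical names, and the dict-backed class only stands in for the real cache. The closest real counterpart is the putIfAbsent method on NiFi's DistributedMapCacheClient, which reports whether the value was added (worth checking against your NiFi version).

```python
class InMemoryMapCache:
    """Hypothetical stand-in for the distributed map cache (not NiFi's API)."""

    def __init__(self):
        self._store = {}

    def put_if_absent(self, key, value):
        # Store the value only if the key is new; return True when it was added
        if key in self._store:
            return False
        self._store[key] = value
        return True


def cache_key(attributes):
    # One composite key built from the col-1/col-2 attributes; the "|" separator
    # is an arbitrary choice and must not appear inside the column values
    return attributes["col-1"] + "|" + attributes["col-2"]


def mark_occurrence(attributes, cache):
    """Set attributes['occurrence'] with a single keyed check-and-put."""
    is_new = cache.put_if_absent(cache_key(attributes), "seen")
    attributes["occurrence"] = "no" if is_new else "yes"
    return attributes


# Each dict plays the role of one flowfile's attributes (hypothetical values)
cache = InMemoryMapCache()
flowfiles = [
    {"col-1": "a", "col-2": "1"},
    {"col-1": "b", "col-2": "2"},
    {"col-1": "a", "col-2": "1"},
]
for attrs in flowfiles:
    print(mark_occurrence(attrs, cache))
```

The design choice here is that the lookup key carries both column values, so no iteration over cached entries is needed and the cost per flowfile stays constant.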
Question-1: Referring to the attached image "2018-01-28 21_25_40-Clipboard.png", am I using the right processors and approach?
Question-2: How can I read the cached values from FetchDistributedMapCache using Python (Jython)?
The CSV data looks like this:
Jython script used in the ExecuteScript processor:
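from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer

# Define a subclass of Serializer for use in the client's get() method (writes the key as bytes)
class StringSerializer(Serializer):
    def serialize(self, value, out):
        out.write(StringUtil.toBytes(value))
# Define a subclass of Deserializer for use in the client's get() method (turns the value bytes into a string)
class StringDeserializer(Deserializer):
    def deserialize(self, bytes):
        # A missing key may come back as an empty (or null) byte array; treat that as "not cached"
        if bytes is None or len(bytes) == 0:
            return None
        return StringUtil.fromBytes(bytes)
# clientServiceId is a dynamic property of ExecuteScript holding the id of the map cache client service
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
result = myDistClient.get('cache_prop', StringSerializer(), StringDeserializer())
# print() goes to NiFi's stdout, so use the processor's logger to see the result
log.info('Result = ' + str(result))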
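The map cache server stores keys and values as raw bytes, so the key passed to get() in the script has to serialize to exactly the same bytes as the entry identifier written by PutDistributedMapCache. The plain-Python sketch below mirrors the two Jython classes so this can be checked outside NiFi: io.BytesIO stands in for the Java output stream, ByteKeyedCache is a hypothetical stand-in for the cache server, and UTF-8 is an assumed encoding (for plain ASCII keys it produces the same bytes as StringUtil.toBytes).

```python
import io


class StringSerializer:
    """CPython mirror of the Jython StringSerializer (hypothetical, for local testing)."""

    def serialize(self, value, out):
        out.write(value.encode("utf-8"))  # assumed UTF-8 encoding


class StringDeserializer:
    """CPython mirror of the Jython StringDeserializer (hypothetical)."""

    def deserialize(self, data):
        # Treat a null or empty response as "key not present"
        if not data:
            return None
        return data.decode("utf-8")


class ByteKeyedCache:
    """Hypothetical stand-in for the cache server: keys and values are raw bytes."""

    def __init__(self):
        self._store = {}

    @staticmethod
    def _to_bytes(obj, serializer):
        buf = io.BytesIO()
        serializer.serialize(obj, buf)
        return buf.getvalue()

    def put(self, key, value, key_ser, value_ser):
        self._store[self._to_bytes(key, key_ser)] = self._to_bytes(value, value_ser)

    def get(self, key, key_ser, value_deser):
        # Lookup uses the exact serialized key bytes; a miss yields empty bytes
        raw = self._store.get(self._to_bytes(key, key_ser), b"")
        return value_deser.deserialize(raw)


cache = ByteKeyedCache()
cache.put("cache_prop", "some value", StringSerializer(), StringSerializer())
print(cache.get("cache_prop", StringSerializer(), StringDeserializer()))  # some value
print(cache.get("missing", StringSerializer(), StringDeserializer()))     # None
```

The get() argument order (key, key serializer, value deserializer) follows the call in the script above; the stored value "some value" is made up for illustration.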
I would appreciate any suggestions or help with this.