How to read cached values from FetchDistributedMapCache using Python?

Hi folks,

I am new to Apache NiFi and have a couple of questions about the use case below.

Use case:

Read the CSV rows as flowfiles and parse col-1 and col-2 into flowfile attributes. Compare these attributes with those of the previous flowfile occurrence. If the attribute values of the current flowfile match those of the previous one, add an attribute named "occurrence" to the current flowfile with the value "yes"; otherwise set it to "no".
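The comparison rule itself can be sketched in plain Python, outside NiFi. This is a minimal sketch under two assumptions: "previous flowfile occurrence" is read as the most recent earlier row with the same id (in the sample CSV data in this post the immediately preceding row always has a different id), and a dict stands in for the distributed map cache, keyed by the id column. The function name mark_occurrences is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def mark_occurrences(rows: Iterable[Tuple[str, str]]) -> List[Tuple[str, str, str]]:
    """Tag each (id, value) row with "yes" if its value equals the last value
    seen for the same id, otherwise "no".

    Assumption: a plain dict stands in for the distributed map cache,
    with the id column as the key and the value column as the cached value.
    """
    last_value: Dict[str, str] = {}
    tagged: List[Tuple[str, str, str]] = []
    for row_id, value in rows:
        # Compare with the previous occurrence of this id (None if there is none yet)
        occurrence = "yes" if last_value.get(row_id) == value else "no"
        tagged.append((row_id, value, occurrence))
        # Remember this value for the next row with the same id
        last_value[row_id] = value
    return tagged


if __name__ == "__main__":
    sample = [("t1", "12.5"), ("t2", "32.5"), ("t1", "12.5"), ("t2", "32.5"),
              ("t1", "13"), ("t2", "32.5"), ("t1", "13"), ("t2", "35.25")]
    for row in mark_occurrences(sample):
        print(row)
```

For the sample data this tags the rows no, no, yes, yes, no, yes, yes, no. Values are compared as strings, as flowfile attributes are, so "13" and "13.0" would not match.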

Approach:

- get the CSV file
- split the rows into individual flowfiles
- set the col-1 and col-2 values as flowfile attributes
- put the new attributes (col-1 and col-2 values) into the map cache (PutDistributedMapCache)
- read the cached attributes using FetchDistributedMapCache
- loop through the map cache and compare the entries with the col-1 and col-2 attributes of the flowfile; if they match, add a new attribute "occurrence" with the value "yes" to the flowfile, otherwise "no"
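One detail worth checking in the approach above is the order of the two cache steps. A map cache is a key-value store, so the previous value can be fetched directly by key instead of looping over the cache, and it has to be fetched before the current value overwrites it. The sketch below uses a hypothetical in-memory FakeMapCache class (not the NiFi API) and a hypothetical process() function to compare both orders, assuming the id column is used as the cache key.

```python
from typing import Dict, Iterable, List, Optional, Tuple


class FakeMapCache:
    """Hypothetical in-memory stand-in for a distributed map cache (not the NiFi API)."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def put(self, key: str, value: str) -> None:
        self._store[key] = value

    def get(self, key: str) -> Optional[str]:
        # Direct lookup by key; no need to iterate over the whole cache
        return self._store.get(key)


def process(rows: Iterable[Tuple[str, str]], cache: FakeMapCache,
            fetch_before_put: bool = True) -> List[str]:
    """Return the "occurrence" value for each (id, value) row."""
    results: List[str] = []
    for row_id, value in rows:
        if fetch_before_put:
            previous = cache.get(row_id)  # fetch step: read the old value first
            cache.put(row_id, value)      # put step: then store the current value
        else:
            cache.put(row_id, value)      # put step runs first...
            previous = cache.get(row_id)  # ...so the fetch sees the value just written
        results.append("yes" if previous == value else "no")
    return results
```

With fetch_before_put=False every row comes back "yes", because each lookup returns the value that was just written for that same row.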

Question-1: Referring to the attached "2018-01-28 21_25_40-Clipboard.png" image, am I using the right processors and approach?

Question-2: How do I read the cached values from FetchDistributedMapCache using Python (Jython)?

The CSV data looks like this:

 id,value
 t1,12.5
 t2,32.5
 t1,12.5
 t2,32.5
 t1,13
 t2,32.5
 t1,13
 t2,35.25
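Turning these rows into per-row attributes is roughly what a split-then-extract step does in the flow (for example SplitText followed by ExtractText, which may differ from what the attached flow uses). As a plain-Python sketch outside NiFi, with a hypothetical rows_to_attributes() function and the header names used as attribute names (an assumption), it could look like this:

```python
import csv
import io
from typing import Dict, List

SAMPLE = """\
id,value
t1,12.5
t2,32.5
t1,12.5
t2,32.5
t1,13
t2,32.5
t1,13
t2,35.25
"""


def rows_to_attributes(csv_text: str) -> List[Dict[str, str]]:
    """Turn each CSV data row into a dict of attribute name -> string value."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Strip stray spaces (e.g. from indented copies of the data) from names and values
    return [{name.strip(): value.strip() for name, value in row.items()} for row in reader]


if __name__ == "__main__":
    for attrs in rows_to_attributes(SAMPLE):
        print(attrs)
```

All values stay strings, matching how flowfile attributes are stored.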

Attached images: 2018-01-28-21-25-40-clipboard.png, 2018-01-28-21-27-30-nifi.png, 2018-01-28-21-28-28-nifi.png

Jython script used in the ExecuteScript processor:

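from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer
# Serializer used by the client's get() method to turn the string key into bytes
class StringSerializer(Serializer):
    def __init__(self):
        pass
    def serialize(self, value, out):
        # OutputStream.write() expects a byte[], so convert the key explicitly
        out.write(StringUtil.toBytes(value))
# Deserializer used by the client's get() method to turn the cached bytes back into a string
class StringDeserializer(Deserializer):
    def __init__(self):
        pass
    def deserialize(self, data):
        # Treat a missing or empty response as "key not found"
        if data is None or len(data) == 0:
            return None
        return StringUtil.fromBytes(data)
# clientServiceId is a dynamic property on ExecuteScript whose value is the ID
# of the DistributedMapCacheClientService controller service
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
# print() output goes to NiFi's standard output, not the processor's log, so use log.info()
log.info("Fetching cache_prop...")
result = myDistClient.get('cache_prop', StringSerializer(), StringDeserializer())
log.info('Result = ' + str(result))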
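The serializer/deserializer pair in the script has a small contract: the serializer writes the key as bytes to an output stream, and the deserializer turns the returned bytes back into a value. The plain-CPython analogue below (io.BytesIO in place of the Java OutputStream; serialize_key and deserialize_value are hypothetical names, not the NiFi API) shows why the key has to be converted to bytes before it is written: in CPython, writing a str to a binary stream raises TypeError. Jython 2.7, which ExecuteScript uses for Python, has different string semantics, so treat this only as an illustration.

```python
import io
from typing import BinaryIO, Optional


def serialize_key(key: str, out: BinaryIO) -> None:
    # A binary stream accepts bytes only, so encode the string key first
    out.write(key.encode("utf-8"))


def deserialize_value(data: Optional[bytes]) -> Optional[str]:
    # Assumption for this sketch: a missing or empty payload means "not found"
    if not data:
        return None
    return data.decode("utf-8")


if __name__ == "__main__":
    buf = io.BytesIO()
    serialize_key("cache_prop", buf)
    print(buf.getvalue())
    print(deserialize_value(b"12.5"))
```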

I would appreciate any suggestions and help with this.

Thank you.

