Created 10-16-2017 01:04 PM
Hi, in Apache NiFi, using the ExecuteScript processor, I want to open the content of the flowfile and, depending on whether it contains a specific word, generate an attribute that I can then use with another processor.
Example of flowfile content:
ford, mustang, 67,350, texas, blue, good, etc.
Objective:
If the line contains the term ford, I want to generate an attribute named kafka_topic with the value 'Ford'.
If the line contains the term audi, I want to generate an attribute named kafka_topic with the value 'Audi'.
That way, the next processor will be able to use the value of ${kafka_topic} and receive the correct value when the condition is met.
I know how to read the content of the flowfile, but I'm blocked at the part where I have to generate the attribute. Can someone please help me?
I actually have this, for what it's worth:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        Log2 = str(Log).split(',')
        Brand = Log2[0]
        Color = Log2[5]
        Model = Log2[1]
        if Brand == 'ford' and Color == 'gray':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'audi' and Color == 'black':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'bmw' and Color == 'white':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
Created 10-19-2017 02:52 PM
Have you looked at using the ScanContent processor for this use case?
Created on 10-19-2017 04:11 PM - edited 08-17-2019 07:10 PM
Hi @xav webmaster,
Use the ScanContent processor.
The Dictionary File needs to be present on all NiFi nodes; in my case the file is in /tmp:
cat dict.txt
ford
This processor looks into the flowfile content and, if it matches a term from the dictionary file, adds a matching.term attribute to the flowfile.
If the content of the flowfile has multiple lines, it looks through all the lines; if the word ford is there, it adds the matching.term attribute to the flowfile.
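As a rough sketch of the ScanContent configuration for this case (the path matches the example above; other properties can stay at their defaults):
Dictionary File
/tmp/dict.txt
Flowfiles whose content contains a dictionary term go to the matched relationship with matching.term set; everything else goes to unmatched.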
Once you have the matching.term attribute, use an UpdateAttribute processor to rename it (or you can keep using the same attribute).
Add a new property:
kafka_topic
${matching.term}
After this processor, the matching.term value is carried in the kafka_topic attribute.
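Note that matching.term carries the dictionary entry exactly as written (here 'ford', lowercase). If you need the capitalized values from the question ('Ford', 'Audi'), one sketch, assuming a NiFi version with the Expression Language ifElse() function, is to set kafka_topic to a mapping expression instead:
kafka_topic
${matching.term:equals('ford'):ifElse('Ford', ${matching.term:equals('audi'):ifElse('Audi', ${matching.term})})}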
Another way to achieve this is:
1. ExtractText to get the content into attributes.
2. UpdateAttribute with advanced (rules-based) properties to check whether the extracted attribute has the required content, using NiFi Expression Language (see the sketch below).
3. You will get the same result as you got using the ScanContent processor.
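For example, a sketch under the assumption that the first comma-separated field is the brand (the property name car.brand and the regex are mine, not from the original post):
In ExtractText, add a property:
car.brand
^\s*([^,]+)
Then, in the UpdateAttribute Advanced UI, add a rule with the condition
${car.brand:trim():equals('ford')}
and the action
kafka_topic = Ford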
Created 10-24-2017 05:41 PM
Straight answer:
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    session.transfer(flowFile, REL_SUCCESS)  # transfer the flowfile so the session can commit
More info on the ExecuteScript processor:
https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html
In your particular case, in the callback function where you read from the input stream, you can scan the content, remember which brand matched, and set the attribute after the write:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.topic_name = ''
    def get_topic_name(self):
        return self.topic_name
    def process(self, inputStream, outputStream):
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        Log2 = str(Log).split(',')
        Brand = Log2[0]
        Color = Log2[5]
        Model = Log2[1]
        if Brand == 'ford' and Color == 'gray':
            NewLog = str(Log2)
            self.topic_name = 'ford'
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'audi' and Color == 'black':
            NewLog = str(Log2)
            self.topic_name = 'audi'
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'bmw' and Color == 'white':
            NewLog = str(Log2)
            self.topic_name = 'bmw'
            outputStream.write(bytearray((NewLog).encode('utf-8')))

# add exception handling if needed for empty flowfile content, etc
flowFile = session.get()
if (flowFile != None):
    caller = PyStreamCallback()
    flowFile = session.write(flowFile, caller)
    topic_name = caller.get_topic_name()
    flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
    session.transfer(flowFile, REL_SUCCESS)  # transfer so the session can commit
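With kafka_topic set on the flowfile, the next processor can reference it in any property that supports Expression Language; for example (a hedged illustration, adjust to your processor), in PublishKafka:
Topic Name
${kafka_topic}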
Hope that will help.