
How to generate attributes using ExecuteScript in Apache NiFi?

Contributor

Hi, in Apache NiFi I want to use the ExecuteScript processor to open the content of the flowfile and, depending on whether it contains a specific word, generate an attribute that I can use with another processor.

Example of flowfile content:

ford, mustang, 67,350, texas, blue, good, etc.

Objective:

If the line contains the term ford, I want to generate an attribute named kafka_topic with the value 'Ford'.

If the line contains the term audi, I want to generate an attribute named kafka_topic with the value 'Audi'.

That way I will be able to use ${kafka_topic} in the next processor and receive the correct value when the condition is met.

I know how to read the content of the flowfile, but I'm blocked on the part where I have to generate the attribute. Can someone please help me?

I actually have this, for what it's worth:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flowfile content and split the comma-separated fields
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        Log2 = str(Log).split(',')
        Brand = Log2[0]
        Model = Log2[1]
        Color = Log2[5]

        # For the brand/color combinations of interest, write the parsed fields back out
        if Brand == 'ford' and Color == 'gray':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'audi' and Color == 'black':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'bmw' and Color == 'white':
            NewLog = str(Log2)
            outputStream.write(bytearray((NewLog).encode('utf-8')))
3 Replies

@xav webmaster

Have you looked at using the ScanContent processor for this use case?

Master Guru

Hi @xav webmaster,

Use the ScanContent processor.

The dictionary file needs to be on all NiFi nodes; in my case my file is in /tmp:

cat dict.txt
ford
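
To cover all the brands from the question, the dictionary file would presumably list one term per line, for example:

cat dict.txt
ford
audi
bmw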

This processor looks into the content and, if it matches a term in the dictionary file, adds a matching.term attribute to the flowfile.

If the content of the flowfile has multiple lines, it looks through all of them; if the word ford is there, it adds the matching.term attribute to the flowfile.

[screenshots: match.png, scancontent.png]
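
Since the screenshots are not reproduced here, a rough sketch of the ScanContent configuration (property names from memory, so verify them against your NiFi version):

ScanContent
    Dictionary File: /tmp/dict.txt
    Dictionary Encoding: text

Route the matched relationship on to the next processor; unmatched flowfiles can be routed elsewhere or auto-terminated.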

Once you have the matching.term attribute, use the UpdateAttribute processor to rename it (or you can keep using the same attribute).

Add a new property:

kafka_topic = ${matching.term}

Configs:

[screenshot: update-attr.png]

After this processor, the matching.term attribute is renamed to kafka_topic.

Another way to achieve this is:

1. ExtractText to extract the content into attributes.

2. UpdateAttribute with advanced rules that use NiFi Expression Language to check whether the extracted attribute contains the required content (see the sketch below).

3. You will get the same result as with the ScanContent processor.
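
A rough sketch of that chain, with hypothetical names (an ExtractText property called content whose regex captures the whole line, and UpdateAttribute advanced rules built on the Expression Language contains() function):

ExtractText, add property:
    content : (.*)

UpdateAttribute (Advanced tab), one rule per brand:
    condition ${content:contains('ford')}  ->  set kafka_topic = Ford
    condition ${content:contains('audi')}  ->  set kafka_topic = Audi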

Super Collaborator

@xav webmaster

Straight answer:

flowFile = session.get() 
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
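
One note, assuming that snippet is the whole script body: session.putAttribute returns an updated FlowFile reference (hence the reassignment above), and the flowfile still needs to be routed before the script ends, e.g. session.transfer(flowFile, REL_SUCCESS), otherwise ExecuteScript will complain that the flowfile was never transferred.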

More info on the ExecuteScript processor:

https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, in the callback function where you read from the input stream, you can scan the content, remember which brand matched, and set the attribute after the write:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        # Remember which brand matched so it can be used as an attribute value later
        self.topic_name = ''

    def get_topic_name(self):
        return self.topic_name

    def process(self, inputStream, outputStream):
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        Log2 = str(Log).split(',')
        Brand = Log2[0]
        Model = Log2[1]
        Color = Log2[5]

        if Brand == 'ford' and Color == 'gray':
            NewLog = str(Log2)
            self.topic_name = 'ford'
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'audi' and Color == 'black':
            NewLog = str(Log2)
            self.topic_name = 'audi'
            outputStream.write(bytearray((NewLog).encode('utf-8')))
        if Brand == 'bmw' and Color == 'white':
            NewLog = str(Log2)
            self.topic_name = 'bmw'
            outputStream.write(bytearray((NewLog).encode('utf-8')))


# add exception handling if needed for empty flowfile content, etc
flowFile = session.get()
if (flowFile != None):
    caller = PyStreamCallback()
    flowFile = session.write(flowFile, caller)
    topic_name = caller.get_topic_name()
    flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
    # route the flowfile onward so the session can commit
    session.transfer(flowFile, REL_SUCCESS)
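
One design note on this sketch: if none of the brand/color conditions match, topic_name stays an empty string, so kafka_topic would end up empty as well; depending on your flow you may want to give it a default value or route those flowfiles to a different relationship instead.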



Hope that will help.