
Change and add data to flowfiles in ExecuteScript using Groovy

Expert Contributor

Hi,

I am running a NiFi flow which ingests tab-separated text files into HDFS. I would like to

add columns to each file from flowfile attributes like ${path}, ${filename}, and so on

test every column for characters that I don't want to ingest

Not having any experience with Groovy, I wondered whether this could be achieved by writing a Groovy script.

I have been playing around without any good result so far; it would be nice to get a little help guiding me in the right direction.

import java.nio.charset.StandardCharsets

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine { line ->
        def a = line.tokenize('\t')
        a = a + 'flowfile'   // trying to add a column here
        // note: nothing is ever written to outputStream, so the content ends up empty
    }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
session.commit()

1 ACCEPTED SOLUTION

Guru

Use this to pull the flowfile attributes into Groovy:

def path = flowFile.getAttribute('path')

def filename = flowFile.getAttribute('filename')

After that it is pure Groovy string manipulation to add columns, remove values, etc.

Note that when you tokenize you have a List where each field is indexed (e.g. a[0], a[1], etc.). To add a field you would use a.add(path). After adding new fields or manipulating old ones you would have to reconstruct the string as a tab-delimited record.
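
For example, a minimal round trip might look like this (the sample line is made up):

def line = 'a\tb\tc'                         // one tab-delimited record
def a = line.tokenize('\t')                  // ['a', 'b', 'c']
a.add(flowFile.getAttribute('path'))         // append a new column
def rebuilt = a.join('\t')                   // back to a tab-delimited string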

You would then have to write to the OutputStream, catch errors, and route the flowfile to failure or success.

This code is similar to what you would do. (It emits each record as a flowfile; if you wanted to emit the full recordset, you would concatenate the records into one string with a newline at the end of each record except the last; a sketch of that variant follows the code below.)

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

def path = flowFile.getAttribute('path')
def fail = false
flowFile = session.write(flowFile, {inputStream, outputStream ->
    try {
        def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        // this example assumes a comma-delimited record; for your
        // tab-separated files you would split on '\t' instead
        def cells = recordIn.split(',')

        def recordOut = cells[0]+','+
            cells[1]+','+    // you could validate this or any field
            cells[2]+','+
            path+','+        // new column from the flowfile attribute
            cells[3]+','+
            cells[4]+','+
            cells[5]+','+
            cells[6]+','+
            cells[7]
        outputStream.write(recordOut.getBytes(StandardCharsets.UTF_8))
    }
    catch(e) {
        log.error('Error during processing of validate.groovy', e)
        // you cannot transfer an InputStream; set a flag here and route
        // the flowfile after the callback returns
        fail = true
    }
} as StreamCallback)

if(fail) {
    session.transfer(flowFile, REL_FAILURE)
} else {
    session.transfer(flowFile, REL_SUCCESS)
}
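
A rough sketch of the recordset variant mentioned above (assuming the whole multi-line file arrives as a single flowfile; the per-column work is elided):

flowFile = session.write(flowFile, {inputStream, outputStream ->
    def records = []
    inputStream.eachLine('UTF-8') { line ->
        def cells = line.tokenize('\t')
        cells.add(path)                      // new column from the flowfile attribute
        records << cells.join('\t')
    }
    // one string, with a newline after every record except the last
    outputStream.write(records.join('\n').getBytes(StandardCharsets.UTF_8))
} as StreamCallback)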


5 REPLIES


Expert Contributor

Forgive my incompetence in Groovy, but the job cuts off a lot of data.

I have 147 columns in the file, so I want to iterate through the columns and add new columns at the end of the line:

def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def cells = recordIn.split('\t')

def recordOut = ''
for (column in cells) {
    recordOut += column + '\t' // do stuff on each column
    // need to detect linefeed to determine newline ?
}
// add new column
recordOut += path

outputStream.write(recordOut.getBytes(StandardCharsets.UTF_8))
recordOut = ''

I am not sure how I should understand the data from the InputStream. Does it stream one char at a time, or does it stream lines? Normally I would iterate through lines, and inside that loop iterate through columns.
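
Something like this rough sketch is what I have in mind, if the stream can be read line by line at all (the per-column work is left out):

flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine('UTF-8') { line ->      // Groovy adds eachLine to InputStream
        line.split('\t').each { column ->
            // do stuff on each column
        }
        outputStream.write((line + '\t' + path + '\n').getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)

This is the error I am currently getting: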

2017-05-16 11:29:34,245 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.script.ExecuteScript
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProcessSession.transfer() is applicable for argument types: (org.apache.nifi.controller.repository.io.FlowFileAccessInputStream, org.apache.nifi.processor.Relationship) values: [org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@702d53aa, ...]
Possible solutions: transfer(java.util.Collection, org.apache.nifi.processor.Relationship), transfer(org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.Relationship), transfer(java.util.Collection), transfer(org.apache.nifi.flowfile.FlowFile)
        at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:214) ~[na:na]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProces

Guru

To operate on one line at a time, put a SplitText processor before your ExecuteScript processor (it will feed your script single lines) and a MergeContent processor after it (to append the emitted lines back into one flowfile). In ExecuteScript, the code should be something like:

def output = ""
cells.each { it ->
    output = output + it + "\t" // do something
}
output = output + path + "\n"

If you need to know which cell you are on, you can use a counter like def i = 0 and increment it in the loop.
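
For example, with Groovy's built-in eachWithIndex (the character check is just an illustration, not from your requirements):

def output = ""
cells.eachWithIndex { cell, i ->
    // i tells you which column you are on
    def clean = cell.replaceAll(/[^\p{Print}\t]/, '')   // hypothetical: strip unwanted chars
    output = output + clean + "\t"
}
output = output + path + "\n"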

Expert Contributor

Hi, I am nearly there, but I am still missing something to get it totally right.

I have my flow built as

ListHDFS -> FetchHDFS -> SplitText on lines -> ExecuteScript on each line -> MergeContent -> PutFile

When I run the flow without ExecuteScript everything looks fine; the file is the same after the merge as it was before the split. But when I run the files through my script, data is somehow stripped off, and I cannot figure out why. They are standard files:

UTF-8 Unicode text, with very long lines, tab-separated.

The full script:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

def path = flowFile.getAttribute('path')
def fail = false
flowFile = session.write(flowFile, {inputStream, outputStream ->
    try {
        def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def cells = recordIn.split('\t')

        def output = ""
        cells.each { it ->
            output = output + it + "\t" // do something
        }
        output = output + path + "\n"

        outputStream.write(output.getBytes(StandardCharsets.UTF_8))
        recordOut = ''
    }
    catch(e) {
        log.error("Error during processing of validate.groovy", e)
        session.transfer(inputStream, REL_FAILURE)
        fail = true
    }
} as StreamCallback)
if(fail){
    session.transfer(flowFile, REL_FAILURE)
    fail = false
} else {
    session.transfer(flowFile, REL_SUCCESS)
}

It seems like something is cut off in this line:

outputStream.write(output.getBytes(StandardCharsets.UTF_8))

I tried changing it to

outputStream.write(recordIn.getBytes(StandardCharsets.UTF_8))

with the same result.
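
To narrow it down, I could log the input and output sizes per flowfile; this byte counting is improvised, but it would show whether the script itself drops data or whether the loss happens around SplitText/MergeContent:

// inside the try block, after building 'output'
def inBytes  = recordIn.getBytes(StandardCharsets.UTF_8).length
def outBytes = output.getBytes(StandardCharsets.UTF_8).length
log.info("read ${inBytes} bytes, wrote ${outBytes} bytes")
outputStream.write(output.getBytes(StandardCharsets.UTF_8))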

Expert Contributor

@Greg Keys, thank you for your reply. I am still facing odd behaviour, losing data in the input stream; I think it is related to how writing to the OutputStream works. When I split and merge without ExecuteScript everything is OK; when I put my script in between, the number of output lines seems random, sometimes fewer and sometimes more than in the original flowfile. I think it is something related to memory in the data stream. Anyway, I will go about this using Hive instead; it's easier.