Created 05-15-2017 12:23 PM
Hi,
I am running a NiFi flow which ingests tab-separated text files into HDFS. I would like to
add columns to each file with flowfile attributes like ${path}, ${filename} and so on, and
test every column for characters that I don't want to ingest.
Not having experience with Groovy, I wondered if this could be achieved by writing a Groovy script.
I have been playing around without any good result so far; it would be nice to get a little help to guide me in the right direction.
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    inputStream.eachLine { line ->
        def a = line.tokenize('\t')
        a = a + ' flowfile' // trying to add a column
    }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
session.commit()
Created 05-15-2017 01:06 PM
Use this to pull the flow file attributes into Groovy:
def path = flowFile.getAttribute('path')
def filename = flowFile.getAttribute('filename')
After that it is pure Groovy string manipulation to add columns, remove values, etc.
Note that when you tokenize you have a List where each field is indexed (e.g. a[0], a[1], etc.). To add a field you would use a.add(path). After adding new fields or manipulating old fields you would have to reconstruct the string as a tab-delimited record.
You would then have to write to the OutputStream, catch errors, and route the flow file to the failure or success relationship.
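For example, a minimal sketch of that tokenize/add/rebuild step (assuming a single tab-separated record is already held in a variable named line, which is hypothetical here) could be:
def fields = line.tokenize('\t')   // split the tab-separated line into a List of fields
fields.add(path)                   // append a new column, e.g. the flow file's path attribute
def recordOut = fields.join('\t')  // rebuild the line as a tab-delimited record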
This code is similar to what you would do. (This code emits each record as a flow file; if you wanted to emit the full recordset you would concatenate each record into one string, with a newline at the end of each record except the last.)
import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return

def path = flowFile.getAttribute('path')
def fail = false

flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def cells = recordIn.split(',')
        def recordOut = cells[0] + ',' +
                        cells[1] + ',' + // you could validate this or any field
                        cells[2] + ',' +
                        path + ',' +
                        cells[3] + ',' +
                        cells[4] + ',' +
                        cells[5] + ',' +
                        cells[6] + ',' +
                        cells[7]
        outputStream.write(recordOut.getBytes(StandardCharsets.UTF_8))
        recordOut = ''
    } catch(e) {
        log.error("Error during processing of validate.groovy", e)
        session.transfer(inputStream, REL_FAILURE)
        fail = true
    }
} as StreamCallback)

if (fail) {
    session.transfer(flowFile, REL_FAILURE)
    fail = false
} else {
    session.transfer(flowFile, REL_SUCCESS)
}
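For the full-recordset variant mentioned above, a rough sketch (assuming the whole flow file is comma-delimited with one record per line, and reusing path, inputStream and outputStream from the script above) would replace the body of the try block with something like:
def recordsOut = []
inputStream.eachLine('UTF-8') { line ->
    // append the path attribute as an extra column on each record
    recordsOut << (line + ',' + path)
}
// join with newlines so every record except the last gets a trailing newline
outputStream.write(recordsOut.join('\n').getBytes(StandardCharsets.UTF_8))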
Created 05-16-2017 11:36 AM
Forgive my incompetence in Groovy, but the job cuts off a lot of data.
I have 147 columns in the file, so I want to iterate through the columns and add columns at the end of the line.
def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def cells = recordIn.split(' ')
for (column in cells) {
    recordOut += column + ' '
    // do stuff on each column
    // need to detect linefeed to determine newline?
}
// add new column
recordOut += path
outputStream.write(recordOut.getBytes(StandardCharsets.UTF_8))
recordOut = ''
I am not sure how I should understand data from the InputStream: does it stream one character at a time, or does it stream lines? Normally I would iterate through lines, and inside that loop iterate through columns.
2017-05-16 11:29:34,245 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.script.ExecuteScript
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProcessSession.transfer() is applicable for argument types: (org.apache.nifi.controller.repository.io.FlowFileAccessInputStream, org.apache.nifi.processor.Relationship) values: [org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@702d53aa, ...]
Possible solutions: transfer(java.util.Collection, org.apache.nifi.processor.Relationship), transfer(org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.Relationship), transfer(java.util.Collection), transfer(org.apache.nifi.flowfile.FlowFile)
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:214) ~[na:na]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProces
Created 05-16-2017 12:01 PM
To operate on one line at a time, use a SplitText processor before your ExecuteScript processor (this will feed your script single lines) and a MergeContent processor after your ExecuteScript (to append the emitted lines back into one flow file). In ExecuteScript, the code should be something like:
def output = ""
cells.each() { it ->
    output = output + it + "\t"
    // do something
}
output = output + path + "\n"
If you need to know which cell you are on, you can use a counter like def i = 0 and increment it in the loop.
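For example, a minimal sketch of that loop with the index available (using Groovy's eachWithIndex; cells and path as in the snippets above) might look like:
def output = ""
cells.eachWithIndex { cell, i ->
    // i is the zero-based column index, so specific columns can be validated here
    output = output + cell + "\t"
}
output = output + path + "\n"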
Created 05-17-2017 07:26 AM
Hi, I am nearly there but I am still missing something to get it totally right.
I have my flow built as:
ListHDFS -> FetchHDFS -> SplitText (on lines) -> ExecuteScript (on each line) -> MergeContent -> PutFile
When I run the flow without ExecuteScript everything looks fine; after the merge the file is the same as it was before the split. But when I run the files through my script, data is somehow stripped off, and I cannot figure out why. They are standard files:
UTF-8 Unicode text, with very long lines, separated by tabs.
The full script
import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return

def path = flowFile.getAttribute('path')
def fail = false

flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def cells = recordIn.split(' ')
        def output = ""
        cells.each() { it ->
            output = output + it + "\t"
            // do something
        }
        output = output + path + "\n"
        outputStream.write(output.getBytes(StandardCharsets.UTF_8))
        recordOut = ''
    } catch(e) {
        log.error("Error during processing of validate.groovy", e)
        session.transfer(inputStream, REL_FAILURE)
        fail = true
    }
} as StreamCallback)

if (fail) {
    session.transfer(flowFile, REL_FAILURE)
    fail = false
} else {
    session.transfer(flowFile, REL_SUCCESS)
}
It seems like something is cut off in this line:
outputStream.write(output.getBytes(StandardCharsets.UTF_8))
I tried to change it to
outputStream.write(recordIn.getBytes(StandardCharsets.UTF_8))
with the same result.
Created 05-19-2017 08:25 AM
@Greg Keys thank you for your reply. I am still facing odd behaviour, losing data from the InputStream; I think it is related to how the write to the OutputStream works. When I split and merge without ExecuteScript everything is ok, but when I put my script in between, the number of output lines seems random, sometimes fewer and sometimes more than in the original flow file. I think it is something related to memory in the data stream. Anyway, I will go about this using Hive instead, it is easier.