Support Questions

Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Data Analytics (CDA) group hub.

Reading file from folder and creating new flowfile giving error

Explorer

 

Hi All,

 

Getting error in executing below script thru Execute Script processor.

Requirement : upper flow will copy files to some folder.. trying to read files from folder and post them to http processor. For each file i am trying to create a new flowfile and move the same to HTTP processor. But below code failing. Can you please suggest what i am missing.

--------------------------------
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
def list = []
def dir = new File("/root/file_data/poc/output/")
dir.eachFileRecurse (FileType.FILES) { file ->
       list << file
}
list.each{ i ->
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
             outputStream.write( i.getBytes(StandardCharsets.UTF_8))
           } as OutputStreamCallback)
        flowFiles << newFlowFile
}

session. Transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

 

Thanks in advance!

5 REPLIES 5

Expert Contributor

@rangareddyy   I recently did something similar, and each time, in the code, that i wanted to send flowfile i used:

 

session.transfer(flowFile2, REL_SUCCESS)
session.commit()

 It is my understanding in a single flowfile example, the commit() is inferred upon script completion.

Explorer

Thanks @steven-matison its is working now..

import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.io.FileType
import java.io.*

def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def list = []
def dir = new File("/zyme_shared/nas_data/poc/zgw/output1/")
dir.eachFileRecurse (FileType.FILES) { file ->
list << file
}
list.each{ i ->
def flowFile2 = session.create(flowFile)
flowFile2 = session.putAttribute(flowFile2, "filename" , i.name)
flowFile2 = session.write(flowFile2, { outputStream ->
outputStream.write(i.getBytes())
} as OutputStreamCallback)
flowFiles << flowFile2
}

session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
session.commit()

Expert Contributor

This is a groovy error.  I would suggest having a working groovy script executeable outside of nifi before trying to modify and execute that code in the context of ExecuteScript.

Expert Contributor

@rangareddyy You will need to add attributes to the flowfile:

flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Reference:  

https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

Expert Contributor

I think you will also need to do the transfer/commit in the each list logic

Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.