Reading file from folder and creating new flowfile giving error

Hi All,

I am getting an error when running the script below through the ExecuteScript processor.

Requirement: an upstream flow copies files to a folder. I am trying to read the files from that folder and post them to an HTTP processor. For each file, I create a new flowfile and route it to the HTTP processor, but the code below fails. Can you please suggest what I am missing?

--------------------------------
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
def list = []
def dir = new File("/root/file_data/poc/output/")
dir.eachFileRecurse (FileType.FILES) { file ->
       list << file
}
list.each{ i ->
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
             outputStream.write( i.getBytes(StandardCharsets.UTF_8))
           } as OutputStreamCallback)
        flowFiles << newFlowFile
}

session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

 

Thanks in advance!

@rangareddyy I recently did something similar. Each time I wanted to send a flowfile in the code, I used:

 

session.transfer(flowFile2, REL_SUCCESS)
session.commit()

My understanding is that in the single-flowfile case, commit() is implied when the script completes.

Thanks @steven-matison, it is working now.

import groovy.io.FileType

// Incoming flowfile acts only as a trigger and as the parent of the new flowfiles
def flowFile = session.get()
if (!flowFile) return

def flowFiles = [] as List<FlowFile>

// Collect every regular file under the output directory
def list = []
def dir = new File("/zyme_shared/nas_data/poc/zgw/output1/")
dir.eachFileRecurse(FileType.FILES) { file ->
    list << file
}

// Create one child flowfile per file, named after the file and holding its bytes
list.each { i ->
    def flowFile2 = session.create(flowFile)
    flowFile2 = session.putAttribute(flowFile2, "filename", i.name)
    flowFile2 = session.write(flowFile2, { outputStream ->
        outputStream.write(i.getBytes())
    } as OutputStreamCallback)
    flowFiles << flowFile2
}

// Send the new flowfiles on, drop the trigger flowfile, and commit the session
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
session.commit()

This is a Groovy error. I would suggest getting the Groovy script working outside of NiFi before modifying and running it in the context of ExecuteScript.
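
One way to try this is to test the file-handling part on its own in plain Groovy, with no NiFi objects involved. The script below is only a minimal sketch: the helper name collectFiles and the temporary directory are made up for illustration and are not part of the original script. It walks a directory recursively and reads each file with the no-argument File.getBytes() from the Groovy JDK (written here as the bytes property), which is the same call the working script in this thread uses.

```groovy
import groovy.io.FileType
import java.nio.file.Files

// Hypothetical helper: recursively collect regular files under a directory.
List<File> collectFiles(File dir) {
    def found = []
    dir.eachFileRecurse(FileType.FILES) { File f -> found << f }
    return found.sort { it.name }
}

// Assumption: a throwaway directory stands in for the real output folder.
File root = Files.createTempDirectory('poc-output').toFile()
new File(root, 'a.txt').text = 'first'
File sub = new File(root, 'nested')
sub.mkdirs()
new File(sub, 'b.txt').text = 'second'

def files = collectFiles(root)
def sizes = [:]
files.each { File f ->
    byte[] content = f.bytes   // the bytes that would be passed to outputStream.write(...)
    sizes[f.name] = content.length
    println "${f.name}: ${content.length} bytes"
}

// Clean up the temporary directory.
root.deleteDir()
```

If this runs cleanly from the command line, the remaining problems are more likely to be in the NiFi-specific calls (session.create, session.write, session.transfer) than in the Groovy file handling.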

@rangareddyy You will need to add attributes to the flowfile:

flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Reference:  

https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

I think you will also need to do the transfer/commit inside the list.each loop.