Reading file from folder and creating new flowfile giving error


Hi All,

 

I am getting an error when executing the script below through the ExecuteScript processor.

Requirement: an upstream flow copies files to a folder. I am trying to read the files from that folder and post them to an HTTP processor. For each file I create a new flowfile and route it to the HTTP processor, but the code below fails. Can you please suggest what I am missing?

--------------------------------
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
def list = []
def dir = new File("/root/file_data/poc/output/")
dir.eachFileRecurse (FileType.FILES) { file ->
       list << file
}
list.each{ i ->
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
             outputStream.write( i.getBytes(StandardCharsets.UTF_8))
           } as OutputStreamCallback)
        flowFiles << newFlowFile
}

session. Transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

 

Thanks in advance!



@rangareddyy I recently did something similar. Each time I wanted to send a flowfile from the code, I used:

 

session.transfer(flowFile2, REL_SUCCESS)
session.commit()

My understanding is that in the single-flowfile case, commit() is implied when the script completes.


Thanks @steven-matison, it is working now.

import groovy.io.FileType

// The incoming flowfile acts as the trigger and as the parent of the new flowfiles
def flowFile = session.get()
if (!flowFile) return

def flowFiles = [] as List<FlowFile>

// Collect every regular file under the output directory, including subdirectories
def list = []
def dir = new File("/zyme_shared/nas_data/poc/zgw/output1/")
dir.eachFileRecurse(FileType.FILES) { file ->
    list << file
}

// Create one child flowfile per file, named after the file and carrying its bytes
list.each { i ->
    def flowFile2 = session.create(flowFile)
    flowFile2 = session.putAttribute(flowFile2, "filename", i.name)
    flowFile2 = session.write(flowFile2, { outputStream ->
        outputStream.write(i.getBytes())
    } as OutputStreamCallback)
    flowFiles << flowFile2
}

// Send the new flowfiles on and drop the original trigger flowfile
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
session.commit()


This is a Groovy error. I would suggest getting the Groovy script working outside of NiFi before modifying and executing it in the context of ExecuteScript.
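
As a rough illustration of testing the Groovy part on its own, the sketch below exercises only the directory walk and the byte read, with no NiFi objects involved. The listFiles helper, the temporary directory and the file names are hypothetical and exist only for this example. The real script would point at its own folder instead.

```groovy
import groovy.io.FileType
import java.nio.file.Files

// Hypothetical helper: collect every regular file under dir, recursing into subdirectories.
List<File> listFiles(File dir) {
    def found = []
    dir.eachFileRecurse(FileType.FILES) { file -> found << file }
    // Sort by path so the order is predictable across platforms.
    return found.sort { it.path }
}

// Build a throwaway directory tree so the sketch does not depend on real paths.
def root = Files.createTempDirectory('poc-output').toFile()
new File(root, 'a.txt').text = 'first'
def sub = new File(root, 'nested')
sub.mkdirs()
new File(sub, 'b.txt').text = 'second'

def files = listFiles(root)
files.each { f ->
    // f.bytes is the Groovy GDK File.getBytes() call, which takes no charset argument.
    println "${f.name}: ${f.bytes.length} bytes"
}

// Clean up the temporary tree.
root.deleteDir()
```

Once this runs cleanly with the groovy command, the same walk can be dropped into ExecuteScript, where only the session calls remain to be checked.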


@rangareddyy You will need to add attributes to the flowfile:

flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Reference:  

https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922


I think you will also need to do the transfer/commit inside the list.each loop.