NiFi: Does ExecuteGroovyScript process queues sequentially, or can it handle multiple queues at the same time?


Hello everyone,
I am working with Apache NiFi and I encountered the following issue:
The file I need to get is CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the listing changes daily, and my goal is to pick the file whose name carries the latest timestamp, in the format yyyy-MM-dd-HHmmss.
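
As a sketch of the parsing step, assuming every filename ends in -yyyy-MM-dd-HHmmss.csv like the example above, the stamp can be pulled out with a regular expression and parsed with java.time. The helper name timestampFromFilename is hypothetical and not part of NiFi.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: extracts the trailing yyyy-MM-dd-HHmmss stamp from a name such as
// CaryIL-ASTUDENTDI-2025-03-18-083011.csv and parses it; returns null if the name does not match.
LocalDateTime timestampFromFilename(String filename) {
    def matcher = filename =~ /(\d{4}-\d{2}-\d{2}-\d{6})\.csv$/
    if (!matcher.find()) {
        return null
    }
    return LocalDateTime.parse(matcher.group(1), DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmmss"))
}

println timestampFromFilename("CaryIL-ASTUDENTDI-2025-03-18-083011.csv")  // prints 2025-03-18T08:30:11
```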

Specifically, I have 5 queues containing FlowFiles, and I want to merge all of them into a single queue. After collecting the FlowFiles from these queues, I want to select the file with the most recent date based on the filename (which follows the yyyy-MM-dd-HHmmss.csv format) and route only that FlowFile to one queue for further processing. Please refer to the attached file.
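
A minimal sketch of the selection step, assuming all five queues use the same -yyyy-MM-dd-HHmmss.csv suffix and the same time zone: because every field in that pattern is zero-padded and fixed-width, the stamps can be compared as plain strings. The helpers stampOf and latestFilename are hypothetical and work on plain filename strings rather than NiFi FlowFiles.

```groovy
// Hypothetical helper: returns the yyyy-MM-dd-HHmmss stamp at the end of a filename, or null.
String stampOf(String filename) {
    def matcher = filename =~ /(\d{4}-\d{2}-\d{2}-\d{6})\.csv$/
    return matcher.find() ? matcher.group(1) : null
}

// Hypothetical helper: returns the filename with the latest stamp, or null if none match.
// String order of zero-padded, fixed-width stamps equals chronological order.
String latestFilename(Collection<String> filenames) {
    def stamped = filenames.findAll { stampOf(it) != null }
    return stamped ? stamped.max { stampOf(it) } : null
}

def names = [
    "CaryIL-ASTUDENTDI-2025-03-17-083011.csv",
    "CaryIL-ASTUDENTDI-2025-03-18-083011.csv",
    "CaryIL-ASTUDENTDI-2025-03-18-071500.csv"
]
println latestFilename(names)  // prints CaryIL-ASTUDENTDI-2025-03-18-083011.csv
```

Inside ExecuteGroovyScript, the same comparison could be applied to flowFile.getAttribute('filename') for each FlowFile in a batch.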

I also tried passing them through an ExecuteGroovyScript processor, but it seems to process each queue sequentially instead of merging the 5 queues into one and then finding the latest date/time. Below is the ExecuteGroovyScript code snippet.



import java.text.SimpleDateFormat
import org.apache.nifi.processor.io.OutputStreamCallback

// session.get() returns a single FlowFile (or null); session.get(n) returns a List of up to n
// FlowFiles from the incoming connection(s), which the loop below needs. 100 is an arbitrary batch size.
def flowFiles = session.get(100)
if (flowFiles) {
    def maxLastModified = null
    def maxFile = null
    def maxFileDate = null
    def fileList = new StringBuilder()
    // Same output as new Date(...).format(...), rendered in the JVM default time zone
    def stampFormat = new SimpleDateFormat("yyyy-MM-dd-HHmmss")

    // Track the FlowFile with the largest s3.lastModified (epoch milliseconds)
    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)

        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = stampFormat.format(new Date(Long.parseLong(lastModified)))
            }
        }
    }

    if (maxFile != null && maxFileDate != null) {
        // Emit an empty FlowFile that carries the latest file's attributes
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            // no content is written
        } as OutputStreamCallback)

        session.transfer(newFlowFile, REL_SUCCESS)
        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)

        // Every FlowFile taken from the session must be transferred or removed before commit;
        // the inputs are summarized by newFlowFile, so drop them
        session.remove(flowFiles)
    } else {
        log.warn("No valid s3.lastModified found. No files were processed.")
        session.transfer(flowFiles, REL_FAILURE)
    }

    log.info("List of all processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}
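
One detail in the script: formatting s3.lastModified with new Date(...).format(...), or with a SimpleDateFormat that has no zone set, uses the JVM's default time zone, so file_date may not match the stamp in the filename if the two use different zones. The sketch below formats the epoch-millisecond value (which the script already treats s3.lastModified as) in an explicit zone. Treating the filename stamps as UTC is an assumption, since the post does not say which zone they use, and formatLastModified is a hypothetical helper.

```groovy
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hypothetical helper: renders epoch milliseconds as yyyy-MM-dd-HHmmss in an explicit zone.
// UTC is only an assumed default; pass the zone the filenames are actually written in.
String formatLastModified(String lastModifiedMillis, ZoneId zone = ZoneOffset.UTC) {
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmmss").withZone(zone)
    return fmt.format(Instant.ofEpochMilli(Long.parseLong(lastModifiedMillis)))
}

println formatLastModified("1742286611000")  // prints 2025-03-18-083011
```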