NiFi: Will ExecuteGroovyScript process the queues sequentially, or can it handle multiple queues at the same time?

Hello everyone,
I am working with Apache NiFi and have run into the following issue.
The file I need to pick up looks like CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the listing changes daily, and my goal is to get the file with the latest timestamp, where the timestamp in the file name has the format yyyy-MM-dd-HHmmss.

Specifically, I have 5 queues containing FlowFiles, and I want to merge all of their FlowFiles into a single queue. Once the FlowFiles from these queues have been collected, I want to select the one with the most recent date based on its file name (which ends in yyyy-MM-dd-HHmmss.csv) and move that FlowFile into one queue for further processing. Please refer to the attached file.
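To make the selection step concrete, here is a minimal sketch in plain Groovy, independent of NiFi, of picking the latest name from a list by the timestamp embedded in the file name. The helper names `parseTimestamp` and `latestFile` are hypothetical, the regular expression assumes the name always ends in yyyy-MM-dd-HHmmss.csv, and two of the sample names are made up for illustration.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: extracts the trailing yyyy-MM-dd-HHmmss timestamp from a file name.
// Returns null when the name does not end in that pattern followed by ".csv".
LocalDateTime parseTimestamp(String filename) {
    def matcher = filename =~ /(\d{4}-\d{2}-\d{2}-\d{6})\.csv$/
    if (!matcher.find()) {
        return null
    }
    return LocalDateTime.parse(matcher.group(1), DateTimeFormatter.ofPattern('yyyy-MM-dd-HHmmss'))
}

// Hypothetical helper: returns the name carrying the most recent timestamp,
// or null when no name in the list can be parsed.
String latestFile(List<String> filenames) {
    def parseable = filenames.findAll { parseTimestamp(it) != null }
    return parseable.max { parseTimestamp(it) }
}

def names = [
    'CaryIL-ASTUDENTDI-2025-03-17-091500.csv',  // made-up sample
    'CaryIL-ASTUDENTDI-2025-03-18-083011.csv',  // file name from this question
    'CaryIL-ASTUDENTDI-2025-03-18-070000.csv'   // made-up sample
]
println latestFile(names)
```

Comparing parsed `LocalDateTime` values rather than raw strings keeps the comparison correct even if the prefix before the timestamp differs between files.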

I also tried passing them through an ExecuteGroovyScript processor, but it seems to process each queue sequentially instead of merging the 5 queues into one and then finding the latest date and time. Below is the ExecuteGroovyScript code snippet.

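import org.apache.nifi.processor.io.OutputStreamCallback

// session.get() returns a single FlowFile; session.get(n) returns a List of up to n FlowFiles
def flowFiles = session.get(1000)
if (flowFiles != null && !flowFiles.isEmpty()) {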
    def maxLastModified = null
    def maxFile = null
    def maxFileDate = null
    def fileList = new StringBuilder()

    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)

        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = new Date(Long.parseLong(lastModified)).format("yyyy-MM-dd-HHmmss")
            }
        }
    }

    if (maxFile != null && maxFileDate != null) {
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write(fileList.toString().getBytes())
        } as OutputStreamCallback)

        session.transfer(newFlowFile, REL_SUCCESS)
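        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)
        // The originals are no longer needed once the summary FlowFile has been created
        flowFiles.each { flowFile ->
            session.remove(flowFile)
        }
    } else {
        log.warn("No valid s3.lastModified found. No files were processed.")
        // Route the originals to failure; a FlowFile that has been transferred must not also be removed
        flowFiles.each { flowFile ->
            session.transfer(flowFile, REL_FAILURE)
        }
    }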

    log.info("List of all processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}