Created 03-18-2025 09:07 PM
Hello everyone,
I am working with Apache NiFi and I encountered the following issue:
The file I need to get is CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the list changes daily, and my goal is to get the file whose name carries the latest timestamp, in the format yyyy-MM-dd-HHmmss.
Specifically, I have 5 queues containing FlowFiles, and I want to merge them into a single queue. From the merged set, I then want to select the FlowFile whose file name carries the most recent timestamp (the names end in yyyy-MM-dd-HHmmss.csv) and route only that FlowFile to one queue for further processing. Please refer to the attached file.
I also tried passing them through an ExecuteGroovyScript processor, but it seems to process each queue sequentially instead of merging the 5 queues into one and then finding the latest date and time. Below is the ExecuteGroovyScript code snippet.
def flowFiles = session.get()
if (flowFiles != null && flowFiles.iterator().hasNext()) {
    def maxLastModified = null
    def maxFile = null
    def maxFileDate = null
    def fileList = new StringBuilder()
    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)
        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = new Date(Long.parseLong(lastModified)).format("yyyy-MM-dd-HHmmss")
            }
        }
    }
    if (maxFile != null && maxFileDate != null) {
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write(fileList.toString().getBytes())
        } as OutputStreamCallback)
        session.transfer(newFlowFile, REL_SUCCESS)
        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)
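    } else {
        log.warn("No valid s3.lastModified found. No files were processed.")
        // Route every input to failure; removing them afterwards would undo this routing.
        flowFiles.each { flowFile ->
            session.transfer(flowFile, REL_FAILURE)
        }
    }
    // Only the success path still holds the inputs, so drop them there.
    if (maxFile != null && maxFileDate != null) {
        flowFiles.each { flowFile ->
            session.remove(flowFile)
        }
    }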
    log.info("List of all processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}
Created 10-25-2025 09:04 PM
Use a Funnel to merge the different queues into a single one. The script can then ask for more than one FlowFile per call to get:
def flowFiles = session.get(10)
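Once the script receives a batch, the "latest by file name" choice from the question could be sketched roughly as below. This is only an illustration working on plain file name strings: parseStamp and pickLatest are hypothetical helper names, and the pattern assumes every name ends in -yyyy-MM-dd-HHmmss.csv as in the example file.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException

// Hypothetical helper: extract the trailing yyyy-MM-dd-HHmmss stamp from a name
// such as CaryIL-ASTUDENTDI-2025-03-18-083011.csv. Returns null when the name
// does not follow that (assumed) layout or the stamp is not a valid date.
def parseStamp = { String filename ->
    def m = (filename =~ /.*-(\d{4}-\d{2}-\d{2}-\d{6})\.csv/)
    if (!m.matches()) {
        return null
    }
    try {
        return LocalDateTime.parse(m.group(1), DateTimeFormatter.ofPattern('yyyy-MM-dd-HHmmss'))
    } catch (DateTimeParseException e) {
        return null
    }
}

// Hypothetical helper: return the name with the newest stamp, or null when
// no name in the list carries a parseable stamp.
def pickLatest = { List<String> filenames ->
    filenames.findAll { parseStamp(it) != null }.max { parseStamp(it) }
}

def names = [
    'CaryIL-ASTUDENTDI-2025-03-17-235959.csv',
    'CaryIL-ASTUDENTDI-2025-03-18-083011.csv',
    'CaryIL-ASTUDENTDI-2025-03-18-070000.csv'
]
println pickLatest(names)
```

Inside ExecuteGroovyScript the same comparison would be applied to flowFile.getAttribute('filename') for each FlowFile returned by session.get(10). Note that a batch only contains what is queued when the processor triggers (at most 10 here), so the result is the latest file of that batch, not necessarily of the whole day.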