NiFi: Does ExecuteGroovyScript process its incoming queues sequentially, or can it handle multiple queues at the same time?
Labels: Apache NiFi
Created 03-18-2025 09:07 PM
Hello everyone,
I am working with Apache NiFi and I encountered the following issue:
The file I need to get is CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the listing changes daily, and my goal is to pick the file whose name contains the latest timestamp, in the format yyyy-MM-dd-HHmmss.
Specifically, I have 5 queues containing FlowFiles, and I want to merge all of these FlowFiles into a single queue. From the combined set, I want to select the FlowFile whose file name (ending in yyyy-MM-dd-HHmmss.csv) has the most recent timestamp and route only that FlowFile into one queue for further processing. Please refer to the attached file.
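A minimal sketch of just the selection step, in plain Groovy outside NiFi. It assumes every relevant name ends in yyyy-MM-dd-HHmmss.csv, as in CaryIL-ASTUDENTDI-2025-03-18-083011.csv; the helpers extractTimestamp and latestFilename are hypothetical, the other sample names are made up for illustration, and names without a stamp are simply skipped.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.regex.Matcher

// Hypothetical helper: pulls the yyyy-MM-dd-HHmmss stamp that sits just before
// ".csv" and parses it; returns null when the name does not follow that pattern.
LocalDateTime extractTimestamp(String filename) {
    Matcher m = (filename =~ '(\\d{4}-\\d{2}-\\d{2}-\\d{6})\\.csv$')
    if (!m.find()) {
        return null
    }
    return LocalDateTime.parse(m.group(1), DateTimeFormatter.ofPattern('yyyy-MM-dd-HHmmss'))
}

// Hypothetical helper: returns the filename with the most recent embedded
// timestamp, ignoring names without one; null if nothing matches.
String latestFilename(Collection<String> filenames) {
    def stamped = filenames.findAll { extractTimestamp(it) != null }
    return stamped ? stamped.max { extractTimestamp(it) } : null
}

// Sample listing (all names except the 2025-03-18-083011 one are invented)
def names = [
    'CaryIL-ASTUDENTDI-2025-03-17-083011.csv',
    'CaryIL-ASTUDENTDI-2025-03-18-083011.csv',
    'CaryIL-ASTUDENTDI-2025-03-18-071500.csv',
    'readme.txt'
]
println latestFilename(names)   // prints CaryIL-ASTUDENTDI-2025-03-18-083011.csv
```

Because the stamp is fixed width with the most significant field first, comparing the raw stamp strings would give the same order; parsing into LocalDateTime additionally rejects impossible values such as month 13 with a DateTimeParseException.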
I also tried routing them through an ExecuteGroovyScript processor, but it seems to process each queue sequentially instead of combining the 5 queues into one and then finding the latest date/time. Below is the ExecuteGroovyScript code snippet:
import org.apache.nifi.processor.io.OutputStreamCallback
import java.text.SimpleDateFormat

// session.get() with no argument returns a single FlowFile (or null), so each
// run of the script only ever sees one FlowFile from one incoming queue.
def flowFiles = session.get()
if (flowFiles != null && flowFiles.iterator().hasNext()) {
    def maxLastModified = null
    def maxFile = null
    def maxFileDate = null
    def fileList = new StringBuilder()
    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)
        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            // Keep the FlowFile with the largest s3.lastModified (epoch milliseconds)
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = new SimpleDateFormat("yyyy-MM-dd-HHmmss").format(new Date(Long.parseLong(lastModified)))
            }
        }
    }
    if (maxFile != null && maxFileDate != null) {
        // Create a new FlowFile carrying the attributes of the newest file
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        // Content of the new FlowFile is the list of files seen in this run
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write(fileList.toString().getBytes("UTF-8"))
        } as OutputStreamCallback)
        session.transfer(newFlowFile, REL_SUCCESS)
        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)
        // Drop the originals only on success; on failure they are routed instead
        flowFiles.each { flowFile ->
            session.remove(flowFile)
        }
    } else {
        log.warn("No valid s3.lastModified found; no file was processed.")
        flowFiles.each { flowFile ->
            session.transfer(flowFile, REL_FAILURE)
        }
    }
    log.info("All processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}
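For comparison, a sketch of a batch-oriented Script Body for ExecuteGroovyScript. It relies on the ProcessSession.get(int maxResults) overload, which returns a List of up to that many FlowFiles instead of a single one; as far as I understand, it polls across all incoming connections, so the five queues can feed the same processor (directly or through a Funnel). MAX_BATCH is a hypothetical limit, and the selection uses the filename timestamp rather than s3.lastModified. Only FlowFiles already queued when the processor runs are included, so if the five branches arrive at different times, a run can still see a partial set.

```groovy
// Hypothetical batch limit for this sketch; size it to the expected listing.
final int MAX_BATCH = 1000

// get(int) returns a (possibly empty) List of FlowFiles that are queued right
// now, instead of the single FlowFile returned by get().
def flowFiles = session.get(MAX_BATCH)
if (!flowFiles) {
    return
}

// Extracts the yyyy-MM-dd-HHmmss stamp before ".csv". The stamp is fixed width
// with the most significant field first, so plain string order equals time order.
def stampOf = { flowFile ->
    def m = (flowFile.getAttribute('filename') =~ '(\\d{4}-\\d{2}-\\d{2}-\\d{6})\\.csv$')
    m.find() ? m.group(1) : null
}

def stamped = flowFiles.findAll { stampOf(it) != null }
if (stamped.isEmpty()) {
    log.warn("No filename with a yyyy-MM-dd-HHmmss.csv timestamp among " + flowFiles.size() + " FlowFiles")
    session.transfer(flowFiles, REL_FAILURE)
    return
}

def latest = stamped.max { stampOf(it) }
def latestStamp = stampOf(latest)

// Drop every other FlowFile in the batch (including names without a stamp),
// then route only the newest one.
session.remove(flowFiles.findAll { !it.is(latest) })
latest = session.putAttribute(latest, 'latest_date', latestStamp)
session.transfer(latest, REL_SUCCESS)
log.info("Selected " + latest.getAttribute('filename') + " out of " + flowFiles.size() + " FlowFiles")
```

The older FlowFiles are dropped with session.remove; keep them instead if later steps still need them.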
