Member since: 11-03-2024
Posts: 5
Kudos Received: 3
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
| 1231 | 12-04-2024 08:19 PM |
03-18-2025 09:07 PM
Hello everyone,

I am working with Apache NiFi and have run into the following issue. The file I need to get is CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the list changes daily, and my goal is to get the file with the latest timestamp, which appears in the file name in the format yyyy-MM-dd-HHmmss.

Specifically, I have 5 queues containing FlowFiles, and I want to merge all of them into a single queue. Once the FlowFiles are collected, I want to select the one with the most recent date based on its file name (which ends in yyyy-MM-dd-HHmmss.csv) and move that FlowFile into one queue for further processing. Please refer to the attached file.

I also tried passing them through ExecuteGroovyScript, but it seems to process each queue sequentially rather than merging the 5 queues into one and then finding the latest date and time. Below is the ExecuteGroovyScript code snippet.

// Explicit imports for the callback type and the date formatter used below.
import org.apache.nifi.processor.io.OutputStreamCallback
import java.text.SimpleDateFormat

// session.get() with no argument returns a single FlowFile, so request a batch instead.
// The batch size of 1000 is an assumption; size it to the expected queue depth.
def flowFiles = session.get(1000)
if (flowFiles) {
    def maxLastModified = null
    def maxFile = null
    def maxFileDate = null
    def fileList = new StringBuilder()
    def dateFormat = new SimpleDateFormat("yyyy-MM-dd-HHmmss")

    // Track the FlowFile with the largest s3.lastModified (parsed as epoch milliseconds).
    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)
        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = dateFormat.format(new Date(Long.parseLong(lastModified)))
            }
        }
    }

    if (maxFile != null && maxFileDate != null) {
        // Emit one new FlowFile that describes the latest file and lists every candidate.
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write(fileList.toString().getBytes())
        } as OutputStreamCallback)
        session.transfer(newFlowFile, REL_SUCCESS)
        // The originals are no longer needed once the summary FlowFile has been emitted.
        session.remove(flowFiles)
        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)
    } else {
        // Route the originals to failure; they must not also be removed afterwards.
        log.warn("No valid s3.lastModified found. No files were processed.")
        flowFiles.each { flowFile ->
            session.transfer(flowFile, REL_FAILURE)
        }
    }
    log.info("List of all processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}
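
The stated goal is to pick the latest file by the timestamp in its name, whereas the script above compares s3.lastModified. A minimal sketch of the filename-based comparison, in plain Groovy with no NiFi API calls, might look like the following. The helper names extractTimestamp and latestByFilename are hypothetical, the sample file names other than the one from the post are made up, and the pattern assumes every relevant name ends in yyyy-MM-dd-HHmmss.csv.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.regex.Pattern

// Assumed naming convention: <prefix>-yyyy-MM-dd-HHmmss.csv
def stampPattern = Pattern.compile('(\\d{4}-\\d{2}-\\d{2}-\\d{6})\\.csv$')
def stampFormat = DateTimeFormatter.ofPattern('yyyy-MM-dd-HHmmss')

// Hypothetical helper: returns the timestamp embedded in the file name,
// or null when the name does not follow the assumed convention.
def extractTimestamp = { String filename ->
    def matcher = stampPattern.matcher(filename)
    matcher.find() ? LocalDateTime.parse(matcher.group(1), stampFormat) : null
}

// Hypothetical helper: returns the file name with the most recent embedded
// timestamp, or null when no name matches.
def latestByFilename = { List<String> filenames ->
    def candidates = filenames.findAll { extractTimestamp(it) != null }
    candidates.max { extractTimestamp(it) }
}

def names = [
    'CaryIL-ASTUDENTDI-2025-03-17-091500.csv',  // made-up sample
    'CaryIL-ASTUDENTDI-2025-03-18-083011.csv',  // file name from the post
    'CaryIL-ASTUDENTDI-2025-03-18-071245.csv',  // made-up sample
    'notes.txt'                                 // ignored: no timestamp
]
def latest = latestByFilename(names)
println latest  // prints CaryIL-ASTUDENTDI-2025-03-18-083011.csv
```

Inside ExecuteGroovyScript the same comparison could presumably be applied to each FlowFile's filename attribute in place of s3.lastModified, provided all candidate FlowFiles are available to the script in a single batch.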
Labels: Apache NiFi
12-04-2024 08:19 PM
1 Kudo
@SAMSAL MapCacheClientService provides the ability to communicate with a MapCacheServer. I found MapCacheServer under Add Controller Service, so it seems I have found a solution to my issue. Thank you.
11-28-2024 08:27 PM
1 Kudo
I tried to delete the data you mentioned, but I don't know how to edit the topic. Thank you very much for your support.