Member since 11-03-2024
5 Posts
3 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 435 | 12-04-2024 08:19 PM |
03-18-2025 09:07 PM
Hello everyone,

I am working with Apache NiFi and have run into the following issue. The file I need to get is CaryIL-ASTUDENTDI-2025-03-18-083011.csv. The number of files in the list changes daily, and my goal is to get the file whose name carries the latest timestamp, in the format yyyy-MM-dd-HHmmss.

Specifically, I have 5 queues containing FlowFiles, and I want to merge all of those FlowFiles into a single queue. Once they are collected, I want to select the file with the most recent date based on its file name (which ends in yyyy-MM-dd-HHmmss.csv) and move that FlowFile into one queue for further processing. Please refer to the attached file.

I also tried passing them through an ExecuteGroovyScript processor, but it seems to process each queue sequentially rather than merging the 5 queues into one and then finding the latest date and time. Below is the ExecuteGroovyScript code snippet.

import org.apache.nifi.processor.io.OutputStreamCallback

// Take one FlowFile from the incoming connections
// (session.get() returns a single FlowFile, or null if none is queued)
def flowFiles = session.get()
if (flowFiles != null && flowFiles.iterator().hasNext()) {
    def maxLastModified = null   // raw s3.lastModified value (epoch millis) of the newest file so far
    def maxFile = null           // FlowFile that carries that value
    def maxFileDate = null       // the same instant formatted as yyyy-MM-dd-HHmmss
    def fileList = new StringBuilder()

    // Scan every FlowFile and remember the one with the largest s3.lastModified
    flowFiles.each { flowFile ->
        def filename = flowFile.getAttribute('filename')
        def lastModified = flowFile.getAttribute('s3.lastModified')
        log.info("Processing file: " + filename + ", s3.lastModified: " + lastModified)
        if (lastModified != null) {
            fileList.append("Filename: ${filename}, s3.lastModified: ${lastModified}\n")
            if (maxLastModified == null || Long.parseLong(lastModified) > Long.parseLong(maxLastModified)) {
                maxLastModified = lastModified
                maxFile = flowFile
                maxFileDate = new Date(Long.parseLong(lastModified)).format("yyyy-MM-dd-HHmmss")
            }
        }
    }

    if (maxFile != null && maxFileDate != null) {
        // Emit a new FlowFile that describes the newest file; its content is the list of files seen
        def newFlowFile = session.create()
        newFlowFile = session.putAttribute(newFlowFile, "filename", maxFile.getAttribute('filename'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.bucket", maxFile.getAttribute('s3.bucket'))
        newFlowFile = session.putAttribute(newFlowFile, "s3.region", maxFile.getAttribute('s3.region'))
        newFlowFile = session.putAttribute(newFlowFile, "file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "max_file_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "latest_date", maxFileDate)
        newFlowFile = session.putAttribute(newFlowFile, "s3.lastModified", maxLastModified)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write(fileList.toString().getBytes())
        } as OutputStreamCallback)
        session.transfer(newFlowFile, REL_SUCCESS)
        log.info("Created new FlowFile for the latest file: " + maxFile.getAttribute('filename') + ", max_file_date: " + maxFileDate)

        // Remove the originals only in this branch: a FlowFile that has already been
        // transferred to REL_FAILURE cannot also be removed in the same session
        flowFiles.each { flowFile ->
            session.remove(flowFile)
        }
    } else {
        log.warn("No valid s3.lastModified found. No files were processed.")
        flowFiles.each { flowFile ->
            session.transfer(flowFile, REL_FAILURE)
        }
    }
    log.info("List of all processed files: " + fileList.toString())
} else {
    log.warn("No FlowFiles in the queue.")
}
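
To make the stated goal concrete (pick the newest file from the yyyy-MM-dd-HHmmss stamp in its name, rather than from s3.lastModified), here is a minimal sketch in plain Groovy. It is not NiFi-specific and is only one possible approach: `timestampOf` and `latestFilename` are hypothetical helper names, and every file name in the list except the first one quoted above is made-up sample data.

```groovy
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: parse the yyyy-MM-dd-HHmmss stamp that sits right before ".csv".
// Returns null when the name does not follow that pattern.
LocalDateTime timestampOf(String filename) {
    def m = filename =~ /.*(\d{4}-\d{2}-\d{2}-\d{6})\.csv/
    if (!m.matches()) {
        return null
    }
    return LocalDateTime.parse(m.group(1), DateTimeFormatter.ofPattern('yyyy-MM-dd-HHmmss'))
}

// Hypothetical helper: name with the newest stamp, or null if no name can be parsed.
String latestFilename(List<String> filenames) {
    return filenames.findAll { timestampOf(it) != null }.max { timestampOf(it) }
}

// Sample data (assumed for illustration)
def names = [
    'CaryIL-ASTUDENTDI-2025-03-18-083011.csv',
    'CaryIL-ASTUDENTDI-2025-03-17-083011.csv',
    'CaryIL-ASTUDENTDI-2025-03-18-071500.csv',
    'notes.txt'
]
println latestFilename(names)   // prints CaryIL-ASTUDENTDI-2025-03-18-083011.csv
```

Inside ExecuteGroovyScript the same comparison could be applied to the `filename` attribute of each FlowFile in a batch. `session.get(int)` returns a list of up to that many FlowFiles, whereas `session.get()` returns only one, which may explain why the files appear to be handled one at a time. That wiring is not shown here and is untested in a flow.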
Labels: Apache NiFi
12-04-2024 08:19 PM
1 Kudo
@SAMSAL MapCacheClientService provides the ability to communicate with a MapCacheServer. I can find MapCacheServer under Add Controller Service, so it seems I have found a solution to my issue. Thank you.
12-04-2024 01:28 AM
1 Kudo
I am facing an issue with Notify and Wait starting from version 2.0.0. In order for them to listen to each other in version 1.28, I need to configure them with the MapCacheClientService 2.0.0 and MapCacheServer 2.0.0 services. However, in version 2.0.0, although I can still find both services in the documentation, in practice, the configuration only includes the MapCacheClientService 2.0.0 service, and I can no longer find the MapCacheServer 2.0.0 service. Using Wait and Notify in version 2.0.0 seems much more difficult compared to previous versions. I spent two days trying to figure it out, but I haven't made any progress. If possible, could you please guide me on the correct configuration so that Wait and Notify can work properly, like they did in the older versions?
Labels: Apache NiFi
11-28-2024 08:27 PM
1 Kudo
I tried to delete the data you mentioned, but I don't know how to edit the topic. Thank you very much for your support.
11-28-2024 01:58 AM
My NiFi flow fails when it encounters a CSV in which a column contains a double quote inside a string, such as: "Protection from Abuse Order on file against dad Raul Martinez Lopez." NO CONTACT WITH DAD. 12/16/2014 kb. The error occurs at the Record Reader stage. Has anyone successfully handled CSV data with embedded double quotes?
From csv:
My record reader config
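
For context on why a reader may reject such a row: under RFC 4180, a double quote inside a quoted field has to be written as two double quotes. The sketch below is plain Groovy, not a NiFi API, and `quoteCsvField` is a hypothetical helper. It shows how a value would be escaped that way, assuming the quotation marks are part of the field's value and that the file can be fixed at the source or pre-processed before the Record Reader.

```groovy
// Hypothetical helper: quote one CSV field the RFC 4180 way.
// Every embedded double quote is doubled, then the whole value is wrapped in quotes.
String quoteCsvField(String value) {
    return '"' + value.replace('"', '""') + '"'
}

// Sample value taken from the row above (assumed to be the raw field content)
def raw = '"Protection from Abuse Order on file against dad Raul Martinez Lopez." NO CONTACT WITH DAD. 12/16/2014 kb.'
println quoteCsvField(raw)
```

If the source file cannot be changed, the reader's quote and escape character settings are the other place to look. Which combination works depends on the data and is not verified here.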
Labels: Apache NiFi