Member since
01-14-2022
14
Posts
6
Kudos Received
2
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 6878 | 02-14-2022 03:40 AM |
| | 4090 | 01-16-2022 09:48 PM |
02-14-2022
10:08 AM
I used GetHDFSFileInfo to get the number of incoming files via the hdfs.count.files attribute. Then, at the end of the dataflow, I move the processed files into a separate folder so that only the files to merge remain in the root folder. Thanks to @OliverGong for the hint 🙂
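To illustrate the idea outside NiFi, here is a toy sketch in plain JavaScript: a count recorded up front (standing in for GetHDFSFileInfo's hdfs.count.files attribute) tells us how many pieces to wait for before merging, much as MergeContent would. The `receive` function and its merging behaviour are hypothetical, not NiFi API.

```javascript
// Toy sketch (plain JavaScript, not NiFi) of the counting idea: we know
// up front how many files arrived (hdfs.count.files), so we only merge
// once that many pieces have been collected.
const expectedCount = 3; // stands in for the hdfs.count.files attribute
const buffer = [];

function receive(piece) {
  buffer.push(piece);
  if (buffer.length === expectedCount) {
    return buffer.join('\n'); // all pieces present: merge them
  }
  return null; // still waiting for more files
}
```

Calling `receive` twice returns `null`; the third call returns the merged content.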
01-19-2022
07:51 AM
For others who stumble across this: I ended up delving into scripting processors and implemented an ExecuteScript (Nashorn JavaScript) script that does the batching:

```javascript
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");

// Grab 50 flow files from the input queue (or whatever's available)
var flowFileList = session.get(50);
if (!flowFileList.isEmpty()) {
    var ids = [];
    for each (var flowFile in flowFileList) {
        // Create a new InputStreamCallback, passing in a function
        // to implement the interface method
        session.read(flowFile, new InputStreamCallback(function (inputStream) {
            // Get the JSON out of the flow file and convert it to a JS object
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var obj = JSON.parse(text);
            if (obj.hasOwnProperty('SourceDataElementValue')) {
                ids.push(obj.SourceDataElementValue);
            }
        }));
        // Drop the flow file once the TML ID has been extracted
        session.remove(flowFile);
    }
    if (ids.length > 0) {
        var attributeValue = ids.join();
        var outputFlowFile = session.create();
        outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
        session.transfer(outputFlowFile, REL_SUCCESS);
    }
}
```
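The extraction logic at the heart of the script can be sanity-checked outside NiFi. This standalone sketch (plain Node.js; the sample payloads are made up) parses each payload as JSON, collects the SourceDataElementValue fields, and joins them the same way the script builds the tml_list attribute:

```javascript
// Standalone sketch of the per-flow-file extraction: parse each payload,
// keep only records that carry SourceDataElementValue, and join the IDs
// into one comma-separated attribute value.
const payloads = [
  '{"SourceDataElementValue": "TML-001"}',
  '{"SourceDataElementValue": "TML-002"}',
  '{"other": "no id here"}',
];

const ids = [];
for (const text of payloads) {
  const obj = JSON.parse(text);
  if (Object.prototype.hasOwnProperty.call(obj, 'SourceDataElementValue')) {
    ids.push(obj.SourceDataElementValue);
  }
}

const attributeValue = ids.join();
console.log(attributeValue); // TML-001,TML-002
```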
01-17-2022
06:46 AM
1 Kudo
Glad to hear that works for you! Although we can split the raw CSV or JSON content into smaller pieces to guard against OOM issues when doing the shift in JoltTransformJSON, that can cause other problems:
- The result from JoltTransformJSON may be incomplete: parts of the same user's data (with different bill_date values) can end up wrapped in other flow files, and we would then need to merge them back into a whole.
- It would be better to keep the raw CSV data in a DB table and pull rows out with a specified limit; we can query such a table page by page ("split pages").
- That way we can easily fetch a logically complete chunk of data to do the rest of the shifting, and with such a complete result we don't need to worry about records being split mid-user.
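A minimal sketch of the "keep each user's rows together" idea: group the parsed rows by a user key before splitting, so no user is spread across two flow files. The field names (`user_id`, `bill_date`, `amount`) and the sample rows are hypothetical, chosen to match the bill_date example above.

```javascript
// Group parsed CSV/JSON rows by a user key so each group is a logically
// complete unit: it can be Jolt-shifted on its own, with no need to merge
// fragments of the same user back together afterwards.
const rows = [
  { user_id: 'u1', bill_date: '2022-01-01', amount: 10 },
  { user_id: 'u2', bill_date: '2022-01-01', amount: 20 },
  { user_id: 'u1', bill_date: '2022-02-01', amount: 15 },
];

const byUser = {};
for (const row of rows) {
  (byUser[row.user_id] = byUser[row.user_id] || []).push(row);
}

// byUser.u1 now holds both of u1's bill_date rows in one place.
console.log(Object.keys(byUser)); // [ 'u1', 'u2' ]
```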