Member since: 01-14-2022
Posts: 14
Kudos Received: 6
Solutions: 2

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 5130 | 02-14-2022 03:40 AM
 | 2457 | 01-16-2022 09:48 PM
02-14-2022
10:08 AM
I used GetHDFSFileInfo to get the number of incoming files via the hdfs.count.files attribute. Then, at the end of the dataflow, I move the processed files into a separate folder so that only the files to merge stay in the root folder. Thanks to @OliverGong for the hint 🙂
01-19-2022
07:51 AM
For others that stumble across this: I ended up delving into the scripting processors and implemented a script that does the batching:

var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");

// Grab 50 flow files from the input queue (or however many are available)
var flowFileList = session.get(50);
if (!flowFileList.isEmpty()) {
    var ids = [];
    for each (var flowFile in flowFileList) {
        // Create a new InputStreamCallback, passing in a function to define the interface method
        session.read(flowFile, new InputStreamCallback(function(inputStream) {
            // Get the JSON out of the flow file and convert it to a JS object
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var obj = JSON.parse(text);
            if (obj.hasOwnProperty('SourceDataElementValue')) {
                ids.push(obj.SourceDataElementValue);
            }
        }));
        // Drop the flow file once the TML ID has been extracted
        session.remove(flowFile);
    }
    if (ids.length > 0) {
        var attributeValue = ids.join();
        var outputFlowFile = session.create();
        outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
        session.transfer(outputFlowFile, REL_SUCCESS);
    }
}
01-19-2022
06:01 AM
@OliverGong I would avoid, wherever possible, dataflow designs in which you extract the entire contents of a FlowFile into FlowFile attribute(s). While FlowFile content exists only on disk (unless read into memory by a processor during processing), FlowFile attributes are held in NiFi's JVM heap memory at all times (per-connection swapping happens only once a specific connection reaches the swap threshold set in the nifi.properties file). FlowFiles with many attributes and/or large attribute values consume considerable amounts of JVM heap, which can lead to JVM Out Of Memory (OOM) exceptions, long stop-the-world JVM Garbage Collection (GC) events, and so on. When options exist that avoid adding large attributes, they should be used. Thanks, Matt
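For reference, the per-connection swap threshold Matt mentions is a single setting in nifi.properties; a minimal sketch showing the shipped default (verify the value against your own installation):

```
# FlowFiles queued in a single connection beyond this count are swapped to disk,
# moving their attributes out of JVM heap until they near the head of the queue
nifi.queue.swap.threshold=20000
```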
01-17-2022
06:46 AM
1 Kudo
Glad to hear that works for you! Although we can split the raw CSV or JSON content into smaller pieces to guard against OOM issues when doing the shifting work in JoltTransformJSON, it may cause other issues:
- The result from JoltTransformJSON may be incomplete. Parts of the same user's data (with different bill_date values) may end up wrapped in other flow files, and in that case we would need to merge them back into a whole.
- It would be better to keep the raw CSV data in a DB table and pull the rows out with a specified limit: we can query such a table in a "split pages" fashion (see the sketch below) and easily fetch logically complete data for the remaining shift work, so we don't need to worry about incomplete records.
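A minimal sketch of the "split pages" idea using NiFi's GenerateTableFetch processor; the table and column names here are hypothetical, not from this thread:

```
GenerateTableFetch
  Table Name:            raw_billing_data   (hypothetical)
  Maximum-value Columns: user_id            (hypothetical)
  Partition Size:        1000               (rows per generated SELECT)
```

Each generated flow file carries one paged SELECT statement, which can be fed to ExecuteSQL and then to JoltTransformJSON, so every shift operates on a bounded, logically complete page.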
01-17-2022
01:55 AM
Thank you for your question. As you mentioned, the output flow file you want to get should carry two attributes. Here you can try the following:

1. Use the EvaluateJsonPath processor: add two properties and configure them using JSONPath (see the sketch after the script below). For details on JSONPath, you may refer to https://github.com/json-path/JsonPath

2. Or you may define a customized script to build the expected output dynamically, e.g. using ExecuteScript with the corresponding module library settings to indicate the path of the referenced jars:
- Script Engine: Groovy
- Module Directory: the system path where the jars reside (alternatively, you can load them at the JVM level, either via the NiFi bootstrap configuration or by placing the jars under the bootstrap extended lib directory)

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.json.JsonSlurper
import com.fasterxml.jackson.databind.ObjectMapper

// Specify the Module Directory or configure the JVM classpath so this script can load
// the Jackson jars (jackson-annotations-2.9.0.jar, jackson-core-2.9.5.jar, jackson-databind-2.9.5.jar)
flowFile = session.get()
if (!flowFile) return
def inputStr = ''
def jsonSlurper = new JsonSlurper()
try {
    // Cast a closure with an inputStream parameter to InputStreamCallback
    session.read(flowFile, { inputStream ->
        inputStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def map = jsonSlurper.parseText(inputStr)
    ObjectMapper mapper = new ObjectMapper()
    // Promote every top-level JSON entry to a FlowFile attribute
    for (entry in map) {
        log.info("JSONKey: ${entry.key} = JSONValue: ${entry.value}")
        flowFile = session.putAttribute(flowFile, "${entry.key}", "${mapper.writeValueAsString(entry.value)}")
    }
    // Clear the content; the data now lives in the attributes
    def outputStr = ""
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(outputStr.toString().getBytes('utf-8'))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
} catch (e) {
    flowFile = session.putAttribute(flowFile, 'errorMsg', "${e.toString()}")
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(e.toString().getBytes('utf-8'))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_FAILURE)
    session.commit()
}

============ Additional Infos ============
You may visit the links below:
1. For JSONPath -> https://github.com/json-path/JsonPath
2. For Apache NiFi scripts -> https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
3. For jackson-databind -> https://github.com/FasterXML/jackson-databind
Hope this helps. Just feel free to contact me if you have any questions. Thanks, Oliver Gong
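Since the original screenshot for option 1 is not available, here is a hedged sketch of an EvaluateJsonPath configuration; the two attribute names and JSON paths are hypothetical placeholders for your actual fields:

```
EvaluateJsonPath
  Destination: flowfile-attribute
  Return Type: json
  first.attr:  $.SourceDataElementValue   (dynamic property; hypothetical path)
  second.attr: $.bill_date                (dynamic property; hypothetical path)
```

Each dynamic property adds one attribute to the flow file, named after the property and holding the result of its JSONPath expression.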
01-14-2022
02:40 PM
1 Kudo
Not sure if this issue still persists. Below is one way, using only a one-phase shift operation:

1. date and env are flow file attributes, which should be predefined when debugging under JoltTransformJSON's Advanced Mode.
2. When dealing with a JOLT spec, we can predefine a constant starting with "#" ("#<constant>") for a value scope that does not exist in the input; in that way we can then handle the remaining match-and-shift work.

[ {
  "operation" : "shift",
  "spec" : {
    "logTypeCode" : {
      "FIN" : {
        "#END_OF_TESTING" : {
          "$" : "body.&3",
          "#END_OF_TESTING" : "customValue.logTypeCode"
        }
      }
    },
    "*" : "body.&",
    "#${date}" : "header.date",
    "#${env}" : {
      "$" : "customValue.ENV"
    }
  }
} ]
==============
OR
==============
[ {
  "operation" : "shift",
  "spec" : {
    "logTypeCode" : {
      "FIN" : {
        "#END_OF_TESTING" : {
          "$" : "body.logTypeCode",
          "#END_OF_TESTING" : "customValue.logTypeCode"
        }
      }
    },
    "*" : "body.&",
    "#${date}" : "header.date",
    "#${env}" : {
      "$" : "customValue.ENV"
    }
  }
} ]

Hope this helps. Thanks
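For context on the "#" trick used in both specs: in a Jolt shift spec, an LHS key that starts with "#" writes a literal value to the output instead of matching the input, and JoltTransformJSON evaluates NiFi Expression Language in the spec against the flow file's attributes before applying it. A hedged illustration, assuming a flow file attribute date=2022-01-14 (the value is hypothetical):

```
Spec line as written:      "#${date}" : "header.date"
After EL evaluation:       "#2022-01-14" : "header.date"
Resulting output fragment: "header" : { "date" : "2022-01-14" }
```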
01-14-2022
02:46 AM
When using the ExecuteSQL processor to do CRUD actions on database tables, we have to prepare the SQL statements ourselves, and there are plenty of ways to do that. If, as you mentioned, you want to use the raw JSON data to insert into or update a DB table, you might try the PutDatabaseRecord processor instead; in that case, the JSON's structure must be flat, matching the Avro schema fetched from the corresponding DB table.
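As a hedged sketch of what "flat" means for PutDatabaseRecord, assume a hypothetical table with id, name, and bill_date columns; the record below maps one top-level field to one column:

```
{ "id": 1, "name": "Alice", "bill_date": "2022-01-14" }
```

Nested objects or arrays would not line up with the table's record schema and would need to be flattened first (e.g. with JoltTransformJSON).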
01-14-2022
02:31 AM
In my experience, the issue you faced could be that the content type (multipart/form-data;boundary=...) is not officially supported by the content viewer page's engine. It supports content types such as application/json, application/xml, and text/plain in original/formatted mode; other types can be viewed via hex mode.

Directly supported types under original mode:
1. application/json
2. application/xml
3. text/plain

Note: you may also append the charset encoding to the specified content type, e.g. application/json;charset=utf-8

WORKAROUND
1. Put an UpdateAttribute processor in front, configured with a "mime.type" attribute set to one of the supported content types listed above (see the sketch below). Setting this attribute does not change the actual content; it just tells the content viewer how to parse and display the flow file's content.
2. View the flow file's content via hex mode.

Hope this helps, and feel free to contact me if you have any questions. Thanks
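A minimal sketch of the workaround in step 1, using UpdateAttribute to relabel the content as one of the supported types (pick whichever matches your actual payload):

```
UpdateAttribute
  mime.type: application/json;charset=utf-8
```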