Member since: 07-29-2020
Posts: 574
Kudos Received: 320
Solutions: 175
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 245 | 12-20-2024 05:49 AM |
 | 281 | 12-19-2024 08:33 PM |
 | 291 | 12-19-2024 06:48 AM |
 | 242 | 12-17-2024 12:56 PM |
 | 233 | 12-16-2024 04:38 AM |
08-11-2024
02:57 AM
2 Kudos
Thank you @SAMSAL, I appreciated the way you explained it. I generally prefer to add or update the initial.maxvalue.<max_value_column> property of QueryDatabaseTable (QDT) as a one-time modification to restart fetching the missed data, but I would love to see NiFi use a fault-tolerant mechanism in the future, so that developers don't have to find a workaround or recover the lost data manually by modifying the QDT.
08-09-2024
12:33 PM
I did as suggested by my colleague @SAMSAL. I saved the data column by column until I discovered that some of it was corrupted: one record had a year like "1111". As I have little experience, it was difficult to spot possible errors. Thank you everyone, especially my colleague @dianaDianaTorres
08-07-2024
04:08 AM
1 Kudo
Hi @Fredi, it's hard to say what is happening without seeing data where optionalDict is not empty; you only provided data where it is empty. Keep in mind that this is not true Python but a flavor of it called Jython, so it is not an apples-to-apples comparison with Python. I can suggest two alternatives:
1 - Since Jython scripting is going to be deprecated starting from version 2.0, I would recommend using Groovy instead. Parsing JSON in Groovy is actually much simpler than in Jython. I'm not sure what version you are using, but there is a dedicated processor for executing Groovy scripts called ExecuteGroovyScript that is probably faster than the traditional ExecuteScript, which you can still use. Based on your input, the script looks like this:
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
flowFile = session.get()
if(!flowFile) return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
def jsonSlurper = new JsonSlurper()
def jsonData = jsonSlurper.parseText(text)
if(jsonData.directories[0])
{
    session.remove(flowFile)
    jsonData.directories.each { d ->
        newflowfile = session.create()
        newflowfile = session.write(newflowfile, {inputStream, outputStream ->
            outputStream.write(JsonOutput.toJson(d).getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
        newflowfile = session.putAttribute(newflowfile, "setId", jsonData.setId.toString())
        newflowfile = session.putAttribute(newflowfile, "setName", jsonData.setName)
        newflowfile = session.putAttribute(newflowfile, "absolute.path", d.path)
        if(jsonData.optionalDict)
        {
            newflowfile = session.putAttribute(newflowfile, "value1", jsonData.optionalDict.set_entity_relation.intValue.toString())
            newflowfile = session.putAttribute(newflowfile, "value2", jsonData.optionalDict.set_entity_relation.stringValue)
        }
        session.transfer(newflowfile, REL_SUCCESS)
    }
}
else session.transfer(flowFile, REL_FAILURE)
I have tried the script for both scenarios and it worked as expected.
2 - The other alternative is to use the out-of-the-box processors (the NiFi way) to achieve what you want, rather than executing a script (not the NiFi way). Script processors should be left as a last option, for cases where the out-of-the-box processors don't suffice or where you are looking to improve performance because the flow has become very complicated and inefficient. For this I would use the following processors:
1 - EvaluateJsonPath to extract the common attributes: setId, setName, optionalDict values 1 and 2, etc.
2 - SplitJson or QueryRecord on the directories object: this will produce separate flowfiles, each carrying the common attributes.
3 - EvaluateJsonPath to extract each directory's attributes, even though they are already part of the flowfile content.
Hopefully that helps. If it does, please accept the solution. Thanks
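To make the three processor steps a bit more concrete, here is a rough plain-Python sketch of the same logic. It is not NiFi code and does not use any NiFi API; the function name split_directories and the sample input are made up for illustration, and the field names are assumed from the script earlier in this post.

```python
import json


def split_directories(raw_json):
    """Return one (attributes, content) pair per entry in "directories"."""
    data = json.loads(raw_json)

    # Step 1: attributes shared by every output (the first EvaluateJsonPath).
    common = {"setId": str(data["setId"]), "setName": data["setName"]}
    relation = (data.get("optionalDict") or {}).get("set_entity_relation")
    if relation:
        common["value1"] = str(relation["intValue"])
        common["value2"] = relation["stringValue"]

    # Steps 2 and 3: one output per directory (the split), each carrying the
    # common attributes plus its own path (the second EvaluateJsonPath).
    results = []
    for directory in data.get("directories") or []:
        attributes = dict(common)
        attributes["absolute.path"] = directory["path"]
        results.append((attributes, json.dumps(directory)))
    return results


# Hypothetical sample input, not taken from the original question.
sample = json.dumps({
    "setId": 7,
    "setName": "demo",
    "directories": [{"path": "/data/a"}, {"path": "/data/b"}],
    "optionalDict": {"set_entity_relation": {"intValue": 3, "stringValue": "x"}},
})

for attributes, content in split_directories(sample):
    print(attributes["absolute.path"], content)
```

An empty or missing directories list simply yields no outputs here; in the real flow you would decide whether that case should be routed to failure instead.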
08-05-2024
10:53 AM
@SAMSAL, thanks for the reply, but our current NiFi version is 1.23.2, which doesn't have that completion strategy. So I'm thinking of using ExecuteScript or ExecuteStreamCommand to delete the files from SMB after FetchSmb. Please let me know if you have any suggestions for this script. NiFi is hosted on Kubernetes clusters and I have credentials for SMB as well.
08-01-2024
02:59 AM
1 Kudo
How can I download this template? Thanks.
08-01-2024
02:44 AM
1 Kudo
Thanks a lot @MattWho for your expertise!
07-31-2024
07:41 AM
Thanks a lot Matt for the answer.
07-29-2024
07:01 PM
@cadrian90 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
07-27-2024
05:32 AM
1 Kudo
Can you give me your email?
07-27-2024
05:30 AM
1 Kudo
I used this code for testing:
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class WriteHelloWorld(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    def transform(self, context, flowfile):
        # attributes is a dict mapping attribute names to values (not a set)
        return FlowFileTransformResult(relationship="success", contents="Hello World", attributes={"greeting": "hello"})
Environment: Python 3.11, Java 21, and Apache NiFi 2.0.0-M4.
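A side note on the attributes argument, since it is easy to trip over: in Python, braces with comma-separated values build a set, while key: value pairs build a dict. As far as I know, FlowFileTransformResult expects attributes as a dict of attribute names to values (that is an assumption based on the example in the NiFi Python developer guide). The quick check below is plain Python with no NiFi dependency:

```python
# Braces with bare comma-separated values build a set ...
as_set = {"greeting", "hello"}
# ... while key: value pairs build a dict (attribute name -> value).
as_dict = {"greeting": "hello"}

print(type(as_set).__name__)   # set
print(type(as_dict).__name__)  # dict
print(as_dict["greeting"])     # hello
```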