Member since: 06-14-2023
Posts: 95
Kudos Received: 33
Solutions: 8
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3843 | 12-29-2023 09:36 AM |
| | 5644 | 12-28-2023 01:01 PM |
| | 1110 | 12-27-2023 12:14 PM |
| | 558 | 12-08-2023 12:47 PM |
| | 1748 | 11-21-2023 10:56 PM |
04-09-2024
11:58 AM
I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor, with code like this:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {

    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    Collection<ValidationResult> validate(ValidationContext context) {
    }

    PropertyDescriptor getPropertyDescriptor(String name) {
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    String getIdentifier() {
    }

    void onScheduled(ProcessContext context) throws ProcessException {
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
    }

    void onStopped(ProcessContext context) throws ProcessException {
    }

    void setLogger(ComponentLog logger) {
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)
                // Split the incoming objectIDs list into chunks and emit one FlowFile per chunk
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk ->
                    data = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson(data).getBytes("UTF-8")) } as OutputStreamCallback)
                    // putAllAttributes returns an updated FlowFile reference; transfer that, not the stale one
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
processor = new GroovyProcessor()
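To try this out (assuming the usual InvokeScriptedProcessor setup): set Script Engine to Groovy and paste the script into the Script Body property; the Chunk Size property defined above should then appear on the processor. As an illustration, with the default chunk size of 5, an input of {"objectIDs": [1, 2, 3, 4, 5, 6, 7]} would come out as two FlowFiles, {"objectIDs": [1, 2, 3, 4, 5]} and {"objectIDs": [6, 7]}.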
03-05-2024
12:31 PM
Have you considered using EvaluateJsonPath to extract the value as a FlowFile attribute and then using that for your key when publishing to Kafka?
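A rough sketch of what I mean (the attribute name and JSONPath here are just placeholders for your data, and the PublishKafka property names may differ slightly depending on the processor version):

EvaluateJsonPath
    Destination: flowfile-attribute
    kafka.key (dynamic property): $.myKeyField

PublishKafka
    Kafka Key: ${kafka.key}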
02-23-2024
11:51 AM
1 Kudo
https://issues.apache.org/jira/browse/NIFI-12839
02-22-2024
08:48 PM
There is no error when I write the FlowFile, but in the super onTrigger, session.get() returns null. I then tried to transfer back to the same queue using session.transfer(ff), but that gives an error saying "Cannot transfer FlowFiles that are created in this Session back to self". Finally, I decided to write a custom processor from scratch using the code from InvokeHTTP for my use case. Thank you all for the input.
02-21-2024
01:45 PM
I had a need for multiple lookups, so I built a custom Groovy processor with several lookup services as part of it. That consolidated the logic, routed accordingly, and performed faster.
02-02-2024
10:39 AM
1 Kudo
Yeah, I saw that post and finally got it to work by running this command on Ubuntu to install venv: sudo apt install python3.11-venv. After I ran that command, everything started up and stayed up normally for NiFi 2.0.0-M2.
01-26-2024
02:09 AM
2 Kudos
Hi @SandyClouds, I ran into this issue before. After some research I found that when ConvertJSONToSQL runs, NiFi assigns the timestamp data type (value = 93 in the sql.args.[n].type attribute). When PutSQL executes the generated SQL statement, it parses each value according to the assigned type and formats it accordingly. For timestamp, however, it expects the value to be in the format "yyyy-MM-dd HH:mm:ss.SSS", so if the milliseconds are missing from the original datetime value it fails with the error message you are seeing. To resolve the issue, make sure to add 000 milliseconds to your datetime value before the PutSQL processor runs. You can do that in the source JSON itself before the conversion to SQL, or after the conversion using UpdateAttribute; with the latter option you have to know which sql.args.[n].value holds the datetime and use Expression Language to reformat it (see the sketch below). If that helps, please accept the solution. Thanks
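As a rough example of the UpdateAttribute option (assuming, just for illustration, that the datetime lands in sql.args.1.value and arrives as yyyy-MM-dd HH:mm:ss; adjust the index and input format to your data), you could set:

sql.args.1.value = ${sql.args.1.value:toDate("yyyy-MM-dd HH:mm:ss"):format("yyyy-MM-dd HH:mm:ss.SSS")}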
01-22-2024
12:30 PM
Do you have a sample? I'm not sure NiFi can do this natively, but I have recently done some PDF parsing inside NiFi with a custom Groovy processor.
01-13-2024
05:51 AM
Oh, I successfully managed to integrate and run NiFi 2.0 with Python on Windows using the method you suggested. Thank you so much!
01-10-2024
12:57 PM
@pratschavan FetchFile is typically used in conjunction with ListFile so that it only fetches the content for the FlowFiles it is passed, and ListFile lists each file only once. If you are using only the FetchFile processor, I am guessing you configured the "File to Fetch" property with the absolute path to your file. Used that way, the processor will fetch that same file every time it is scheduled to execute via the processor's "Scheduling" tab configuration (see the sketch below). Can you share screenshots of how you have these two processors configured? If any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them. Thank you, Matt
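For reference, a minimal sketch of the usual pairing (the input directory here is just an example path):

ListFile
    Input Directory: /data/incoming

FetchFile
    File to Fetch: ${absolute.path}/${filename}

ListFile emits one FlowFile per newly listed file, and FetchFile's default File to Fetch expression reads the path attributes that ListFile writes, so each file's content is fetched only once.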