Member since: 06-14-2023
Posts: 90
Kudos Received: 27
Solutions: 8
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 3395 | 12-29-2023 09:36 AM |
|  | 4635 | 12-28-2023 01:01 PM |
|  | 978 | 12-27-2023 12:14 PM |
|  | 465 | 12-08-2023 12:47 PM |
|  | 1471 | 11-21-2023 10:56 PM |
07-23-2024
06:41 AM
I referred to the other article because of the custom ListSFTP processor. It would make it possible to use only one process group (PG) for the "move file from A to B" workflow: read the job configuration with any other processor, put the config values on a FlowFile, and pass it to this ListSFTP processor. A sketch of the "put the config values on a FlowFile" step is below.
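As a minimal sketch of that step, an ExecuteScript (Groovy) body could look like this; the attribute names and values are hypothetical, and in a real flow they would come from your job configuration source:

// ExecuteScript (Groovy): attach hypothetical SFTP job config values as FlowFile attributes
def flowFile = session.get()
if (flowFile == null) return

// Placeholder config values; replace with whatever your job configuration provides
def config = [
    'sftp.hostname'   : 'sftp.example.com',
    'sftp.remote.path': '/outbound/jobA'
]

flowFile = session.putAllAttributes(flowFile, config)
session.transfer(flowFile, REL_SUCCESS)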
07-22-2024
12:44 PM
You could implement this using a Groovy script, which is forward and backward compatible: I've taken code written for NiFi 1.x and it works just fine in 2.x.
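For instance, a minimal ExecuteScript (Groovy) body like this sketch relies only on bindings and classes that exist in both major versions (the uppercase transform is just a placeholder):

import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (flowFile == null) return

// Rewrite the content; the session and REL_SUCCESS bindings are provided by ExecuteScript
flowFile = session.write(flowFile, { inputStream, outputStream ->
    def text = inputStream.getText(StandardCharsets.UTF_8.name())
    outputStream.write(text.toUpperCase().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)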
05-28-2024
05:51 AM
Apparently the GitHub repo has a folder of base-level processor examples. It might not cover everything yet, but it's a good place to start for basic issues. I had trouble building a custom relationship as well, and this example helped: Official Python Processor Examples
04-09-2024
02:02 PM
1 Kudo
Looks like NiFi might be using the Jedis Redis Java client (GitHub: redis/jedis) under the covers, so if Jedis can run the commands you asked about, you might be able to build a custom Groovy processor to run them.
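If the commands are simple enough, a sketch like this could run them from ExecuteScript (Groovy), assuming you put the Jedis jar somewhere and point the processor's Module Directory property at it; the host, port, and the dbSize() call are placeholders:

import redis.clients.jedis.Jedis

def flowFile = session.get()
if (flowFile == null) return

// Hypothetical connection details; in a real flow pull these from processor properties
new Jedis('localhost', 6379).withCloseable { jedis ->
    // Placeholder command; swap in whatever Jedis call you actually need
    flowFile = session.putAttribute(flowFile, 'redis.dbsize', String.valueOf(jedis.dbSize()))
}

session.transfer(flowFile, REL_SUCCESS)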
04-09-2024
01:48 PM
1 Kudo
What headers do you set inside of Postman?
04-09-2024
12:44 PM
1 Kudo
Have you tried not escaping your " characters with \" in your command line arguments?
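For example, with hypothetical ExecuteStreamCommand settings (arguments delimited by ';'):

Command Arguments: -c;echo "hello world"

instead of:

Command Arguments: -c;echo \"hello world\"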
04-09-2024
11:58 AM
I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor using code like this:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

// Splits the objectIDs array of each incoming JSON FlowFile into fixed-size
// chunks, emitting one FlowFile per chunk.
class GroovyProcessor implements Processor {

    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    // The remaining Processor methods are required by the interface but need
    // no behavior here, so they are left as no-ops.
    Collection<ValidationResult> validate(ValidationContext context) { null }

    PropertyDescriptor getPropertyDescriptor(String name) { null }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    String getIdentifier() { null }

    void onScheduled(ProcessContext context) throws ProcessException { }

    void onUnscheduled(ProcessContext context) throws ProcessException { }

    void onStopped(ProcessContext context) throws ProcessException { }

    void setLogger(ComponentLog logger) { }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = ["mime.type": "application/json"]

                // Parse the incoming JSON content
                Map data = null
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)

                // Break the objectIDs list into chunks and write each chunk to its own FlowFile
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk ->
                    Map chunkData = ["objectIDs": chunk]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(JsonOutput.toJson(chunkData).getBytes("UTF-8")) } as OutputStreamCallback)
                    // putAllAttributes returns the updated FlowFile; transferring a stale
                    // reference would throw a FlowFileHandlingException
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }

                // The original FlowFile has been fully split, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
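To try it, paste the script into InvokeScriptedProcessor's Script Body property, set the Script Engine to Groovy, and adjust the chunking via the Chunk Size property the script exposes.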
03-05-2024
12:31 PM
Have you considered using EvaluateJsonPath to extract the value into a FlowFile attribute and then using that attribute as your key when publishing to Kafka?
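A minimal sketch, assuming your key lives at a hypothetical $.payload.id path:

EvaluateJsonPath:
  Destination: flowfile-attribute
  kafka.key (dynamic property): $.payload.id

PublishKafka:
  Kafka Key: ${kafka.key}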
02-23-2024
11:51 AM
1 Kudo
https://issues.apache.org/jira/browse/NIFI-12839
02-22-2024
08:48 PM
No error when I write the FlowFile, but in the super onTrigger, session.get() returns null. So I tried to transfer to the same queue using session.transfer(ff), but that gives an error saying "Cannot transfer FlowFiles that are created in this Session back to self". Finally, I decided to write a custom processor from scratch using the code from InvokeHTTP for my use case. Thank you all for the input.