Member since: 06-14-2023
Posts: 87
Kudos Received: 27
Solutions: 8
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1408 | 12-29-2023 09:36 AM |
| | 1281 | 12-28-2023 01:01 PM |
| | 474 | 12-27-2023 12:14 PM |
| | 246 | 12-08-2023 12:47 PM |
| | 828 | 11-21-2023 10:56 PM |
04-09-2024
02:02 PM
1 Kudo
It looks like NiFi might be using Jedis (GitHub - redis/jedis: Redis Java client) under the covers, so if Jedis can run the commands you asked about, you might be able to build a custom Groovy processor to run them.
04-09-2024
01:48 PM
1 Kudo
What headers do you set inside of Postman?
04-09-2024
12:44 PM
1 Kudo
Have you tried not escaping your " with \" in your command line arguments?
04-09-2024
11:58 AM
I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor with code like this:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {
    // Maximum number of objectIDs written to each outgoing FlowFile
    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    // No custom validation beyond the property validator
    Collection<ValidationResult> validate(ValidationContext context) {
        return null
    }

    PropertyDescriptor getPropertyDescriptor(String name) {
        return name == CHUNK_SIZE.name ? CHUNK_SIZE : null
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    String getIdentifier() {
        return null
    }

    // Lifecycle hooks, intentionally left empty
    void onScheduled(ProcessContext context) throws ProcessException {
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
    }

    void onStopped(ProcessContext context) throws ProcessException {
    }

    void setLogger(ComponentLog logger) {
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return

            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()

            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]

                // Parse the incoming JSON, which is expected to contain an objectIDs list
                Map data = null
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)

                // Split objectIDs into sublists of at most chunkSize elements
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)

                // Emit one new FlowFile per chunk
                chunkedObjectIDs.each { chunk ->
                    Map chunkData = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(JsonOutput.toJson(chunkData).getBytes("UTF-8")) } as OutputStreamCallback)
                    // putAllAttributes returns the updated FlowFile, so keep the new reference
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }

                // The original FlowFile is replaced by its chunks
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
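The chunking itself comes down to Groovy's collate() plus JsonOutput.toJson(). Here is a minimal standalone sketch of just that step, runnable outside NiFi. The sample payload and the chunk size of 3 are made up for illustration; only the objectIDs field name comes from the script above.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical input payload (assumption); the real content arrives in the FlowFile
String input = '{"objectIDs": [1, 2, 3, 4, 5, 6, 7]}'
int chunkSize = 3

Map data = new JsonSlurper().parseText(input) as Map

// collate() splits the list into sublists of at most chunkSize elements;
// the last sublist holds whatever is left over
List<List> chunks = data.objectIDs.collate(chunkSize)

// Each chunk becomes its own JSON document, mirroring one outgoing FlowFile per chunk
List<String> outputs = chunks.collect { chunk -> JsonOutput.toJson([objectIDs: chunk]) }

outputs.each { println it }
```

With seven IDs and a chunk size of 3, this yields three documents, the last one holding the single leftover ID.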
03-05-2024
07:13 PM
1 Kudo
@SAMSAL are you still on M1 or M2? I'm on M1 and took what you shared and just made those minor tweaks. I'll test it out more tomorrow. Maybe I'll restart NiFi just to clear memory and any funny stuff that might be there.
03-05-2024
12:31 PM
Have you considered using EvaluateJsonPath to extract the value as a FlowFile attribute and then using that attribute as your key when publishing to Kafka?
02-23-2024
11:51 AM
1 Kudo
https://issues.apache.org/jira/browse/NIFI-12839
02-22-2024
08:48 PM
There is no error when I write the flow file, but in the super onTrigger, session.get() returns null. So I tried to transfer to the same queue using session.transfer(ff), but that gives an error saying `Cannot transfer FlowFiles that are created in this Session back to self`. Finally, I decided to write a custom processor from scratch using the code from InvokeHTTP for my use case. Thank you all for the input.
02-21-2024
01:45 PM
I had a need for multiple lookups, so I built a custom Groovy processor with several lookup services as a part of it. That consolidated the lookups, routed the results accordingly, and performed faster.
02-02-2024
10:39 AM
1 Kudo
Yeah, I saw that post and finally got it to work by making sure I ran this command on Ubuntu to install venv:
sudo apt install python3.11-venv
After I ran that command, everything started up and stayed up normally for NiFi 2.0.0 M2.