Member since 06-14-2023
95 Posts
33 Kudos Received
8 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 3841 | 12-29-2023 09:36 AM |
 | 5620 | 12-28-2023 01:01 PM |
 | 1103 | 12-27-2023 12:14 PM |
 | 558 | 12-08-2023 12:47 PM |
 | 1745 | 11-21-2023 10:56 PM |
04-09-2024
12:44 PM
1 Kudo
Have you tried not escaping your " with \" in your command line arguments?
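As a rough illustration of why the escaping can matter, here is a minimal sketch that assumes the argument string is tokenized with POSIX shell rules (the component actually parsing your arguments may behave differently). With those rules, an escaped quote becomes a literal character and no longer groups words into one argument:

```python
import shlex

# Unescaped quotes group the two words into a single argument.
unescaped = shlex.split('--msg "hello world"')

# Escaped quotes are kept as literal characters, so the text splits on the space.
escaped = shlex.split(r'--msg \"hello world\"')

print(unescaped)
print(escaped)
```

The `--msg` flag and its value are hypothetical and only there to show the tokenization difference.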
04-09-2024
11:58 AM
I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor with code like this:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {
    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()
    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()
    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()
    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }
    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }
    Collection<ValidationResult> validate(ValidationContext context) {
        // No custom validation; return an empty collection instead of null
        return Collections.emptyList()
    }
    PropertyDescriptor getPropertyDescriptor(String name) {
        return name == CHUNK_SIZE.name ? CHUNK_SIZE : null
    }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }
    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }
    String getIdentifier() {
        return null
    }
    void onScheduled(ProcessContext context) throws ProcessException {
    }
    void onUnscheduled(ProcessContext context) throws ProcessException {
    }
    void onStopped(ProcessContext context) throws ProcessException {
    }
    void setLogger(ComponentLog logger) {
    }
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                // Parse the incoming FlowFile content as JSON
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)
                // Break the objectIDs list into sublists of at most chunkSize entries
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk ->
                    Map chunkData = [
                        "objectIDs": chunk
                    ]
                    // Write each chunk to its own FlowFile
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(JsonOutput.toJson(chunkData).getBytes("UTF-8")) } as OutputStreamCallback)
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                // The original FlowFile is replaced by its chunks
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
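To make the chunking step easier to follow outside NiFi, here is a minimal Python sketch of the same idea: read a JSON document with an `objectIDs` list and emit one JSON document per chunk. The function name `chunk_object_ids` and the sample IDs are hypothetical; only the `objectIDs` field and the chunk size come from the processor above.

```python
import json


def chunk_object_ids(payload: str, chunk_size: int) -> list:
    """Split the objectIDs list into pieces of at most chunk_size,
    returning one JSON document (as a string) per piece."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    object_ids = json.loads(payload)["objectIDs"]
    return [
        json.dumps({"objectIDs": object_ids[i:i + chunk_size]})
        for i in range(0, len(object_ids), chunk_size)
    ]


# Hypothetical input: seven IDs split into chunks of three.
incoming = json.dumps({"objectIDs": [1, 2, 3, 4, 5, 6, 7]})
outgoing = chunk_object_ids(incoming, 3)
for document in outgoing:
    print(document)
```

Each element of `outgoing` corresponds to one new FlowFile in the Groovy version; the last chunk is simply shorter when the list does not divide evenly.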
03-05-2024
07:13 PM
1 Kudo
@SAMSAL are you still on M1 or M2? I'm on M1 and took what you shared and just made those minor tweaks. I'll test it out more tomorrow. Maybe I'll restart NiFi just to clear memory and any funny stuff that might be there.
03-05-2024
03:29 PM
1 Kudo
Building on @SAMSAL's discovery, I found you have access to the JVM in your __init__:

def __init__(self, jvm, **kwargs):
    super().__init__()
    self.jvm = jvm

Which then lets you access the JVM gateway and Java data classes like this:

from py4j.java_collections import MapConverter

jvm_gateway = self.jvm.gateway
# Create a Java Map/Dict
java_map = self.jvm.java.util.HashMap()
java_map.put("name", record['name'])
# Or convert the Python dict to a Java Map
data = {"name": record["name"]}
data = MapConverter().convert(data, jvm_gateway._gateway_client)
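If you want to try the "create a Java Map" pattern outside NiFi, a minimal sketch could look like the following. The helper name `dict_to_java_map` and the `FakeHashMap` stand-in are hypothetical; the stand-in only mimics `put` so the code runs without a JVM, and inside a processor you would pass `self.jvm` instead.

```python
from types import SimpleNamespace


def dict_to_java_map(jvm, data):
    """Copy a Python dict into a new java.util.HashMap created through the JVM view."""
    java_map = jvm.java.util.HashMap()
    for key, value in data.items():
        java_map.put(key, value)
    return java_map


class FakeHashMap(dict):
    """Stand-in for java.util.HashMap (assumption: only put() is needed here)."""

    def put(self, key, value):
        self[key] = value


# Stand-in JVM view exposing java.util.HashMap the way the snippet above accesses it.
fake_jvm = SimpleNamespace(
    java=SimpleNamespace(util=SimpleNamespace(HashMap=FakeHashMap))
)

result = dict_to_java_map(fake_jvm, {"name": "example"})
print(result)
```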
03-05-2024
12:31 PM
Have you considered using EvaluateJsonPath to extract the value as a FlowFile attribute and then using that attribute as your key when publishing to Kafka?
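Conceptually, that suggestion amounts to something like the sketch below: pull one value out of the JSON content, store it as an attribute, and let the publishing step use that attribute as the key. This is only an illustration in Python, not NiFi code; the attribute name `kafka.key`, the path `$.order.id`, and the sample content are all assumptions, and the helper handles only simple dotted paths rather than full JSONPath.

```python
import json


def extract_attribute(content: str, path: str) -> str:
    """Resolve a simple dotted path such as '$.order.id' against a JSON document."""
    value = json.loads(content)
    dotted = path[2:] if path.startswith("$.") else path
    for part in dotted.split("."):
        value = value[part]
    return str(value)


# Hypothetical FlowFile content and attribute name.
flowfile_content = json.dumps({"order": {"id": "A-17", "total": 42}})
attributes = {"kafka.key": extract_attribute(flowfile_content, "$.order.id")}
print(attributes)
```

In the flow itself, the publishing processor would then reference the attribute for its key, for example with an expression along the lines of `${kafka.key}` (assuming that is the attribute name you chose).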
02-22-2024
12:35 PM
1 Kudo
Others can chime in, but I personally consider them safe for production environments. I've written some processors that work from NiFi version 1.8 all the way to 2.0.0 without issues or any need to rewrite them.
02-21-2024
01:45 PM
I had a need for multiple lookups, so I built a custom Groovy processor with several lookup services as a part of it. That consolidated the lookups, routed accordingly, and performed faster.
02-21-2024
01:37 PM
2 Kudos
"Also, this processor would be the first in the flow so I assume there will be no flow file"
When I have a need similar to this, I still use a GenerateFlowFile processor to control the scheduling frequency, followed by a custom scripted processor that adds or replaces data and/or adds attributes to the FlowFile for everything downstream.
02-21-2024
01:21 PM
Groovy code can be used with ExecuteGroovyScript or InvokeScriptedProcessor; the latter offers the same structure and functionality as a native NAR file. I've built a production processor that's used by several customers and can easily handle hundreds of millions of events on a single NiFi instance. NiFi 2.x removes all scripting languages from InvokeScriptedProcessor except for Groovy. The last time I built a NAR I followed the instructions below, until I discovered that InvokeScriptedProcessor Groovy code gave me the same access and functionality, and I haven't looked back since.
Creating Custom Processors and Controllers in Apache NiFi (Hashmap, an NTT DATA Company, on Medium)
02-20-2024
02:58 PM
Ultimately, what is it you want your custom processor to do? I can tell you a Groovy-based custom processor can perform amazingly well and leverage Java libraries just like a native .nar file, with the advantage that you can make your changes and test them without having to rebuild every time.