Member since: 06-14-2023
Posts: 90
Kudos Received: 27
Solutions: 8
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2773 | 12-29-2023 09:36 AM
 | 3617 | 12-28-2023 01:01 PM
 | 904 | 12-27-2023 12:14 PM
 | 416 | 12-08-2023 12:47 PM
 | 1357 | 11-21-2023 10:56 PM
07-22-2024
03:14 PM
Looks like a custom processor might not be needed, since the file and path filter properties accept RegEx, which means you could define multiple patterns:
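For example, assuming this refers to ListSFTP's File Filter Regex and Path Filter Regex properties (my reading of the thread), alternation in a single regex covers several patterns at once. The directory and file names below are made up:

```
Path Filter Regex: ^(incoming|archive)$
File Filter Regex: ^(orders_.*\.csv|invoices_.*\.xml)$
```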
07-22-2024
03:10 PM
If I get time, I might see how the current SFTP processor works. From the sounds of it, you'd want a processor that can handle several file patterns/paths per SFTP host to avoid having hundreds of flows.
07-22-2024
12:44 PM
You could implement this using a Groovy script, which is forward and backward compatible. I've taken code written for NiFi 1.X and it works just fine in 2.X.
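As a minimal sketch of what that looks like (an ExecuteScript body using only the standard bound variables, so the same script runs on 1.X and 2.X; the attribute name here is just for illustration):

```groovy
// ExecuteScript (Groovy) body: session and REL_SUCCESS are bound by NiFi
def flowFile = session.get()
if (!flowFile) return

// Tag the FlowFile and route it onward; same scripting API in NiFi 1.X and 2.X
flowFile = session.putAttribute(flowFile, 'touched.by', 'groovy')
session.transfer(flowFile, REL_SUCCESS)
```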
04-09-2024
02:02 PM
1 Kudo
Looks like NiFi might be using GitHub - redis/jedis (the Redis Java client) under the covers, so if it can run the commands you asked about, you might be able to build a custom Groovy processor to run them.
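A rough sketch of what driving Jedis directly from Groovy could look like. The host, port, and commands are placeholders, and pulling the dependency with @Grab is just one option (bundling the jar and pointing the scripted processor's Module Directory at it also works):

```groovy
@Grab('redis.clients:jedis:5.1.0')
import redis.clients.jedis.Jedis

// Placeholder connection details; point these at your Redis instance
new Jedis('localhost', 6379).withCloseable { jedis ->
    jedis.set('example:key', 'example-value')   // any command Jedis exposes
    println jedis.get('example:key')
}
```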
04-09-2024
01:48 PM
1 Kudo
What headers do you set inside of Postman?
04-09-2024
12:44 PM
1 Kudo
Have you tried not escaping your " with \" in your command-line arguments?
04-09-2024
11:58 AM
I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor with code like this:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    // The remaining Processor methods are required by the interface but unused here
    Collection<ValidationResult> validate(ValidationContext context) { }

    PropertyDescriptor getPropertyDescriptor(String name) { }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    String getIdentifier() { }

    void onScheduled(ProcessContext context) throws ProcessException { }

    void onUnscheduled(ProcessContext context) throws ProcessException { }

    void onStopped(ProcessContext context) throws ProcessException { }

    void setLogger(ComponentLog logger) { }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)

                // Split the objectIDs list into chunks and emit one FlowFile per chunk
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk ->
                    data = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(JsonOutput.toJson(data).getBytes("UTF-8")) } as OutputStreamCallback)
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
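For reference on what the script does: with Chunk Size at its default of 5, an input FlowFile of {"objectIDs": [1, 2, 3, 4, 5, 6, 7]} yields two FlowFiles on the success relationship, {"objectIDs": [1, 2, 3, 4, 5]} and {"objectIDs": [6, 7]}, and the original FlowFile is removed.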
03-05-2024
07:13 PM
1 Kudo
@SAMSAL are you still on M1 or M2? I'm on M1 and took what you shared and just made those minor tweaks. I'll test it out more tomorrow; maybe I'll restart NiFi just to clear memory and any funny stuff that might be there.
03-05-2024
03:29 PM
1 Kudo
Building on @SAMSAL's discovery, I found you have access to the JVM in your init:

    def __init__(self, jvm, **kwargs):
        super().__init__()
        self.jvm = jvm

Which then lets you access the JVM gateway and Java data classes like this:

    from py4j.java_collections import MapConverter

    jvm_gateway = self.jvm.gateway

    # Create a Java Map/Dict
    map = self.jvm.java.util.HashMap()
    map.put("name", record['name'])

    # Or convert the Python dict to a Java Map
    data = {"name": record["name"]}
    data = MapConverter().convert(data, jvm_gateway._gateway_client)
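One note on the MapConverter step: py4j doesn't turn Python dicts into Java Maps automatically when you pass them into Java calls, so you either convert explicitly like this or enable auto_convert when constructing a gateway yourself. Since NiFi owns the gateway here, explicit conversion seems the safer route.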
03-05-2024
12:31 PM
Have you considered using EvaluateJsonPath to extract the value as a FlowFile attribute and then using that attribute as your key when publishing to Kafka?
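A sketch of how the two processors could be configured, assuming a hypothetical $.order.id path for the key (the PublishKafka processors accept Expression Language in their Kafka Key property):

```
EvaluateJsonPath
  Destination : flowfile-attribute
  kafka.key   : $.order.id        (dynamic property; the path is hypothetical)

PublishKafka
  Kafka Key   : ${kafka.key}
```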