Member since
06-14-2023
90
Posts
27
Kudos Received
8
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 3151 | 12-29-2023 09:36 AM
 | 4240 | 12-28-2023 01:01 PM
 | 934 | 12-27-2023 12:14 PM
 | 431 | 12-08-2023 12:47 PM
 | 1386 | 11-21-2023 10:56 PM
12-08-2023
01:45 PM
If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy-based InvokeScriptedProcessor should create the output you posted.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("100")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    Collection<ValidationResult> validate(ValidationContext context) {
    }

    PropertyDescriptor getPropertyDescriptor(String name) {
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(BATCH_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    String getIdentifier() {
    }

    void onScheduled(ProcessContext context) throws ProcessException {
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
    }

    void onStopped(ProcessContext context) throws ProcessException {
    }

    void setLogger(ComponentLog logger) {
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                flowFile = session.write(flowFile, { inputStream, outputStream ->
                    List<Map> data = jsonSlurper.parse(inputStream)
                    data = data.collect { Map resource ->
                        // Tags arrives as a JSON fragment; wrap it so it parses as an object
                        Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
                        [
                            "Name": tags.Name,
                            "Owner": tags.Owner,
                            "ResourceId": resource.ResourceId,
                            "Resourcename": resource.ResourceId.split("/").last(),
                            "Tags": resource.Tags
                        ]
                    }
                    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
                } as StreamCallback)
                session.putAllAttributes(flowFile, customAttributes)
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
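The heart of the script is the reshaping inside session.write. Here is a minimal Python sketch of the same logic, useful for testing the transformation outside NiFi. It assumes (as the Groovy parseText call implies) that each Tags value is a JSON fragment like Name": "x", "Owner": "y that becomes a valid object once wrapped in braces and quotes:

```python
import json

def transform(payload: str) -> str:
    """Parse a JSON array of resources, expand the Tags fragment into an
    object, and emit the reshaped records (mirrors the Groovy collect)."""
    out = []
    for resource in json.loads(payload):
        # Wrap the Tags fragment so it parses as a JSON object,
        # exactly as the Groovy script does with parseText.
        tags = json.loads('{"' + resource["Tags"] + '"}')
        out.append({
            "Name": tags.get("Name"),
            "Owner": tags.get("Owner"),
            "ResourceId": resource["ResourceId"],
            "Resourcename": resource["ResourceId"].split("/")[-1],
            "Tags": resource["Tags"],
        })
    return json.dumps(out)
```

This only holds for input shaped like the example in the question; any Tags value that is not such a fragment would need different parsing.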
12-08-2023
01:07 PM
What processor are you using to send the data?
12-08-2023
12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to build a custom solution that monitors the queues and stops/starts the processors. All of this can be achieved via the NiFi REST API.
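As a rough sketch of that REST approach: a scheduled script can poll a connection's queued count (GET /nifi-api/flow/connections/{id}/status) and then PUT to the processor's run-status endpoint to stop or start it. The host, port, and IDs below are placeholders; the current revision version must be fetched first from GET /nifi-api/processors/{id}:

```python
import json
from urllib import request

# Assumption: adjust to your NiFi instance and add authentication as needed.
NIFI_API = "https://localhost:8443/nifi-api"

def run_status_request(processor_id: str, revision_version: int, state: str):
    """Build the PUT request that changes a processor's run status.
    `state` is "RUNNING" or "STOPPED"; `revision_version` must match the
    version returned by GET /nifi-api/processors/{id}."""
    body = json.dumps({
        "revision": {"version": revision_version},
        "state": state,
    }).encode("utf-8")
    return request.Request(
        f"{NIFI_API}/processors/{processor_id}/run-status",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

# A monitor would send this with request.urlopen(req) after checking
# status.aggregateSnapshot.queuedCount on the upstream connection.
```

This is only the skeleton; a production version also needs TLS/token handling and retry logic.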
12-07-2023
04:05 PM
Have you tried playing with these settings? Or perhaps an instance with faster disks, since disk I/O might be the bottleneck?
11-30-2023
03:19 PM
Yes https://en.m.wikipedia.org/wiki/Syslog
11-30-2023
07:54 AM
@MattWho you can find some examples of those Python processors here: nifi/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions at main · apache/nifi · GitHub
11-30-2023
07:07 AM
Ah, gotcha. You can do what you already do in your JOLT spec and use & instead of 1.
11-28-2023
03:54 PM
You don't have to put it back into an array [] if you don't want to.
11-28-2023
03:42 PM
@steven-matison actually commented on another post about NiFi 2.0, and I was just starting to read up on it... glad you got a head start on this @SAMSAL. Don't recall where, but I did see that Java 21 is the minimum requirement. Also, it could be that Jython is gone because of the "native" support @steven-matison mentioned in that other post. Release Notes - Apache NiFi - Apache Software Foundation

New Features of 2.0.0-M1:
- Initial version of native Python API for Processors
- Stateless Execution mode for Process Groups
- Flow Analysis Rules API
- Kubernetes-based Leader Election and State Management extensions
- Python-based Processors for interacting with ChatGPT and Vector Databases
- ListenOTLP Processor for collecting OpenTelemetry
- ListenSlack and ConsumeSlack Processors for handling messages from Slack
- EncryptContentAge and DecryptContentAge Processors supporting the age-encryption.org specification
- Schema Registry Services for Amazon Glue and Apicurio
- Parameter Provider for 1Password Vault
- YamlTreeReader for YAML as Records
- PackageFlowFile Processor for writing file streams and attributes as FlowFile Version 3
- Migrated from H2 Database Engine to JetBrains Xodus for storing Flow Configuration History

Now I want to go and try the shiny new toy...