Member since: 06-14-2023
Posts: 95
Kudos Received: 33
Solutions: 8
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 3843 | 12-29-2023 09:36 AM |
 | 5643 | 12-28-2023 01:01 PM |
 | 1110 | 12-27-2023 12:14 PM |
 | 558 | 12-08-2023 12:47 PM |
 | 1748 | 11-21-2023 10:56 PM |
12-27-2023
02:01 PM
1 Kudo
PublishKafka must have an active connection to Kafka before it even attempts to send a FlowFile, which means it never gets into the block of code that sends the message and routes the FlowFile to "success" or "failure" accordingly. If that is what you're experiencing, making sure your Kafka cluster is up and reachable should be the focus. My guess is that if this "error" topic of yours lives on the same Kafka cluster, then even if PublishKafka were able to route a FlowFile to "failure" when it can't connect to Kafka, publishing to that error topic wouldn't work anyway.
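If it helps to rule the Kafka side in or out, you can run a quick connectivity check outside of NiFi. This is only a minimal sketch that assumes the Kafka client library is on the classpath; "broker1:9092" is a placeholder for your bootstrap servers:
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.concurrent.TimeUnit
def props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")
def admin = AdminClient.create(props)
try {
    // if this times out or throws, PublishKafka will not get far enough to route anything to "failure" either
    def nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS)
    println "Reachable brokers: ${nodes*.host()}"
} finally {
    admin.close()
}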
12-27-2023
12:14 PM
1 Kudo
How FlowFiles are distributed from your ListenUDP processor to the next processor in the flow is defined on the connection between them. To keep data arriving even when a node goes down, put a UDP-capable load balancer such as Nginx (with its stream module) in front of your NiFi cluster; note that plain HAProxy does not load balance UDP. The load balancer can then forward your data to whichever nodes are still reachable, as long as the cluster is up.
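For illustration, a minimal Nginx stream-module sketch; the node hostnames and port 8881 are placeholders and need to match the port your ListenUDP processors listen on:
stream {
    upstream nifi_listen_udp {
        server nifi-node1:8881;
        server nifi-node2:8881;
        server nifi-node3:8881;
    }
    server {
        listen 8881 udp;
        proxy_pass nifi_listen_udp;
    }
}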
12-11-2023
09:35 AM
How about SplitJson with a JsonPath Expression of $[*] followed by EvaluateJsonPath with $.SEARCH_RESULT?
12-11-2023
09:01 AM
I think you could probably use EvaluateJsonPath to parse the JSON value of "SEARCH_RESULT", but I like scripted processors, so I would use a Groovy based InvokeScriptedProcessor with this code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void initialize(ProcessorInitializationContext context) {
log = context.logger
}
Set<Relationship> getRelationships() {
Set<Relationship> relationships = new HashSet<>()
relationships.add(REL_FAILURE)
relationships.add(REL_SUCCESS)
return relationships
}
Collection<ValidationResult> validate(ValidationContext context) {
}
PropertyDescriptor getPropertyDescriptor(String name) {
}
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
List<PropertyDescriptor> getPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>()
descriptors.add(BATCH_SIZE)
return Collections.unmodifiableList(descriptors)
}
String getIdentifier() {
}
void onScheduled(ProcessContext context) throws ProcessException {
}
void onUnscheduled(ProcessContext context) throws ProcessException {
}
void onStopped(ProcessContext context) throws ProcessException {
}
void setLogger(ComponentLog logger) {
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { FlowFile flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
flowFile = session.write(flowFile, { inputStream, outputStream ->
List<Map> searchResults = jsonSlurper.parse(inputStream)
searchResults = searchResults.collect { jsonSlurper.parseText(it.SEARCH_RESULT) }
outputStream.write(JsonOutput.toJson(searchResults).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.putAllAttributes(flowFile, customAttributes)
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
It looks like a lot, but most of it is boilerplate required by the Processor interface. The actual work is done in the session.write callback, which parses the incoming JSON array and replaces each element with the parsed contents of its SEARCH_RESULT string before writing the array back out.
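If it is easier to see that transform in isolation, here it is run against a made-up two-record payload; the sample values are hypothetical and only the SEARCH_RESULT field name comes from your data:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()
// hypothetical input: each record carries its real payload as a JSON string in SEARCH_RESULT
String input = '[{"SEARCH_RESULT":"{\\"id\\":1}"},{"SEARCH_RESULT":"{\\"id\\":2}"}]'
List<Map> searchResults = jsonSlurper.parseText(input)
// parse each embedded SEARCH_RESULT string into a real JSON object
searchResults = searchResults.collect { jsonSlurper.parseText(it.SEARCH_RESULT) }
// prints [{"id":1},{"id":2}]
println JsonOutput.toJson(searchResults)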
12-08-2023
01:52 PM
I don't see EvaluateJsonPath offering options for that, so it might be something you'd have to handle on your own... personally, I'd do it via a Groovy scripted processor for greater control and performance.
12-08-2023
01:45 PM
If the input will always look like your example, I would use Groovy to make the transformation. The following Groovy based InvokeScriptedProcessor should create the output you posted:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void initialize(ProcessorInitializationContext context) {
log = context.logger
}
Set<Relationship> getRelationships() {
Set<Relationship> relationships = new HashSet<>()
relationships.add(REL_FAILURE)
relationships.add(REL_SUCCESS)
return relationships
}
Collection<ValidationResult> validate(ValidationContext context) {
}
PropertyDescriptor getPropertyDescriptor(String name) {
}
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
List<PropertyDescriptor> getPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>()
descriptors.add(BATCH_SIZE)
return Collections.unmodifiableList(descriptors)
}
String getIdentifier() {
}
void onScheduled(ProcessContext context) throws ProcessException {
}
void onUnscheduled(ProcessContext context) throws ProcessException {
}
void onStopped(ProcessContext context) throws ProcessException {
}
void setLogger(ComponentLog logger) {
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { FlowFile flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
flowFile = session.write(flowFile, { inputStream, outputStream ->
List<Map> data = jsonSlurper.parse(inputStream)
data = data.collect { Map resource ->
// Tags arrives as a JSON fragment in string form, so wrap it in braces and parse it into a map
Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
[
"Name": tags.Name,
"Owner": tags.Owner,
"ResourceId": resource.ResourceId,
"Resourcename": resource.ResourceId.split("/").last(),
"Tags": resource.Tags
]
}
outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.putAllAttributes(flowFile, customAttributes)
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
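For reference, this is what the mapping inside the collect does to a single record; the ResourceId and Tags values below are made up and only shaped the way the parseText call above expects them:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()
// hypothetical record: Tags is a JSON fragment in string form, ResourceId is a slash-separated path
Map resource = [
    "ResourceId": "resourceGroups/rg1/virtualMachines/vm-01",
    "Tags": 'Name": "vm-01", "Owner": "alice'
]
Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
Map flattened = [
    "Name": tags.Name,
    "Owner": tags.Owner,
    "ResourceId": resource.ResourceId,
    "Resourcename": resource.ResourceId.split("/").last(),
    "Tags": resource.Tags
]
// prints the flattened record as a single JSON object
println JsonOutput.toJson(flattened)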
12-08-2023
01:07 PM
What processor are you using to send the data?
12-08-2023
12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want the behavior you're describing, you'll have to build a custom solution that monitors the queues and stops/starts the processors. All of this can be done via the NiFi REST API.
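As a rough sketch of the moving parts, against an unsecured NiFi (the host, IDs, and threshold are placeholders; a secured instance also needs an access token, and the endpoint paths should be checked against the REST API docs for your version):
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
String nifi = "http://localhost:8080/nifi-api"
String connectionId = "<connection-uuid>"
String processorId = "<consumekafka-processor-uuid>"
int threshold = 10000
def slurper = new JsonSlurper()
// 1. how many FlowFiles are queued in the connection feeding the slow part of the flow
def status = slurper.parse(new URL("${nifi}/flow/connections/${connectionId}/status"))
int queued = status.connectionStatus.aggregateSnapshot.flowFilesQueued
if (queued > threshold) {
    // 2. the run-status call needs the processor's current revision
    def processor = slurper.parse(new URL("${nifi}/processors/${processorId}"))
    // 3. stop the processor; send state RUNNING later to start it again
    def conn = new URL("${nifi}/processors/${processorId}/run-status").openConnection()
    conn.requestMethod = "PUT"
    conn.doOutput = true
    conn.setRequestProperty("Content-Type", "application/json")
    def out = conn.outputStream
    out.write(JsonOutput.toJson([revision: processor.revision, state: "STOPPED"]).getBytes("UTF-8"))
    out.close()
    println "Stop request returned HTTP ${conn.responseCode}"
}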
12-07-2023
04:05 PM
Have you tried playing with these settings? Or perhaps an instance with faster disks, since disk I/O might be the bottleneck?