Member since: 06-14-2023
Posts: 96
Kudos Received: 34
Solutions: 9
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1732 | 11-04-2024 06:51 PM |
|  | 6065 | 12-29-2023 09:36 AM |
|  | 9411 | 12-28-2023 01:01 PM |
|  | 1978 | 12-27-2023 12:14 PM |
|  | 1192 | 12-08-2023 12:47 PM |
12-28-2023
05:23 AM
Looking at the thread, the answer is no, that's not how a NiFi processor is meant to handle relationships. The client processor has to initialize/start properly with the given configuration; only then can it try to write, and based on the write results FlowFiles can be routed to failure or success. Here the processor can't even start properly.
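To illustrate the lifecycle point, here is a minimal sketch (a hypothetical AbstractProcessor subclass, not the actual processor from the thread). Connection/startup work typically lives in an @OnScheduled method, which runs before any FlowFile is handed to the processor, so a startup failure leaves nothing to route; success/failure routing only applies per FlowFile inside onTrigger.

import org.apache.nifi.annotation.lifecycle.OnScheduled
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException

class ClientSketch extends AbstractProcessor {

    @OnScheduled
    void connect(ProcessContext context) {
        // If the client cannot start with the given configuration, throwing here
        // keeps the processor from running at all. No FlowFile exists yet, so
        // there is nothing that could be routed to a failure relationship.
        throw new ProcessException("client could not be initialized")
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSession session) {
        // Only here, with a FlowFile in hand and a started client, does routing
        // to success or failure based on the write result apply.
    }
}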
12-12-2023
11:06 PM
I want to achieve this disconnect-and-reconnect functionality with the PublishKafka processor. ConsumeKafka will consume continuously.
12-12-2023
06:57 AM
Can you post a screenshot of the UpdateRecord processor configuration? Also, be careful with the provided input: there is an extra comma after the last Garry value, which makes the JSON invalid.
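For reference, a tiny illustration (hypothetical data, not the exact input from the thread) of why that trailing comma matters:

[ { "name": "Garry" }, ]    invalid: comma after the last element
[ { "name": "Garry" } ]     valid

Strict parsers, including the Jackson-based JSON record readers NiFi uses, will reject the first form.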
12-11-2023
04:07 PM
@edtech Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
12-08-2023
01:52 PM
I don't see EvaluateJsonPath offering options for this, so it might be something you'd have to handle on your own. Personally, I'd do it via a Groovy scripted processor for greater control and performance.
12-08-2023
01:45 PM
If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy-based InvokeScriptedProcessor script should create the output you posted.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("100")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(BATCH_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    // The remaining Processor methods are not needed for this example.
    Collection<ValidationResult> validate(ValidationContext context) { }
    PropertyDescriptor getPropertyDescriptor(String name) { }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    String getIdentifier() { }
    void onScheduled(ProcessContext context) throws ProcessException { }
    void onUnscheduled(ProcessContext context) throws ProcessException { }
    void onStopped(ProcessContext context) throws ProcessException { }
    void setLogger(ComponentLog logger) { }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                flowFile = session.write(flowFile, { inputStream, outputStream ->
                    List<Map> data = jsonSlurper.parse(inputStream)
                    data = data.collect { Map resource ->
                        // The Tags field arrives as a bare key/value fragment; wrap it
                        // in braces and quotes so it can be parsed as a JSON object.
                        Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
                        [
                            "Name": tags.Name,
                            "Owner": tags.Owner,
                            "ResourceId": resource.ResourceId,
                            "Resourcename": resource.ResourceId.split("/").last(),
                            "Tags": resource.Tags
                        ]
                    }
                    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
                } as StreamCallback)
                session.putAllAttributes(flowFile, customAttributes)
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
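In case it helps: InvokeScriptedProcessor expects the script to define a class implementing Processor and to assign an instance of it to a variable named processor, which is what the last line does. Paste the script into the processor's Script Body property and set Script Engine to Groovy.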
12-08-2023
01:07 PM
What processor are you using to send the data?
12-08-2023
12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to build a custom solution that monitors the queues and stops/starts the processors. All of this can be achieved via the NiFi REST API.
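A rough sketch of that approach (the base URL and IDs are placeholders, and authentication/TLS handling is omitted; not from the thread): poll a connection's queue depth, then flip a processor's run status through the REST API.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper

String nifiApi      = 'http://localhost:8080/nifi-api'  // assumed base URL
String connectionId = 'CONNECTION_ID'                   // placeholder
String processorId  = 'PROCESSOR_ID'                    // placeholder
def slurper = new JsonSlurper()

// How many FlowFiles are waiting in the connection feeding PublishKafka.
def connStatus = slurper.parse(new URL("${nifiApi}/flow/connections/${connectionId}/status"))
int queued = connStatus.connectionStatus.aggregateSnapshot.flowFilesQueued as int

// The current revision must be echoed back when changing run status.
def proc = slurper.parse(new URL("${nifiApi}/processors/${processorId}"))

def http = new URL("${nifiApi}/processors/${processorId}/run-status").openConnection()
http.requestMethod = 'PUT'
http.doOutput = true
http.setRequestProperty('Content-Type', 'application/json')
http.outputStream.withWriter {
    it << JsonOutput.toJson([
        revision: proc.revision,
        state   : queued > 0 ? 'RUNNING' : 'STOPPED'  // run while there is work, stop once drained
    ])
}
assert http.responseCode == 200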
12-07-2023
04:05 PM
Have you tried playing with these settings? Or perhaps an instance with a faster disk, since that might be the bottleneck?
12-05-2023
04:41 AM
@SAMSAL, thank you. This works.