Member since
11-17-2021
1158
Posts
260
Kudos Received
30
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 273 | 04-23-2026 02:02 PM | |
| 753 | 03-17-2026 05:26 PM | |
| 5728 | 11-05-2025 10:13 AM | |
| 986 | 10-16-2025 02:45 PM | |
| 1695 | 10-06-2025 01:01 PM |
06-14-2023
12:33 PM
I would do this in a single step with a InvokeScriptedProcessor and the following Groovy code import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
... View more
06-14-2023
07:10 AM
@Fredb This is a very difficult one to solve. Does anyone know what would cause the execution of the sample_Import_Load.bat to run correctly from the windows command prompt, but fail when executed via the ExecuteStreamCommand processor with these errors? This is most likely caused by permission issues. Nifi requires specific permissions against files and scripts it touches or executes from within processors. As such, the error is saying the processor does not know where any of the resources exist to run that .bat file. I do not have any experience with nifi on windows, other than to avoid it, but the solution is likely the same as other operating systems. Make sure the nifi user has full ownership of the file(s). Additionally, it is sometimes possible to find deeper errors looking at the nifi-app.log file while testing and/or setting the log level of the processor to be more aggressive.
... View more
06-14-2023
01:06 AM
Fun fact for those interested: In order to have 8 cores running you need in this example minimum 64m as xss options. If you chose 32m, then it will not give a stackoverflow error, but only 4 cores will be running 😲
... View more
06-12-2023
05:06 PM
@Motimot Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
... View more
06-09-2023
04:11 PM
1 Kudo
Hey @steven-matison and @Former Member thank you so much for your help It worked with StandardProxyConfigurationService controller services however I still have issues with StandardRestrictedSSLContextService controller service. Anyway, thank you so much for the help and details steps that helped me a lot. Thank you!!
... View more
06-09-2023
11:59 AM
@Josh2023 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
... View more
06-08-2023
08:08 AM
@JeffB Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
... View more
06-06-2023
11:43 AM
1 Kudo
I solved my problem. The solution is to not wrap the variable with double quotes. ${search_after:isEmpty():not():ifElse(', \"search_after\": ${search_after}', '')},
... View more
06-05-2023
02:57 PM
Thats interesting. Let me make sure my understanding is correct. 1. So after configuring password through jceks, there are failures if "Generate HADOOP_CREDSTORE_PASSWORD" is unchecked. Can I please have the full exception, just want to make sure if the full exception points to anything else in latest failure. 2. Did you also remove the properties you already had added i.e "<name>fs.s3a.access.key</name> <name>fs.s3a.secret.key</name>" in core-site.xml
... View more
06-05-2023
06:32 AM
@asandovala21 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
... View more