Member since: 06-14-2023
Posts: 90
Kudos Received: 27
Solutions: 8
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2833 | 12-29-2023 09:36 AM |
| | 3794 | 12-28-2023 01:01 PM |
| | 909 | 12-27-2023 12:14 PM |
| | 417 | 12-08-2023 12:47 PM |
| | 1370 | 11-21-2023 10:56 PM |
06-14-2023
03:04 PM
Does it need to be ECMAScript? I can probably whip something up tomorrow using Groovy.
06-14-2023
02:45 PM
Would a ScriptedProcessor be adequate, running within NiFi rather than calling out to an external script?
06-14-2023
02:42 PM
ConsumeKafka has a property called "Message Demarcator". Click into it and press Shift+Enter to set it to a newline; instead of pulling one event at a time, the processor will create a single FlowFile containing several events, which might make your merge even better. You can do the same thing with PublishKafka: configure the same demarcator there and you'll achieve greater throughput.
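A rough sketch of the relevant settings (property names are from the standard ConsumeKafka/PublishKafka processors; the Max Poll Records value is just an illustration):

```
ConsumeKafka
  Message Demarcator : <newline>   (enter via Shift+Enter in the property editor)
  Max Poll Records   : 10000       (upper bound on events bundled into one FlowFile)

PublishKafka
  Message Demarcator : <newline>   (same demarcator, so the bundle is split back into individual messages)
```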
06-14-2023
02:33 PM
Does ExecuteSQL erase some of the attributes that could be used to associate the FlowFiles further downstream?
06-14-2023
02:26 PM
This could possibly be achieved via an InvokeScriptedProcessor, but I would need to know the source data and the expected output. For example, taking what you posted: if you want to filter on code=6 and code=8 and keep only the values of "other" as individual FlowFiles, then something like this Groovy-based code could achieve it:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {

    // Number of FlowFiles pulled per onTrigger invocation
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    // Comma-separated list of "code" values to keep
    PropertyDescriptor FILTER_CODES = new PropertyDescriptor.Builder()
        .name("FILTER_CODES")
        .displayName("Filter Codes")
        .description("Codes to Filter On")
        .required(true)
        .defaultValue("6,8")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }

    Collection<ValidationResult> validate(ValidationContext context) { null }

    PropertyDescriptor getPropertyDescriptor(String name) { null }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE, FILTER_CODES]) as List<PropertyDescriptor> }

    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            // Parse the comma-separated filter codes into integers, ignoring non-numeric entries
            String filterCodesString = context.getProperty(FILTER_CODES).getValue()
            List<Integer> filterCodes = filterCodesString.split(",").findAll { it.trim().matches("\\d+") }.collect { it as Integer }

            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                // Read the FlowFile line by line; each line is expected to be a standalone JSON object
                session.read(flowFile, { inputStream ->
                    inputStream.eachLine { line ->
                        if (line?.trim()) {
                            Map dataMap = jsonSlurper.parseText(line)
                            if (filterCodes.contains(dataMap.code.toInteger())) {
                                // Emit only the "other" value of each matching record as its own FlowFile
                                FlowFile newFlowFile = session.create()
                                newFlowFile = session.write(newFlowFile, { outputStream ->
                                    outputStream.write(jsonOutput.toJson(dataMap.other).getBytes(StandardCharsets.UTF_8))
                                } as OutputStreamCallback)
                                newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                                session.transfer(newFlowFile, REL_SUCCESS)
                            }
                        }
                    }
                } as InputStreamCallback)
                // The original FlowFile has been fully fanned out, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
// InvokeScriptedProcessor expects the script to expose its processor instance via a variable named "processor"
processor = new GroovyProcessor()
```
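To sanity-check the logic, with Filter Codes = "6,8" and a hypothetical input FlowFile containing one JSON object per line:

```
{"code": 6, "other": {"name": "a"}}
{"code": 3, "other": {"name": "b"}}
{"code": 8, "other": {"name": "c"}}
```

the processor would emit two new FlowFiles, {"name":"a"} and {"name":"c"}; the code=3 line is dropped along with the original FlowFile.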
06-14-2023
01:30 PM
You could use a SplitContent processor to create a FlowFile for each statement. For "Byte Sequence", press Shift+Enter to enter a newline.
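A sketch of the relevant settings (property names are from the standard SplitContent processor; this assumes your statements are newline-delimited):

```
SplitContent
  Byte Sequence Format : Text
  Byte Sequence        : <newline>   (enter via Shift+Enter in the property editor)
  Keep Byte Sequence   : false
```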
06-14-2023
01:21 PM
Do you have the code you're trying and a sample of the file that generates the error?
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {

    // Number of FlowFiles pulled per onTrigger invocation
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }

    Collection<ValidationResult> validate(ValidationContext context) { null }

    PropertyDescriptor getPropertyDescriptor(String name) { null }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }

    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]

                // Parse the whole FlowFile as a JSON array
                List data = null
                session.read(flowFile, { inputStream ->
                    data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // Fan out: one new FlowFile per visit, carrying the parent fields along
                data.each { entry ->
                    entry.VisitList.each { visit ->
                        Map newData = [:]
                        newData.put("employer", entry.employer)
                        newData.put("loc_id", entry.loc_id)
                        newData.put("topId", entry.topId)
                        newData.put("VisitList", [visit])
                        FlowFile newFlowFile = session.create()
                        newFlowFile = session.write(newFlowFile, { outputStream ->
                            outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8))
                        } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }
                // The original FlowFile has been fully fanned out, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
// InvokeScriptedProcessor expects the script to expose its processor instance via a variable named "processor"
processor = new GroovyProcessor()
```
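To illustrate with a hypothetical input (field names taken from the script), a FlowFile containing:

```
[{"employer": "Acme", "loc_id": 1, "topId": 2, "VisitList": [{"visitId": 10}, {"visitId": 11}]}]
```

would produce two FlowFiles, each a single-element array carrying the same employer/loc_id/topId and exactly one visit in its VisitList.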
06-14-2023
12:05 PM
1 Kudo
Was getting the same error with some of our Python/Jython scripts. The way I got it to work was to remove the modules path from "Module Directory" and instead add it at the very top of the script:

```python
import sys
sys.path.append("/Users/bonthala/Library/Python/2.7/lib/python/site-packages/")
```
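One caveat worth noting: NiFi's scripting engine is Jython, which can only import pure-Python modules from a path added this way; compiled C extensions won't load.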