Member since: 06-14-2023
Posts: 95
Kudos Received: 33
Solutions: 8
My Accepted Solutions

| Title | Views | Posted |
| --- | --- | --- |
|  | 3809 | 12-29-2023 09:36 AM |
|  | 5571 | 12-28-2023 01:01 PM |
|  | 1094 | 12-27-2023 12:14 PM |
|  | 552 | 12-08-2023 12:47 PM |
|  | 1731 | 11-21-2023 10:56 PM |
06-16-2023
11:26 AM
You have to create a separate CASE statement for each column you are trying to update, similar to what is done for the shipment number.
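A minimal sketch of the pattern (the table and column names here are made up for illustration, not taken from your flow):

```sql
-- Hypothetical example: update two columns in one statement,
-- with one CASE expression per column
UPDATE orders
SET shipment_number = CASE order_id
                          WHEN 1 THEN 'SHP-100'
                          WHEN 2 THEN 'SHP-200'
                      END,
    order_status    = CASE order_id
                          WHEN 1 THEN 'SHIPPED'
                          WHEN 2 THEN 'PENDING'
                      END
WHERE order_id IN (1, 2);
```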
06-15-2023
11:41 AM
This simple InvokeScriptedProcessor script looks for a FlowFile attribute called "ip_address", attempts a reverse DNS lookup, and writes the resolved value to a new attribute called "host_name".

```groovy
import java.net.InetAddress
import java.net.UnknownHostException

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    // How many FlowFiles to pull from the queue per onTrigger invocation
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles that were successfully processed are routed here")
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("FlowFiles that were not successfully processed are routed here")
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    // Resolve an IP address to a host name, or "Unknown" if the lookup fails
    String reverseDnsLookup(String ipAddress) {
        try {
            InetAddress inetAddress = InetAddress.getByName(ipAddress)
            return inetAddress.getCanonicalHostName()
        } catch (UnknownHostException e) {
            return "Unknown"
        }
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                String ipAddress = flowFile.getAttribute("ip_address")
                if (ipAddress) {
                    // Attach the resolved host name as a new attribute
                    flowFile = session.putAttribute(flowFile, "host_name", reverseDnsLookup(ipAddress))
                }
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
```
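To try this, set InvokeScriptedProcessor's Script Engine to Groovy and paste the script into its Script Body property; the Batch Size property defined above then shows up in the processor's configuration.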
06-14-2023
10:00 PM
What are you using Vault for specifically? Retrieving secrets for some other purpose? Anytime the built-in processors don't or can't do what I need, I've found scripted processors to be ideal.
06-14-2023
03:04 PM
Does it need to be ECMAScript? I can probably whip something up tomorrow using Groovy.
06-14-2023
02:42 PM
ConsumeKafka has a property called "Message Demarcator". Click on it and press Shift+Enter to enter a newline; instead of pulling one event at a time, the processor will create a single FlowFile containing several events, which might make your merge even better. You can do the same thing on the publish side: configure the same demarcator in PublishKafka and you'll achieve greater throughput.
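For illustration (made-up events), with a newline demarcator three Kafka records land in a single FlowFile whose content looks like:

```
{"event": 1}
{"event": 2}
{"event": 3}
```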
06-14-2023
02:33 PM
Does ExecuteSQL erase some of the attributes that could be used to associate the FlowFiles further downstream?
06-14-2023
02:26 PM
This could possibly be achieved via an InvokeScriptedProcessor, but I would need to know the source and what the expected output would be. For example, taking what you posted: if you want to filter on code=6 and code=8 and emit only the values of "other" as individual FlowFiles, then something like this Groovy-based code could achieve that.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    // Comma-separated list of "code" values to keep
    PropertyDescriptor FILTER_CODES = new PropertyDescriptor.Builder()
        .name("FILTER_CODES")
        .displayName("Filter Codes")
        .description("Codes to filter on")
        .required(true)
        .defaultValue("6,8")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles that were successfully processed are routed here")
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("FlowFiles that were not successfully processed are routed here")
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE, FILTER_CODES]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            // Parse the configured codes, e.g. "6,8" -> [6, 8]
            String filterCodesString = context.getProperty(FILTER_CODES).getValue()
            List<Integer> filterCodes = filterCodesString.split(",")
                .findAll { it.trim().matches("\\d+") }
                .collect { it.trim() as Integer }

            flowFiles.each { flowFile ->
                Map customAttributes = ["mime.type": "application/json"]
                session.read(flowFile, { inputStream ->
                    // Treat the content as one JSON object per line
                    inputStream.eachLine { line ->
                        if (line?.trim()) {
                            Map dataMap = jsonSlurper.parseText(line)
                            if (filterCodes.contains(dataMap.code.toInteger())) {
                                // Emit the "other" value as its own FlowFile
                                FlowFile newFlowFile = session.create()
                                newFlowFile = session.write(newFlowFile, { outputStream ->
                                    outputStream.write(jsonOutput.toJson(dataMap.other).getBytes(StandardCharsets.UTF_8))
                                } as OutputStreamCallback)
                                newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                                session.transfer(newFlowFile, REL_SUCCESS)
                            }
                        }
                    }
                } as InputStreamCallback)
                // The original FlowFile has been fully split out, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
```
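As a rough illustration (the field names simply mirror the code/other structure described above), an input FlowFile containing

```
{"code": 6, "other": {"a": 1}}
{"code": 7, "other": {"b": 2}}
{"code": 8, "other": {"c": 3}}
```

would produce two output FlowFiles with contents {"a":1} and {"c":3}; the code=7 line is dropped.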
06-14-2023
01:30 PM
You could use a SplitContent processor to create a FlowFile for each statement. For "Byte Sequence", press Shift+Enter to use a newline as the split sequence.
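For example (illustrative content only), an incoming FlowFile of

```
statement1
statement2
```

would be split into two FlowFiles, one per statement, when the Byte Sequence is a single newline.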
06-14-2023
01:21 PM
Do you have the code you're trying to run and a sample of the file that generates the error?