Member since 06-14-2023. 95 posts, 33 kudos received, 8 solutions.
06-24-2023
04:37 PM
You probably need to escape the $, which is a special character in NiFi. Try adding \ or \\ in front of it.
06-22-2023
05:56 PM
This is likely what's generating the error: the loop removes the FlowFile on every iteration, but the removal should only be done once.
for element in found_elements:
    session.remove(flowFile)
    print(element.tag, element.text)
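A minimal sketch of the fix, assuming the script runs in ExecuteScript with the Python (Jython) engine: keep the per-element work inside the loop and call session.remove(flowFile) once, after the loop. The StubSession class, the process function, and the sample XML are hypothetical stand-ins so the snippet runs outside NiFi; inside NiFi, session is supplied by the processor and the FlowFile comes from session.get().

```python
import xml.etree.ElementTree as ET


class StubSession:
    """Hypothetical stand-in for NiFi's ProcessSession, only for this sketch."""

    def __init__(self):
        self.removed = []

    def remove(self, flow_file):
        # Mimic the failure seen when the same FlowFile is removed twice.
        if flow_file in self.removed:
            raise RuntimeError("FlowFile already removed: %s" % flow_file)
        self.removed.append(flow_file)


def process(session, flow_file, xml_text, tag):
    """Collect matching elements, then remove the FlowFile exactly once."""
    found_elements = ET.fromstring(xml_text).iter(tag)
    results = []
    for element in found_elements:
        # Per-element work stays inside the loop.
        results.append((element.tag, element.text))
    # The removal happens once, after the loop has finished.
    session.remove(flow_file)
    return results


session = StubSession()
rows = process(session, "flowfile-1", "<r><ip>10.0.0.1</ip><ip>10.0.0.2</ip></r>", "ip")
print(rows)
```

With the removal moved out of the loop, the number of matching elements no longer affects how often the session is asked to drop the FlowFile.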
06-15-2023
11:41 AM
This simple InvokeScriptedProcessor script looks for a FlowFile attribute called "ip_address", attempts a reverse DNS lookup, and stores the resolved value in a new attribute called "host_name".
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.net.InetAddress
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

class GroovyProcessor implements Processor {
    // Maximum number of FlowFiles pulled from the queue per onTrigger call
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    // Minimal implementations of the Processor interface
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    // Resolve an IP address to a host name; returns "Unknown" if the address cannot be parsed or resolved
    def reverseDnsLookup(String ipAddress) {
        try {
            InetAddress inetAddress = InetAddress.getByName(ipAddress)
            // Note: when no reverse record is found, this typically returns the IP address text itself
            String hostName = inetAddress.getCanonicalHostName()
            return hostName
        } catch (UnknownHostException e) {
            return "Unknown"
        }
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            // Pull up to BATCH_SIZE FlowFiles from the incoming queue
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            Map customAttributes = [:]
            flowFiles.each { flowFile ->
                String ipAddress = flowFile.getAttribute("ip_address")
                if (ipAddress) {
                    String hostName = reverseDnsLookup(ipAddress)
                    customAttributes["host_name"] = hostName
                    flowFile = session.putAllAttributes(flowFile, customAttributes)
                }
                // FlowFiles without an ip_address attribute pass through unchanged
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

// InvokeScriptedProcessor picks up the instance assigned to the "processor" variable
processor = new GroovyProcessor()
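
For readers who want to try the lookup logic outside NiFi, here is a rough, language-neutral sketch in Python of the same idea: resolve the "ip_address" attribute and add a "host_name" attribute. It is not the Groovy processor above and does not use any NiFi API; the function names reverse_dns_lookup and add_host_name and the optional resolver parameter are assumptions made for illustration. Unlike the Java call, this sketch returns "Unknown" when no reverse record exists.

```python
import ipaddress
import socket


def reverse_dns_lookup(ip_address):
    """Return the host name for ip_address, or "Unknown" if it cannot be resolved."""
    try:
        # Reject anything that is not a literal IPv4/IPv6 address before touching DNS.
        ipaddress.ip_address(ip_address)
        host_name, _aliases, _addresses = socket.gethostbyaddr(ip_address)
        return host_name
    except (ValueError, OSError):
        # ValueError: not an IP literal; OSError covers socket.herror and socket.gaierror.
        return "Unknown"


def add_host_name(attributes, resolver=reverse_dns_lookup):
    """Return a copy of attributes with host_name added when ip_address is present."""
    enriched = dict(attributes)
    ip_address = enriched.get("ip_address")
    if ip_address:
        enriched["host_name"] = resolver(ip_address)
    return enriched


# Usage with a fake resolver so the example does not depend on live DNS.
print(add_host_name({"ip_address": "192.0.2.10"}, resolver=lambda ip: "example.invalid"))
print(add_host_name({"filename": "a.txt"}))
```

Passing the resolver in as a parameter keeps the attribute handling testable without network access.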
06-14-2023
10:00 PM
What are you using Vault for specifically? Retrieving secrets for some other purpose? Whenever the built-in processors don't or can't do what I need, I've found scripted processors to be ideal.
06-14-2023
03:04 PM
Does it need to be ECMA? I can probably whip something up tomorrow using Groovy.
06-14-2023
02:45 PM
Would a scripted processor inside NiFi be adequate, so that you don't have to call an external script?
06-14-2023
02:42 PM
ConsumeKafka has a property called "Message Demarcator". Click on its value and press Shift+Enter to enter a newline; instead of pulling 1 event at a time, it will then create a single FlowFile with several events, which might make your merge even better. You can do the same thing on the publish side: merge with a newline (Shift+Enter) as the demarcator, configure the same demarcator on the Publish Kafka processor, and you'll achieve greater throughput.
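To illustrate what a demarcator does conceptually, here is a small Python sketch (not NiFi or Kafka code): several messages are joined into one payload with a newline and split back into individual messages later. The names DEMARCATOR, bundle, and unbundle are hypothetical, and the sketch assumes no message contains the demarcator itself.

```python
DEMARCATOR = "\n"  # assumed demarcator: a single newline, as entered with Shift+Enter


def bundle(messages, demarcator=DEMARCATOR):
    """Join many messages into the content of one FlowFile."""
    return demarcator.join(messages)


def unbundle(content, demarcator=DEMARCATOR):
    """Split FlowFile content back into individual messages."""
    return content.split(demarcator) if content else []


events = ['{"id": 1}', '{"id": 2}', '{"id": 3}']
content = bundle(events)
print(content.count(DEMARCATOR) + 1)  # number of events carried in one payload
print(unbundle(content) == events)    # round trip preserves the events
```

The throughput gain comes from handling one larger FlowFile instead of many single-event FlowFiles; both sides must agree on the same demarcator for the round trip to work.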
06-14-2023
02:33 PM
Does ExecuteSQL erase some of the attributes that could be used to associate the FlowFiles further downstream?