Member since 06-14-2023
90 Posts
27 Kudos Received
8 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3148 | 12-29-2023 09:36 AM |
| | 4235 | 12-28-2023 01:01 PM |
| | 934 | 12-27-2023 12:14 PM |
| | 429 | 12-08-2023 12:47 PM |
| | 1386 | 11-21-2023 10:56 PM |
06-25-2023
11:28 AM
Have you checked whether this covers what you're referring to? https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/
06-24-2023
08:16 PM
I've done this with several Java libraries and a Groovy-based InvokeScriptedProcessor. I just looked up Kazoo: you can do the same with Python/Jython as long as the Kazoo module is 100% Python and not C-based.
06-24-2023
05:55 PM
First, the incoming data should be proper JSON that can be easily parsed and processed, i.e. something like this:
[
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
}
],
{
"res_data": [
{
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
}
]
Then Groovy code like this:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
List data = jsonSlurper.parseText('''
[
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
}
],
{
"res_data": [
{
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
}
]
''')
println(data)
println("=" * 80)
List<Map<String, String>> transformedList = []
// Merge each record with its matching res_data entry, if there is one.
data[0].eachWithIndex { map, index ->
    // res_data refers to records by 1-based position, e.g. "Record 2" is data[0][1].
    Map<String, Object> record = data[1]["res_data"].find { it["Record"] == "Record ${index + 1}" }
    if (record) {
        map.putAll(record)
    }
    transformedList.add(map)
}
println(jsonOutput.prettyPrint(jsonOutput.toJson(transformedList)))
This creates output like this:
[[[ID:100000, Date:2022-09-22, Start Time:08:00, End Time:14:00], [ID:100001, Date:2022-09-02, Start Time:08:00, End Time:14:00], [ID:100002, Date:2022-09-02, Start Time:08:00, End Time:14:00]], [res_data:[[Record:Record 2, Description:Invalid values for ID or Date], [Record:Record 3, Description:Invalid values for ID or Date]]]]
================================================================================
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00",
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00",
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
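If your scripted processor uses Python/Jython rather than Groovy, the same positional merge can be sketched roughly as below. This is only an illustration under assumptions: the sample data is shortened, and the names `errors`, `merged`, and `combined` are made up for the example. It avoids f-strings so it should run on both Jython 2.7 and Python 3.

```python
import json

# Shortened sample in the same shape as the JSON above (assumed for illustration).
data = [
    [
        {"ID": "100000", "Date": "2022-09-22"},
        {"ID": "100001", "Date": "2022-09-02"},
    ],
    {"res_data": [
        {"Record": "Record 2", "Description": "Invalid values for ID or Date"},
    ]},
]

# Index the error entries by their "Record N" label for direct lookup.
errors = {entry["Record"]: entry for entry in data[1]["res_data"]}

merged = []
for index, record in enumerate(data[0]):
    combined = dict(record)  # copy, so the input records are left untouched
    # "Record N" is 1-based, while enumerate() counts from 0.
    combined.update(errors.get("Record {0}".format(index + 1), {}))
    merged.append(combined)

print(json.dumps(merged, indent=2))
```

One difference from the Groovy version: this copies each record before merging instead of calling `putAll` on the original map, and it looks errors up in a dictionary instead of scanning `res_data` for every record.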
06-24-2023
05:18 PM
This Groovy code does what you've described, and you should be able to adapt it to your scripted processor.
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
Map<String, String> data = jsonSlurper.parseText('''{
"Message": "\nRecord 1:\nRequired data is missing. \n\nRecord 2:\nprocessing failed\n"
}''')
println(data)
println("=" * 80)
List<String> records = data.Message.split("\\nRecord\\s\\d+:\\n").findAll { it.trim() != "" }
println(records)
println("=" * 80)
Map<String, String> messages = [:]
records.eachWithIndex { message, index ->
    messages["Record ${index + 1}"] = message.trim()
}
println(jsonOutput.toJson(messages))
This is the output:
[Message:
Record 1:
Required data is missing.
Record 2:
processing failed
]
================================================================================
[Required data is missing.
, processing failed
]
================================================================================
{"Record 1":"Required data is missing.","Record 2":"processing failed"}
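For a Python/Jython scripted processor, the same split-and-number approach might look like the sketch below. It is an adaptation, not the code from the original answer: the variable names are illustrative, and the message is hard-coded here rather than read from a FlowFile.

```python
import json
import re

# Same sample message as in the Groovy example above.
message = "\nRecord 1:\nRequired data is missing. \n\nRecord 2:\nprocessing failed\n"

# Split on every "Record N:" header line, then drop pieces that are empty after trimming.
parts = [p.strip() for p in re.split(r"\nRecord\s\d+:\n", message) if p.strip()]

# Number the remaining pieces in order, starting at 1.
messages = {}
for index, text in enumerate(parts):
    messages["Record {0}".format(index + 1)] = text

print(json.dumps(messages, sort_keys=True))
```

Like the Groovy version, this renumbers the pieces by position rather than reading the number out of each header, so it assumes the records arrive numbered consecutively from 1.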
06-24-2023
04:56 PM
It should look more like this:
context.getProperty(externalRequestTokenDescriptor).evaluateAttributeExpressions().getValue()
06-24-2023
04:37 PM
You probably need to escape the $, which is a special character in NiFi. Try prefixing it with \ or \\
06-22-2023
05:56 PM
This is likely what's generating the error: you're removing the FlowFile inside a loop, and that should only be done once.
for element in found_elements:
    session.remove(flowFile)
    print(element.tag, element.text)
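A minimal sketch of the fix: do the per-element work inside the loop and remove the FlowFile once, after it. `StubSession` and `report_and_remove` are hypothetical names introduced only so the example runs outside NiFi; in a real script you would use the `session` object NiFi provides. The stub's refusal to remove the same FlowFile twice is an assumption made to mirror the error described above.

```python
import xml.etree.ElementTree as ET


class StubSession(object):
    """Hypothetical stand-in for NiFi's session, only for running this sketch."""

    def __init__(self):
        self.removed = []

    def remove(self, flow_file):
        # Assumption: removing the same FlowFile a second time is an error.
        if flow_file in self.removed:
            raise ValueError("FlowFile already removed: {0}".format(flow_file))
        self.removed.append(flow_file)


def report_and_remove(session, flow_file, found_elements):
    """Collect (tag, text) for every element, then remove the FlowFile once."""
    seen = []
    for element in found_elements:
        seen.append((element.tag, element.text))
    # The removal sits outside the loop, so it runs exactly once.
    session.remove(flow_file)
    return seen


session = StubSession()
root = ET.fromstring("<items><item>a</item><item>b</item></items>")
results = report_and_remove(session, "flowfile-1", root.findall("item"))
print(results)
```

With the `remove` call left inside the loop, the second iteration would hit the stub's error, which is the same shape of failure as in the original script.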
06-15-2023
11:41 AM
This simple InvokeScriptedProcessor looks for a FlowFile attribute called "ip_address", attempts a reverse DNS lookup on it, and writes the resolved value to a new attribute called "host_name".
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.net.InetAddress
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    // Reverse lookup: returns the canonical host name, or "Unknown" when getByName throws UnknownHostException.
    def reverseDnsLookup(String ipAddress) {
        try {
            InetAddress inetAddress = InetAddress.getByName(ipAddress)
            String hostName = inetAddress.getCanonicalHostName()
            return hostName
        } catch (UnknownHostException e) {
            return "Unknown"
        }
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            // Pull up to BATCH_SIZE FlowFiles from the incoming queue.
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            Map customAttributes = [:]
            flowFiles.each { flowFile ->
                String ipAddress = flowFile.getAttribute("ip_address")
                if (ipAddress) {
                    String hostName = reverseDnsLookup(ipAddress)
                    customAttributes["host_name"] = hostName
                    flowFile = session.putAllAttributes(flowFile, customAttributes)
                }
                // FlowFiles without an ip_address attribute pass through unchanged.
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
06-14-2023
10:00 PM
What are you using Vault for specifically? Retrieving secrets for some other purpose? Any time the built-in processors don't or can't do what I need, I've found scripted processors to be ideal.