Member since: 06-14-2023
Posts: 96
Kudos Received: 34
Solutions: 9
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2829 | 11-04-2024 06:51 PM |
| | 7365 | 12-29-2023 09:36 AM |
| | 11206 | 12-28-2023 01:01 PM |
| | 2487 | 12-27-2023 12:14 PM |
| | 1506 | 12-08-2023 12:47 PM |
06-26-2023
09:18 AM
@Ghilani You should know these 3 articles intimately if you want to use ExecuteScript:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
That said, it's sometimes helpful for me to see a working example and modify from there. As such, here is a GitHub repo with a sample flow definition file (01_Fraud_Detection_Demo_Params_ExecuteScript.json) and script file (Fraud Demo ExecuteScript.py) that should work out of the box: https://github.com/cldr-steven-matison/Fraud-Prevention-With-Cloudera-SSB/tree/main/Templates
Pay attention to the imports; then line 160 is what you want for getting the FlowFile. My flow ignores the content, but you should be able to find references in the Part 1 cookbook for anything you want to do with FlowFile content.
06-26-2023
03:37 AM
The issue ended up being that ip is a reserved word in NiFi Expression Language (it is a built-in function), so changing the name of the attribute from ip to extractedIp solved it.
06-26-2023
12:02 AM
@Carson, like @joseomjr wrote (but accidentally did not include the entire link), you should take a look at the following article, as it describes exactly what you need --> https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
Basically, you add the property to your NiFi processor, reference your parameter value in it, and afterwards call it in your script:
myValue1 = myProperty1.getValue()
06-25-2023
12:21 PM
Apache PDFBox should allow you to merge PDF content. Since it is a Java library, you can create a scripted Groovy processor to merge the files for you.
https://pdfbox.apache.org
https://javadoc.io/doc/org.apache.pdfbox/pdfbox/2.0.27/index.html
06-24-2023
05:55 PM
First, the incoming data should be proper JSON that can be easily parsed and processed, i.e. something like this:
[
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
}
],
{
"res_data": [
{
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
}
]
Then Groovy code like this:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
List data = jsonSlurper.parseText('''
[
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00"
}
],
{
"res_data": [
{
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
}
]
''')
println(data)
println("=" * 80)
List<Map<String, String>> transformedList = []
data[0].eachWithIndex { map, index ->
Map<String, Object> record = data[1]["res_data"].find { it["Record"] == "Record ${index + 1}" }
if (record) {
map.putAll(record)
}
transformedList.add(map)
}
println(jsonOutput.prettyPrint(jsonOutput.toJson(transformedList)))
This creates output like this:
[[[ID:100000, Date:2022-09-22, Start Time:08:00, End Time:14:00], [ID:100001, Date:2022-09-02, Start Time:08:00, End Time:14:00], [ID:100002, Date:2022-09-02, Start Time:08:00, End Time:14:00]], [res_data:[[Record:Record 2, Description:Invalid values for ID or Date], [Record:Record 3, Description:Invalid values for ID or Date]]]]
================================================================================
[
{
"ID": "100000",
"Date": "2022-09-22",
"Start Time": "08:00",
"End Time": "14:00"
},
{
"ID": "100001",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00",
"Record": "Record 2",
"Description": "Invalid values for ID or Date"
},
{
"ID": "100002",
"Date": "2022-09-02",
"Start Time": "08:00",
"End Time": "14:00",
"Record": "Record 3",
"Description": "Invalid values for ID or Date"
}
]
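For anyone scripting this in Python instead of Groovy, a standalone sketch of the same merge logic could look like the following (it uses the sample structure above and is not wired into a NiFi session):

```python
import json

# Sample input matching the structure above: the first element holds the
# records, the second holds the res_data error entries.
raw = json.loads('''
[
  [
    {"ID": "100000", "Date": "2022-09-22", "Start Time": "08:00", "End Time": "14:00"},
    {"ID": "100001", "Date": "2022-09-02", "Start Time": "08:00", "End Time": "14:00"}
  ],
  {"res_data": [
    {"Record": "Record 2", "Description": "Invalid values for ID or Date"}
  ]}
]
''')

records, errors = raw[0], raw[1]["res_data"]
merged = []
for index, record in enumerate(records):
    # Match "Record N" (1-based) against the error list, as the Groovy find does.
    match = next((e for e in errors if e["Record"] == "Record %d" % (index + 1)), None)
    if match:
        record.update(match)
    merged.append(record)

print(json.dumps(merged, indent=2))
```

As in the Groovy version, records with no matching "Record N" entry pass through unchanged.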
06-24-2023
05:18 PM
This is Groovy code that achieves what you've mentioned; you should be able to adapt it to your scripted processor.
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
Map<String, String> data = jsonSlurper.parseText('''{
"Message": "\nRecord 1:\nRequired data is missing. \n\nRecord 2:\nprocessing failed\n"
}''')
println(data)
println("=" * 80)
List<String> records = data.Message.split("\\nRecord\\s\\d+:\\n").findAll { it.trim() != "" }
println(records)
println("=" * 80)
Map<String, String> messages = [:]
records.eachWithIndex { message, index ->
messages["Record ${index + 1}"] = message.trim()
}
println(jsonOutput.toJson(messages))
This is the output:
[Message:
Record 1:
Required data is missing.
Record 2:
processing failed
]
================================================================================
[Required data is missing.
, processing failed
]
================================================================================
{"Record 1":"Required data is missing.","Record 2":"processing failed"}
06-22-2023
05:56 PM
This is likely what's generating the error: you're calling session.remove(flowFile) inside the loop, but removing the FlowFile should only be done once.
for element in found_elements:
    session.remove(flowFile)
    print(element.tag, element.text)
06-16-2023
11:26 AM
You have to create a separate CASE statement for each column you are trying to update, similar to what is done for the shipment number.
06-15-2023
11:41 AM
This simple InvokeScriptedProcessor will look for a FlowFile attribute called "ip_address", attempt the reverse lookup, and create a new attribute called "host_name" with the resolved value.
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.util.StandardValidators
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.net.InetAddress
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
def reverseDnsLookup(String ipAddress) {
try {
InetAddress inetAddress = InetAddress.getByName(ipAddress)
String hostName = inetAddress.getCanonicalHostName()
return hostName
} catch (UnknownHostException e) {
return "Unknown"
}
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [:]
String ipAddress = flowFile.getAttribute("ip_address")
if (ipAddress) {
String hostName = reverseDnsLookup(ipAddress)
customAttributes["host_name"] = hostName
flowFile = session.putAllAttributes(flowFile, customAttributes)
}
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
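The reverse lookup itself doesn't depend on NiFi, so if you'd rather script it in Python (e.g. in an ExecuteScript with Jython), the same helper can be sketched with the standard socket module; reverse_dns is just an illustrative name mirroring reverseDnsLookup above:

```python
import socket

def reverse_dns(ip_address):
    """Return the resolved host name for ip_address, or "Unknown" on failure."""
    try:
        # gethostbyaddr returns a (hostname, aliaslist, ipaddrlist) tuple.
        return socket.gethostbyaddr(ip_address)[0]
    except OSError:  # covers socket.herror and socket.gaierror
        return "Unknown"

print(reverse_dns("127.0.0.1"))  # typically "localhost"
```

The resulting string would then be set as the "host_name" attribute, just as the Groovy processor does with session.putAllAttributes.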
06-14-2023
10:00 PM
What are you using Vault for specifically? Retrieving secrets for some other purpose? Anytime the built-in processors don't or can't do what I need or want, I've found scripted processors to be ideal.