Member since 06-14-2023
96 Posts
34 Kudos Received
9 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2829 | 11-04-2024 06:51 PM |
| | 7365 | 12-29-2023 09:36 AM |
| | 11206 | 12-28-2023 01:01 PM |
| | 2487 | 12-27-2023 12:14 PM |
| | 1507 | 12-08-2023 12:47 PM |
11-22-2023
02:46 PM
Yes, it turns out the groovy-yaml module is an optional module in Groovy 3. groovy-yaml-3.0.17.jar is actually included in nifi-scripting-nar-1.23.2, but the ExecuteScript processor still throws the exception I mentioned when it is started. This possibly indicates that other dependencies (e.g. jackson-databind, jackson-dataformat-yaml, groovy-json) would need to be explicitly included in the classpath. I was hoping to use YamlSlurper followed by JsonBuilder to do YAML-to-JSON conversions. In the end I used the Jackson YAMLFactory and ObjectMapper to achieve the desired result.
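For anyone landing here later, a minimal sketch of that Jackson approach in Groovy. It assumes jackson-databind and jackson-dataformat-yaml are available on the script's classpath; the helper name `yamlToJson` and the sample YAML are made up for illustration and are not from my actual flow.

```groovy
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory

// Hypothetical helper: read YAML into a Jackson tree, then write that tree out as JSON.
String yamlToJson(String yamlText) {
    def yamlMapper = new ObjectMapper(new YAMLFactory()) // parses YAML
    def jsonMapper = new ObjectMapper()                  // writes plain JSON
    def tree = yamlMapper.readTree(yamlText)
    return jsonMapper.writeValueAsString(tree)
}

// Made-up sample document, for illustration only
def sampleYaml = '''\
name: example
tags:
  - a
  - b
'''
def json = yamlToJson(sampleYaml)
println json
```

Inside ExecuteScript the same two mappers would be applied to the FlowFile content stream rather than to a string literal.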
11-21-2023
01:56 PM
What's wrong with a 1:1 relationship? If you're concerned about performance, then leveraging "Message Demarcator" for both the Consume and Publish processors will provide the best throughput.
10-09-2023
11:51 AM
1 Kudo
In my experience, InvokeScriptedProcessor is the closest you'll get to a native (NAR) NiFi processor. With it, you do NOT need to define all three relationships. If your code handles all possible problems correctly, you could have just "success" (or some other name) as your only relationship. From what I've seen, you do need at least one relationship if you're modifying the FlowFile or creating new ones, because session.transfer needs a relationship to send it to. You don't need to transfer the original: if you read the original FlowFile and create one or several new ones, you can dispose of the original with session.remove(your_original_flow_file).
09-18-2023
01:11 PM
Forgot the ";" in the replacement value $1($2='$3');
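A quick way to sanity-check a replacement value like this outside NiFi, assuming it is used with a Java-style regular expression. The search pattern and the input string below are made up for illustration; only the replacement value comes from this post.

```groovy
// Hypothetical search pattern: name(key=value), captured as three groups
def searchPattern = /(\w+)\((\w+)=(\w+)\)/
// Replacement value from the post, including the trailing ';'
def replacement = '$1($2=\'$3\');'

// Made-up input, for illustration only
def input = 'setValue(name=abc)'
def result = input.replaceAll(searchPattern, replacement)
println result
```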
09-16-2023
04:35 PM
I would do this with a Groovy-based InvokeScriptedProcessor, using this code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
    // Number of FlowFiles pulled from the incoming queue per onTrigger call
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { flowFile ->
                // Read the incoming FlowFile content and parse it as a JSON array of orders
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // Formatting: one line per order, followed by one line per order item
                List outputData = []
                data.each { order ->
                    outputData.add("${order.orderId} ${order.orderName}")
                    order.orderItems.each { orderItem ->
                        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
                    }
                }

                // Write the lines to a new FlowFile and drop the original
                FlowFile newFlowFile = session.create()
                newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                session.transfer(newFlowFile, REL_SUCCESS)
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
processor = new GroovyProcessor()
Don't let all that code scare you when the part that's doing the formatting is only these lines:
This is the generated output:
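As a rough standalone sketch of just that formatting step, runnable outside NiFi: the loop is the same one used in onTrigger above, but the JSON input here is made up for illustration (it is not the data from the original question) and only assumes the field names the script already uses.

```groovy
import groovy.json.JsonSlurper

// Made-up input: a JSON array of orders, each with nested orderItems
def json = '''
[
  {"orderId": 1, "orderName": "first", "orderItems": [
    {"orderItemId": 10, "orderItemName": "widget"},
    {"orderItemId": 11, "orderItemName": "gadget"}
  ]}
]
'''
List data = new JsonSlurper().parseText(json)

// Same flattening loop as in onTrigger: one line per order, then one per item
List outputData = []
data.each { order ->
    outputData.add("${order.orderId} ${order.orderName}")
    order.orderItems.each { orderItem ->
        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
    }
}
println outputData.join('\n')
```

Everything else in the processor is plumbing: reading the FlowFile, writing the new one, and handling the session.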
08-28-2023
07:50 AM
Thanks for the reply. I did it the same way you mentioned (I am importing the Selenium module, and from it the Service, WebDriverWait and EC components), but later in the code execution my script gives the error "global name Service is not defined". Any idea what's causing the issue?
07-31-2023
05:34 AM
Thanks for the suggestions. I ended up moving everything to stored procedures in the database, which are run under the existing context (service), so no need for a sensitive parameter.
07-02-2023
12:15 PM
If you download the Postgres JDBC driver, you can leverage any of the SQL processors to query Postgres directly instead of using pg_dump. https://jdbc.postgresql.org/download/
06-30-2023
05:19 AM
@madhs Have you resolved your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.