Member since: 06-14-2023
Posts: 96
Kudos Received: 34
Solutions: 9
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2827 | 11-04-2024 06:51 PM |
| | 7362 | 12-29-2023 09:36 AM |
| | 11200 | 12-28-2023 01:01 PM |
| | 2484 | 12-27-2023 12:14 PM |
| | 1503 | 12-08-2023 12:47 PM |
10-07-2023
09:48 PM
I created this Python ExecuteScript NiFi processor that extracts the files in a zip archive (including those in subdirectories) into individual FlowFiles. Everything happens inside NiFi. It is not fully tested, but it worked with a simple example in my lab. The "Script Body" is below:

''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO


class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''

    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        # Buffer the whole archive in memory so zipfile can seek within it
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''

    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        output_stream.write(self.file.read())


flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    output_stream_callback = PyOutputStreamCallback
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # Skip directory entries (names ending in "/"); emit one FlowFile per file
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
            new_flow_file,
            output_stream_callback(zip_file.open(name))
        )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    session.remove(flow_file)
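For anyone who wants to try the extraction logic outside NiFi first, here is a minimal sketch in plain Python 3, with no NiFi session or callbacks. The function names `extract_zip_entries` and `build_sample_zip` and the sample archive contents are made up for illustration; only the idea of buffering the archive in memory and skipping names that end in "/" comes from the script above.

```python
import io
import zipfile


def extract_zip_entries(zip_bytes):
    """Return (entry_name, content_bytes) for every file in the archive.

    Directory entries (names ending in "/") are skipped, mirroring the
    filter used in the NiFi script.
    """
    entries = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as archive:
        for name in archive.namelist():
            if name.endswith("/"):
                continue
            entries.append((name, archive.read(name)))
    return entries


def build_sample_zip():
    """Build a small in-memory archive (hypothetical test data)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("a.txt", "alpha")
        archive.writestr("sub/", "")          # explicit directory entry
        archive.writestr("sub/b.txt", "beta")
    return buffer.getvalue()


if __name__ == "__main__":
    for entry_name, content in extract_zip_entries(build_sample_zip()):
        print(entry_name, len(content))
```

Each returned pair corresponds to one FlowFile in the NiFi version: the name becomes the "filename" attribute and the bytes become the content.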
10-01-2023
01:29 PM
If you can run this Python code with a plain Python interpreter and no external modules, you should be able to run it as a scripted processor and have everything happen inside NiFi.
09-18-2023
01:11 PM
I forgot the ";" in the replacement value. It should be: $1($2='$3');
09-16-2023
04:35 PM
I would do this with a Groovy-based InvokeScriptedProcessor, using this code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            // Pull up to BATCH_SIZE FlowFiles per execution
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { flowFile ->
                // Parse the incoming JSON content
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // One line per order, followed by one line per order item
                List outputData = []
                data.each { order ->
                    outputData.add("${order.orderId} ${order.orderName}")
                    order.orderItems.each { orderItem ->
                        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
                    }
                }

                FlowFile newFlowFile = session.create()
                newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                session.transfer(newFlowFile, REL_SUCCESS)
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
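processor = new GroovyProcessor()

Don't let all that code scare you; the part that does the formatting is only these lines:

data.each { order ->
    outputData.add("${order.orderId} ${order.orderName}")
    order.orderItems.each { orderItem ->
        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
    }
}

This is the generated output: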
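To make the formatting step concrete, here is a minimal Python sketch of the same loop run on made-up order data. The field names (`orderId`, `orderName`, `orderItems`, `orderItemId`, `orderItemName`) are taken from the Groovy code; the function name `format_orders` and the sample values are hypothetical and are not the data from the original question.

```python
import json


def format_orders(json_text):
    """Flatten a JSON array of orders into one line per order and per item."""
    lines = []
    for order in json.loads(json_text):
        lines.append("{} {}".format(order["orderId"], order["orderName"]))
        for item in order["orderItems"]:
            lines.append("{} {}".format(item["orderItemId"], item["orderItemName"]))
    return "\n".join(lines)


# Hypothetical input, for illustration only
SAMPLE = """
[
  {"orderId": 1, "orderName": "First",
   "orderItems": [
     {"orderItemId": 10, "orderItemName": "Widget"},
     {"orderItemId": 11, "orderItemName": "Gadget"}
   ]}
]
"""

if __name__ == "__main__":
    print(format_orders(SAMPLE))
```

With this sample input the sketch prints three lines: `1 First`, `10 Widget`, and `11 Gadget`.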
09-16-2023
01:30 PM
1 Kudo
The ReplaceText processor should work. This regex search pattern should do the trick:

^(.*?)\(([^=]*)=([^\)]*)\);$

With this replacement value:

$1($2='$3')

Don't forget to set the Evaluation Mode to Line-by-Line.
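If you want to check the pattern before putting it into ReplaceText, here is a small Python sketch that applies it one line at a time. It uses the replacement value with the trailing ";" from the 09-18-2023 correction, written in Python's `\1` backreference syntax instead of `$1`. The function name `quote_values` and the sample lines are made up for illustration, and Python's regex engine is only assumed to behave like NiFi's Java regex for this particular pattern.

```python
import re

# Same search pattern as in the post
PATTERN = re.compile(r"^(.*?)\(([^=]*)=([^\)]*)\);$")

# Corrected replacement value ($1($2='$3');) in Python backreference syntax
REPLACEMENT = r"\1(\2='\3');"


def quote_values(text):
    """Apply the replacement to each line separately, like Line-by-Line mode."""
    return "\n".join(PATTERN.sub(REPLACEMENT, line) for line in text.splitlines())


if __name__ == "__main__":
    # Hypothetical input lines
    print(quote_values("set(name=John);\nleave this line alone"))
```

Lines that do not match the pattern pass through unchanged, so it is safe to run over mixed content.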
08-28-2023
07:10 AM
The 2 lines I provided are the very first lines and then all your code comes after.
07-29-2023
07:56 PM
NiFi is awesome with so many out-of-the-box processors. However, I have found that a very specialized scripted Groovy processor that fetches 1000 or more files at a time can sometimes perform significantly faster, especially if your custom processor consolidates several processors into one.
07-29-2023
11:16 AM
2 Kudos
You should be able to create a custom scripted processor and define a property that's sensitive, so once you set it, it won't be visible. You might be able to handle your whole process inside the script so the password never gets written out. I've written several in Groovy. An alternative option is to leverage something like Vault by HashiCorp (vaultproject.io).

For the scripted processor, the sensitive property would be declared like this:

PropertyDescriptor OAUTH_TOKEN = new PropertyDescriptor.Builder()
    .name("oAuth Token")
    .displayName("oAuth Token")
    .description("oAuth Token")
    .required(true)
    .sensitive(true)
    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    .build()
07-02-2023
12:15 PM
If you download the Postgres JDBC driver, you can leverage any of the SQL processors to query Postgres directly instead of using pg_dump. https://jdbc.postgresql.org/download/
06-26-2023
06:51 AM
1 Kudo
Use an UpdateAttribute processor to add attributes for your header values, e.g. Content-Type with a value of application/json. In InvokeHTTP you'll notice a property called Attributes to Send (or something like that); attributes whose names match it are sent as headers. List the names there, e.g. Accept|Authorization|Any-Other (it's a regular expression, so you might have to escape some characters). The FlowFile content should be sent as the body.
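As a rough illustration of how a regular expression picks which attributes become headers, here is a small Python sketch. It assumes the pattern has to match the whole attribute name, which I have not verified against the InvokeHTTP source. The function name `select_headers` and the sample attributes are hypothetical.

```python
import re


def select_headers(attributes, attributes_to_send):
    """Return only the attributes whose full name matches the regex."""
    pattern = re.compile(attributes_to_send)
    return {
        name: value
        for name, value in attributes.items()
        if pattern.fullmatch(name)  # assumption: whole-name match
    }


if __name__ == "__main__":
    # Hypothetical FlowFile attributes after UpdateAttribute
    flowfile_attributes = {
        "Accept": "application/json",
        "Authorization": "Bearer <token>",
        "Content-Type": "application/json",
        "filename": "payload.json",
    }
    headers = select_headers(flowfile_attributes, r"Accept|Authorization|Content-Type")
    for name, value in headers.items():
        print(name + ": " + value)
```

Here `filename` is left out because it does not match the pattern, while the other three attributes would go out as request headers.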