Member since
06-14-2023
95
Posts
33
Kudos Received
8
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3841 | 12-29-2023 09:36 AM | |
5625 | 12-28-2023 01:01 PM | |
1107 | 12-27-2023 12:14 PM | |
558 | 12-08-2023 12:47 PM | |
1745 | 11-21-2023 10:56 PM |
10-07-2023
09:48 PM
Created this Python ExecuteScript NiFi processor that extracts the files of a ZipFile (including those in subdirectories) into individual FlowFiles. It all happens inside of NiFi and not fully tested but it worked with a simple example in my lab. "Script Body" below: ''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO
class PyInputStreamCallback(InputStreamCallback):
''' InputStream Callback '''
def __init__(self):
self.zip_file = None
def process(self, input_stream):
''' Process our InputStream '''
zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
self.zip_file = zipfile.ZipFile(zip_buffer, "r")
class PyOutputStreamCallback(OutputStreamCallback):
''' OutputStream Callback '''
def __init__(self, file):
self.file = file
def process(self, output_stream):
''' Process our OutputStream '''
output_stream.write(self.file.read())
flow_file = session.get()
if flow_file:
input_stream_callback = PyInputStreamCallback()
output_stream_callback = PyOutputStreamCallback
session.read(flow_file, input_stream_callback)
zip_filename = flow_file.getAttribute("filename")
zip_file = input_stream_callback.zip_file
for name in (name for name in zip_file.namelist() if not name.endswith("/")):
new_flow_file = session.create()
new_flow_file = session.putAttribute(new_flow_file, "filename", name)
new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
new_flow_file = session.write(
new_flow_file,
output_stream_callback(zip_file.open(name))
)
session.transfer(new_flow_file, REL_SUCCESS)
zip_file.close()
session.remove(flow_file)
... View more
09-18-2023
01:11 PM
Forgot the ";" in the replacement value $1($2='$3');
... View more
09-18-2023
12:45 PM
@Kumar_ Check the configuration value boxes to see if you added a line return. If the configuration pop-up box shows more then just a "1" line then you have a lone return at the end of line "1". Click on line to and hit delete to get rid of the line return if present. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
09-16-2023
04:35 PM
I would do this with a Groovy based InvokeScriptedProcessor Using this code: import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
List outputData = []
data.each { order ->
outputData.add("${order.orderId} ${order.orderName}")
order.orderItems.each { orderItem ->
outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
}
}
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
session.transfer(newFlowFile, REL_SUCCESS)
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor() Don't let all that code scare you when the part that's doing the formatting is only these lines: This is the generated output:
... View more
09-16-2023
01:38 PM
Are you or have you considered leveraging ES bulk API? Bulk API | Elasticsearch Guide [8.9] | Elastic
... View more
09-16-2023
01:15 PM
As far as I know, NiFi leverage Jython and would currently limit you to using Python 2.7 compatible code and only modules written in pure Python. Home | Jython
... View more
08-28-2023
07:50 AM
Thanks for the reply. I did it in the same way you have mentioned (I am importing Selenium module and from this module, Service, WebDriverWait and EC components are imported) but late in the code execution, my scripts gives an error "global name Service is not defined". Any idea whats causing the issue?
... View more
07-31-2023
05:34 AM
Thanks for the suggestions. I ended up moving everything to stored procedures in the database, which are run under the existing context (service), so no need for a sensitive parameter.
... View more
07-02-2023
12:15 PM
If you download the Postgres JDBC driver's you can leverage any of the SQL processors to query Postgres directly instead of using pgdump. https://jdbc.postgresql.org/download/
... View more