Member since: 06-14-2023
Posts: 90
Kudos Received: 27
Solutions: 8
My Accepted Solutions
Views | Posted
---|---
3151 | 12-29-2023 09:36 AM
4240 | 12-28-2023 01:01 PM
934 | 12-27-2023 12:14 PM
431 | 12-08-2023 12:47 PM
1386 | 11-21-2023 10:56 PM
12-29-2023
09:36 AM
2 Kudos
@Heeya8876 , both @SAMSAL and I have recently gone through the adventure of getting 2.0.0-M1 to run with the Python extension enabled. Here are some findings so far on the Linux side of things:

- Java 21 is required (any platform).
- Python 3.9+ is required (any platform). I believe @SAMSAL said Python 3.12 did NOT work (correct me if I'm wrong), but we both got 3.11 to run.
- If more than one Java is installed, make sure Java 21 is the default with "sudo update-alternatives --config java".
- Make sure your environment has JAVA_HOME defined with the path for Java 21.
- Make sure Python 3.9+ is the default prior to running NiFi with "sudo update-alternatives --config python3". Running python3 --version should show whichever version you set as your default, and it should be 3.9 to 3.11.
- You can see which version NiFi copied by running "./work/python/controller/bin/python3 --version". If this shows anything below 3.9, delete the work folder, follow the steps above, and try again.
- If you build a processor from scratch, the Developer Guide says to use this for your __init__:

def __init__(self, **kwargs):
    super().__init__(**kwargs)

You'll get an error... replace super().__init__(**kwargs) with pass, like the examples that come with the install.
- Changes to your Python extensions are not immediate... NiFi polls the directory periodically to detect changes, download dependencies, and load the updated processors. Sometimes I had to restart NiFi to get it to detect my changes if my previous code update made it really unhappy.
- ./logs/nifi-python.log will be your friend for Python extension related issues.
- If your Python extension has dependencies and NiFi fails to download them, you can see the command it attempted in nifi-python.log. I manually ran the commands from the logs, the modules downloaded into the correct place, and it worked... perhaps there's a timeout for module downloads? (Just a guess, since my module had a ton of large dependencies.)
- I don't think I saw it in the Developer's Guide, but while building a custom FlowFileTransform Python extension I did notice that the "content" data returned with the FlowFileTransformResult should be a string or byte array (see the sketch at the end of this post).

@SAMSAL has additional insight on getting it to start up on Windows.
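To tie the last two points together, here's a minimal sketch of a FlowFileTransform processor in the style of the examples bundled with 2.0.0-M1. The class name and the exact ProcessorDetails fields are from memory, so treat it as a starting point and compare it against the shipped examples rather than as the definitive API:

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class ExampleTransform(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Example processor that upper-cases the FlowFile content.'
        tags = ['example', 'python']

    def __init__(self, **kwargs):
        # Per the note above: use pass here instead of super().__init__(**kwargs)
        pass

    def transform(self, context, flowfile):
        # Read the incoming content as bytes and decode it
        text = flowfile.getContentsAsBytes().decode('utf-8')

        # The content handed back to FlowFileTransformResult should be a string or byte array
        return FlowFileTransformResult(relationship='success', contents=text.upper())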
12-29-2023
08:50 AM
If you want to avoid duplicates you could hash the content of the files and leverage the DetectDuplicate processor to only insert the unique files into your DB.
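A rough sketch of that flow, assuming processor and property names from the standard bundle (verify against your NiFi version):

CryptographicHashContent (SHA-256)
    -> DetectDuplicate
           Cache Entry Identifier: ${content_SHA-256}
           Distributed Cache Service: a DistributedMapCacheClientService
    -> "non-duplicate" goes on to your DB insert (e.g. PutDatabaseRecord)
    -> "duplicate" can be dropped or logged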
12-28-2023
01:17 PM
Agree with @SAMSAL's approach, and if you can provide a parameter or something in the header or request so your API returns a JSON response each time, it'll make things a lot easier for you to parse and build the request for the next step in your flow.
12-28-2023
01:01 PM
1 Kudo
ExecuteGroovyScript alternative with this input:

{
    "idTransakcji": "123",
    "date": "",
    "name": "sam"
}

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()

FlowFile flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    Map data = jsonSlurper.parse(inputStream)
    data = [
        "id": data.idTransakcji,
        "user": [
            "date": data.date?.isNumber() ? Long.parseLong(data.date) : null,
            "name": data.name
        ]
    ]
    outputStream.write(jsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
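With the sample input above, the script would write out something like:

{"id":"123","user":{"date":null,"name":"sam"}}

(the empty "date" string isn't a number, so it ends up as null).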
12-28-2023
12:25 PM
2 Kudos
...a 3rd option, because I like scripted processors 😂... using ExecuteGroovyScript:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()

FlowFile flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    List<Map> data = jsonSlurper.parse(inputStream)
    // order_item arrives as stringified JSON; parse it back into real JSON
    data.each {
        it.order_item = jsonSlurper.parseText(it.order_item)
    }
    outputStream.write(jsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

It looks like a lot, but this is the line that takes the string JSON and converts it to JSON:

it.order_item = jsonSlurper.parseText(it.order_item)
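For illustration, assuming input shaped like this (the fields other than order_item are made up):

[
    {"order_id": 1, "order_item": "{\"sku\": \"A-100\", \"qty\": 2}"}
]

the script would output:

[
    {"order_id": 1, "order_item": {"sku": "A-100", "qty": 2}}
]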
12-27-2023
02:01 PM
1 Kudo
PublishKafka must have an active connection to Kafka before it even attempts to send a FlowFile, which means it never gets into the block of code that sends it and routes it to "success" or "failure" accordingly. Making sure your Kafka cluster is up and running should be the focus if this is what you're experiencing. My guess is that if this "error" topic you have is on the same Kafka cluster, then even if PublishKafka were able to route a FlowFile to "failure" when it can't connect to Kafka, it wouldn't work anyway.
12-27-2023
12:14 PM
1 Kudo
How the FlowFile is distributed from your ListenUDP processor to the next processor in the flow is defined in the connection between them. Putting something like HAProxy, Nginx, or any other load balancer in front of your NiFi cluster would be a way to ensure your data is forwarded to whichever nodes are still accessible, as long as the cluster is up.
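For illustration, a minimal Nginx stream block for balancing UDP across the nodes might look like this (the hostnames and port 8877 are placeholders for wherever your ListenUDP processors are listening):

stream {
    upstream nifi_listenudp {
        server nifi-node-1:8877;
        server nifi-node-2:8877;
        server nifi-node-3:8877;
    }

    server {
        listen 8877 udp;
        proxy_pass nifi_listenudp;
    }
}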
12-11-2023
09:35 AM
How about SplitJson with $[*] followed by EvaluateJsonPath with $.SEARCH_RESULT?
12-11-2023
09:01 AM
I think you could probably use EvaluateJsonPath to parse the JSON value for "SEARCH_RESULT", but I like scripted processors, so I would use a Groovy-based InvokeScriptedProcessor with this code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void initialize(ProcessorInitializationContext context) {
log = context.logger
}
Set<Relationship> getRelationships() {
Set<Relationship> relationships = new HashSet<>()
relationships.add(REL_FAILURE)
relationships.add(REL_SUCCESS)
return relationships
}
Collection<ValidationResult> validate(ValidationContext context) {
}
PropertyDescriptor getPropertyDescriptor(String name) {
}
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
List<PropertyDescriptor> getPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>()
descriptors.add(BATCH_SIZE)
return Collections.unmodifiableList(descriptors)
}
String getIdentifier() {
}
void onScheduled(ProcessContext context) throws ProcessException {
}
void onUnscheduled(ProcessContext context) throws ProcessException {
}
void onStopped(ProcessContext context) throws ProcessException {
}
void setLogger(ComponentLog logger) {
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { FlowFile flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
flowFile = session.write(flowFile, { inputStream, outputStream ->
// Each element of the incoming array carries SEARCH_RESULT as stringified JSON;
// parse it so the output becomes a plain JSON array of objects
List<Map> searchResults = jsonSlurper.parse(inputStream)
searchResults = searchResults.collect { jsonSlurper.parseText(it.SEARCH_RESULT) }
outputStream.write(JsonOutput.toJson(searchResults).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.putAllAttributes(flowFile, customAttributes)
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()

It looks like a lot, but most of it is boilerplate; the actual work is done here:

searchResults = searchResults.collect { jsonSlurper.parseText(it.SEARCH_RESULT) }

An example of the input and output is below.
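For illustration, with input shaped like this (the inner fields are made up; the point is that SEARCH_RESULT holds a JSON string):

[
    {"SEARCH_RESULT": "{\"id\": 1, \"name\": \"foo\"}"},
    {"SEARCH_RESULT": "{\"id\": 2, \"name\": \"bar\"}"}
]

the processor would output:

[
    {"id": 1, "name": "foo"},
    {"id": 2, "name": "bar"}
]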
12-08-2023
01:52 PM
I don't see EvaluateJsonPath giving you options for that, so it might be something you'd have to handle on your own... personally, I'd do it via a Groovy scripted processor for greater control and performance.