Member since
01-27-2023
229
Posts
74
Kudos Received
45
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 1779 | 02-23-2024 01:14 AM | |
| 2331 | 01-26-2024 01:31 AM | |
| 1451 | 11-22-2023 12:28 AM | |
| 3619 | 11-22-2023 12:10 AM | |
| 3701 | 11-06-2023 12:44 AM |
06-14-2023
12:33 PM
I would do this in a single step with a InvokeScriptedProcessor and the following Groovy code import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
... View more
06-09-2023
04:11 PM
1 Kudo
Hey @steven-matison and @Former Member thank you so much for your help It worked with StandardProxyConfigurationService controller services however I still have issues with StandardRestrictedSSLContextService controller service. Anyway, thank you so much for the help and details steps that helped me a lot. Thank you!!
... View more
06-07-2023
12:24 PM
2 Kudos
@SandyClouds Some clarity and additions to @cotopaul Pros and Cons: Single Node: PROs: - easy to manage. <-- Setup and managing configuration is easier since you only need to do that on one node. But in a cluster, all nodes configuration files will be almost the same (some variations in hostname properties and certificates if you secure your cluster). - easy to configure. <-- There are more configurations needed in a cluster setup, but once setup, nothing changes from the user experience when it comes to interacting with the UI. - no https required. <-- Not sure how this is a PRO. I would not recommend using an un-secure NiFi as doing so allow anyone access to your dataflows and the data being processed. You can also have an un-secure NiFi cluster while i do not recommend that either. CONs: - in case of issues with the node, you NiFi instance is down. <-- Very true, single point of failure. - it uses plenty of resources, when it needs to process data, as everything is done on a single node. Cluster: PROs: - redundancy and failover --> when a node goes down, the others will take over and process everything, meaning that you will not get affected. <-- Not complete accurate. Each node in a NiFi cluster is only aware of the data (FlowFiles) queued on that specific node. So each node works on the FlowFile present on that one node, so it is the responsibility of the dataflow designer/builder to make sure they built their dataflows in such away to ensure distribution of FlowFiles across all nodes. When a node goes down, any data FlowFiles currently queued on that down node are not going to be processed by the other nodes. However, other nodes will continue processing their data and all new data coming in to your dataflow cluster - the used resources will be split among all the nodes, meaning that you can cover more use cases as on a single node. <-- Different nodes do not share or pool resources from all nodes in the cluster. If your dataflow(s) are built correctly the volume of data (FlowFiles) being processed will be distributed across all your nodes along each node to process a smaller subset of the overall FlowFile volume. This means more resources available across yoru cluster to handle more volume. NEW -- A NiFi cluster can be accessed via any one of the member nodes. No matter which node's UI you access, you will be presented with stats for all nodes. There is a cluster UI accessible from the global menu that allows you to see a breakdown of each node. Any changes you make from the UI of any one of the member nodes will be replicated to all nodes. NEW -- Since all nodes run their own copy of the flow, a catastrophic node failure does not mean loss of all your work since the same flow.json.gz (contains everything related to your dataflows) can be retrieved from any of the other nodes in your cluster. CONs: - complex setup as it requires a Zookeeper + plenty of other config files. <-- NiFi cluster requires a multi node zookeeper setup. Zookeeper quorum is required for cluster stability and also stores cluster wide state needed for your dataflow. Zookeeper is responsible for electing a node in your cluster with the Cluster Coordinator role and Primary node role. IF a node goes down that has been assigned one of these roles, Zookeeper will elected one of the still up nodes to the role - complex to manage --> analysis will be done on X nodes instead of a single node. <-- not clear. Yes you have multiple nodes and all those nodes are producing their own set of NiFi-logs. However, if a component within your dataflow is producing bulletins (exceptions) it will report all nodes or the specific node(s) on which bulletin was produced. Cloudera offers centralized management of your NiFi cluster deployment via Cloudera Manager software. Makes deploying and managing NiFi cluster to multiple nodes easy, sets up and configures Zookeeper for you, and makes securing your NiFi easy as well by generating the needed certificates/keystores for you. Hope this helps, Matt
... View more
05-31-2023
06:20 AM
1 Kudo
@steven-matison Thanks man. It is true, the session.commit() method can be found in the abstract processor class, which is why I did not think of adding it. This helped me a lot! Also I needed to close the Inputstream with IOUtils.closeQuietly(stream_content) Thirdly I had to use the enumerate function for the dictionnary, because it couldn't read the line file = session.putAttribute(file, "list_value", d[file]) So I just filled the dict with empty values and used session.putAttribute(file, "list_value", json_data['list'][i]) It is ugly, but works at least.
... View more
05-31-2023
03:01 AM
Well I am not expert in migrating from a version to another so my answer might not be good enough for you :(. Besides that, I had no time to read the release notes for 1.21.0 and I am not quite sure if anything changed in terms of config files. Assuming that you will keep the hostname and the port for each nifi node and you are using the embedded zookeeper, you should: 1. Stop the current NiFi instance. 2. Copy the authorizations.xml, authorizers.xml, bootstrap.conf, flow.json.gz, flow.xml.gz, logback.xml, login-identity-providers.xml, nifi.properties, stateless.properties, state-management.xml, users.xml and zookeeper.properties into the conf folder from within your new nifi instance. 3. Make sure that in nifi.properties, on your new instance, you are pointing to the same content repository, database repostiory (and all the other repositories) as in the previous instance --> assuming that you followed the best practices and had all those repositories moved on separate disks. Otherwise, make sure that you process all your data on your old instance and start fresh on your new instance. 4. Start the new NiFi Instance. If you are just interested in migrating just the flows from the canvas, you can add all your flows into a template and save them on your local machine. Afterwards, you can open your new nifi instance, upload your template and add it to your canvas.
... View more
05-23-2023
11:51 PM
Thank you for your response cotopaul. In MQTT 5 there is a feature for Request-Response, in that feature there is a field setting to determine the response topic (different from the actual topic). The response topic field represents the topics on which the responses from the receivers of the message are expected. I was wondering if maybe NiFi could get the response topic field value from the MQTT as an attribute
... View more
05-22-2023
08:27 AM
@cotopaul Please go through my latest response before concluding its a hardware or setup issue. FYI, Nifi is working fine for more than a week for me since I removed the ExecuteScript Processor. That's the only change I did and I replicated the issue several times before posting here. Nifi has restarted several times since then without any issues. could you please care to explain what sort of hardware issue it could be that it affects only ExecuteScript processor running python code?
... View more
05-15-2023
10:22 PM
Thank you very much! This gives me confidence that my attempts are not going in the wrong direction.I already use ExecuteStreamCommand Processors excessively and it is a pleasure to use them here as well.
... View more
05-12-2023
11:58 AM
Thank you for replying. I have tried so many different ports and I am even trying external zookeeper. None of my nifi's can connect to any port I provide them. It's almost like something is wrong with the code. I have installed ZK on one of my actual nifi servers and that starts up immediately on port 2181. That's what is leading me to think it's something in the code. The other odd thing I keep seeing is, apache.zookeeper.ClientCnxnSocketNetty future isn't success.
... View more