Member since
01-27-2023
229
Posts
73
Kudos Received
45
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
710 | 02-23-2024 01:14 AM | |
911 | 01-26-2024 01:31 AM | |
620 | 11-22-2023 12:28 AM | |
1428 | 11-22-2023 12:10 AM | |
1618 | 11-06-2023 12:44 AM |
06-20-2023
12:16 AM
@cotopaul, tagging myself because I am struggling with a similar issue and was not quite able to figure it out myself ... and maybe I will get some hints from some of the answers.
... View more
06-19-2023
11:44 PM
@eykf, I never tried to send messages from NiFi to MS Teams but upon reading the documentation, I am not quite sure you need a Webhook URL at all. MS Teams provides the Graph API to be able to integrate MS Teams into external application: https://learn.microsoft.com/en-us/graph/api/resources/teams-api-overview?view=graph-rest-1.0 Now, assuming that you have the channel and the users already created, my first guess is that you would need only to send the message so basically all you have to do is create the API POST as described in the following link: https://learn.microsoft.com/en-us/graph/api/chatmessage-post?view=graph-rest-1.0&tabs=http And here are the parameters in case you need a more complex message, than the one described in the above link: https://learn.microsoft.com/en-us/graph/api/resources/chatmessage?view=graph-rest-1.0
... View more
06-19-2023
11:35 PM
@Fanxxx, First of all, make sure that nothing is running on the port 2182, as this is the port used by the embedded Zookeeper by default. It could be that you changed it, but my initial feeling is that you left it as it is. Secondly, make sure that you wrote the correct zookeeper connection string within state-management.xml and within each nifi.properties.
... View more
06-16-2023
07:23 AM
@bhadraka What version of NiFi are you using? In NiFi 1.20.0, you can use ReplaceText Processor after reading in the file. Using the line-by-line evaluation mode, there is a drop down "Except-Last-Line". You could then configure it to just replace all previous lines with empty strings. Here's a screenshot of my ReplaceText processor properties.
... View more
06-14-2023
12:33 PM
I would do this in a single step with a InvokeScriptedProcessor and the following Groovy code import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
... View more
06-09-2023
04:11 PM
1 Kudo
Hey @steven-matison and @Edenwheeler thank you so much for your help It worked with StandardProxyConfigurationService controller services however I still have issues with StandardRestrictedSSLContextService controller service. Anyway, thank you so much for the help and details steps that helped me a lot. Thank you!!
... View more
06-07-2023
12:24 PM
2 Kudos
@SandyClouds Some clarity and additions to @cotopaul Pros and Cons: Single Node: PROs: - easy to manage. <-- Setup and managing configuration is easier since you only need to do that on one node. But in a cluster, all nodes configuration files will be almost the same (some variations in hostname properties and certificates if you secure your cluster). - easy to configure. <-- There are more configurations needed in a cluster setup, but once setup, nothing changes from the user experience when it comes to interacting with the UI. - no https required. <-- Not sure how this is a PRO. I would not recommend using an un-secure NiFi as doing so allow anyone access to your dataflows and the data being processed. You can also have an un-secure NiFi cluster while i do not recommend that either. CONs: - in case of issues with the node, you NiFi instance is down. <-- Very true, single point of failure. - it uses plenty of resources, when it needs to process data, as everything is done on a single node. Cluster: PROs: - redundancy and failover --> when a node goes down, the others will take over and process everything, meaning that you will not get affected. <-- Not complete accurate. Each node in a NiFi cluster is only aware of the data (FlowFiles) queued on that specific node. So each node works on the FlowFile present on that one node, so it is the responsibility of the dataflow designer/builder to make sure they built their dataflows in such away to ensure distribution of FlowFiles across all nodes. When a node goes down, any data FlowFiles currently queued on that down node are not going to be processed by the other nodes. However, other nodes will continue processing their data and all new data coming in to your dataflow cluster - the used resources will be split among all the nodes, meaning that you can cover more use cases as on a single node. <-- Different nodes do not share or pool resources from all nodes in the cluster. If your dataflow(s) are built correctly the volume of data (FlowFiles) being processed will be distributed across all your nodes along each node to process a smaller subset of the overall FlowFile volume. This means more resources available across yoru cluster to handle more volume. NEW -- A NiFi cluster can be accessed via any one of the member nodes. No matter which node's UI you access, you will be presented with stats for all nodes. There is a cluster UI accessible from the global menu that allows you to see a breakdown of each node. Any changes you make from the UI of any one of the member nodes will be replicated to all nodes. NEW -- Since all nodes run their own copy of the flow, a catastrophic node failure does not mean loss of all your work since the same flow.json.gz (contains everything related to your dataflows) can be retrieved from any of the other nodes in your cluster. CONs: - complex setup as it requires a Zookeeper + plenty of other config files. <-- NiFi cluster requires a multi node zookeeper setup. Zookeeper quorum is required for cluster stability and also stores cluster wide state needed for your dataflow. Zookeeper is responsible for electing a node in your cluster with the Cluster Coordinator role and Primary node role. IF a node goes down that has been assigned one of these roles, Zookeeper will elected one of the still up nodes to the role - complex to manage --> analysis will be done on X nodes instead of a single node. <-- not clear. Yes you have multiple nodes and all those nodes are producing their own set of NiFi-logs. However, if a component within your dataflow is producing bulletins (exceptions) it will report all nodes or the specific node(s) on which bulletin was produced. Cloudera offers centralized management of your NiFi cluster deployment via Cloudera Manager software. Makes deploying and managing NiFi cluster to multiple nodes easy, sets up and configures Zookeeper for you, and makes securing your NiFi easy as well by generating the needed certificates/keystores for you. Hope this helps, Matt
... View more
05-31-2023
06:20 AM
1 Kudo
@steven-matison Thanks man. It is true, the session.commit() method can be found in the abstract processor class, which is why I did not think of adding it. This helped me a lot! Also I needed to close the Inputstream with IOUtils.closeQuietly(stream_content) Thirdly I had to use the enumerate function for the dictionnary, because it couldn't read the line file = session.putAttribute(file, "list_value", d[file]) So I just filled the dict with empty values and used session.putAttribute(file, "list_value", json_data['list'][i]) It is ugly, but works at least.
... View more
05-31-2023
03:01 AM
Well I am not expert in migrating from a version to another so my answer might not be good enough for you :(. Besides that, I had no time to read the release notes for 1.21.0 and I am not quite sure if anything changed in terms of config files. Assuming that you will keep the hostname and the port for each nifi node and you are using the embedded zookeeper, you should: 1. Stop the current NiFi instance. 2. Copy the authorizations.xml, authorizers.xml, bootstrap.conf, flow.json.gz, flow.xml.gz, logback.xml, login-identity-providers.xml, nifi.properties, stateless.properties, state-management.xml, users.xml and zookeeper.properties into the conf folder from within your new nifi instance. 3. Make sure that in nifi.properties, on your new instance, you are pointing to the same content repository, database repostiory (and all the other repositories) as in the previous instance --> assuming that you followed the best practices and had all those repositories moved on separate disks. Otherwise, make sure that you process all your data on your old instance and start fresh on your new instance. 4. Start the new NiFi Instance. If you are just interested in migrating just the flows from the canvas, you can add all your flows into a template and save them on your local machine. Afterwards, you can open your new nifi instance, upload your template and add it to your canvas.
... View more
05-25-2023
12:59 AM
everybody is entitled to an opinion but may I ask why are you saying this? 🙂 As you are on a NiFi post, I assume that you are referring to the Cloudera NiFi documentation? I find it very helpful, especially combined with the NiFi's original documentation. It even has some additional thins compared to the original documentation. No matter the feedback, positive or negative, it is good when you know what to do with it. In your case, if you would provide a better and more structured feedback, maybe somebody from Cloudera would understand your point of view and he/she could modify the documentation 🙂
... View more