Member since 04-11-2016 · 471 Posts · 325 Kudos Received · 118 Solutions
07-27-2024 05:32 AM · 1 Kudo
can you give me your email?
07-16-2024 04:14 AM · 3 Kudos
Cloudera recently announced the release of Kubernetes Operators for Data in Motion. These operators enable customers to deploy Apache NiFi, Apache Kafka, and Apache Flink clusters on Kubernetes application platforms such as Red Hat OpenShift, the industry's leading hybrid cloud application platform powered by Kubernetes. With these operators, customers can easily deploy end-to-end data streaming capabilities on their existing Kubernetes clusters and benefit from auto-scaling, efficient resource management, and streamlined setup and operations. In this blog post, we will revisit the advantages of running Apache NiFi on Kubernetes and take a closer look at the architecture and deployment model when using the Cloudera Flow Management Kubernetes Operator.

Advantages of running Apache NiFi on Kubernetes

Apache NiFi is a powerful tool for building data movement pipelines using a visual flow designer. Hundreds of built-in processors make it easy to connect to any application and transform data structures or data formats. Since it supports both structured and unstructured data for streaming and batch integrations, Apache NiFi is a core component of modern data pipelines and multimodal generative AI use cases.

While it is incredibly versatile and easy to use, Apache NiFi often requires significant administrative overhead. The recent release of the Cloudera Flow Management Kubernetes Operator simplifies administration, improving development speed and resource utilization to deliver greater innovation at a lower Total Cost of Ownership (TCO).

Apache NiFi deployments typically start small, with a limited number of users and data flows, and they grow quickly once organizations realize how easy it is to implement new use cases. During this growth phase, organizations run into multi-tenancy issues, such as resource contention and all of their data flows sharing the same failure domain.
To overcome these challenges, organizations typically start creating isolated clusters to separate data flows by business unit, use case, or SLA. This adds significant overhead in terms of operations, maintenance, and more. Depending on how the clusters were sized initially, organizations often need to add compute resources to keep up with the growing number of use cases and ever-increasing data volumes. While NiFi nodes can be added to an existing cluster, it is a multi-step process that requires organizations to set up constant monitoring of resource usage, detect when there is enough demand to scale, automate the provisioning of a new node with the required software, and configure security. Downscaling is even more complex, because users must make sure that the NiFi node they want to decommission has processed all of its data and does not receive any new data, to avoid potential data loss. Implementing an automated scale-up and scale-down procedure for NiFi clusters is complex and time-consuming.

Finally, when running Apache NiFi on bare metal or virtual machines (VMs), losing a node makes the in-flight data unavailable for as long as the node is down. NiFi ensures no data is lost, but temporarily unavailable data can be an issue for many critical use cases. Users can combine the NiFi Stateless engine with proper flow design to solve this problem, but implementing that solution can be another challenge for the development team. Ultimately, these challenges force NiFi teams to spend a lot of time managing cluster infrastructure instead of building new data flows, which slows down adoption.

Running NiFi on Kubernetes solves all of these challenges:

- It is easy to scale a NiFi cluster up and down, either manually by updating the deployment configuration file, or automatically based on resource consumption using a Horizontal Pod Autoscaler (HPA).
- It is easy to start new NiFi clusters in minutes and manage multiple NiFi deployments in one place. It is also possible to run many different NiFi versions to test new features, and users can separate use cases into dedicated NiFi clusters to ensure resource isolation.
- It ensures data durability, even if a NiFi node goes down: Kubernetes takes care of provisioning a new node and moving the volumes from the dead node to the new one, ensuring uninterrupted processing of the in-flight data.

In addition to those advantages, the Cloudera Flow Management Kubernetes Operator brings several additional features to the table:

- It removes the ZooKeeper dependency by doing leader election and state management in Kubernetes.
- It supports rolling upgrades with NiFi 2.
- It enables customers to bring their own Kubernetes cluster. Users don't have to dedicate the Kubernetes cluster to Cloudera workloads, and there is no dependency on any other Cloudera product.

Diving into running Cloudera Flow Management on Red Hat OpenShift

The Cloudera Flow Management Operator is responsible for managing NiFi and NiFi Registry deployments. It is deployed within a designated operator namespace, while the actual NiFi and NiFi Registry instances are managed within one or more separate namespaces. The following diagram shows a typical NiFi deployment with the Cloudera Flow Management Operator:

Cloudera Flow Management Operator architecture overview

Installing the Cloudera Flow Management Kubernetes Operator

To make things extremely easy, Cloudera provides a CLI to help with the installation of the operator in the Kubernetes cluster. First, connect to the cluster:

$ oc login https://api….openshiftapps.com:6443/ -u myUser

Then install cert-manager, which will be used to provision and sign the certificates for the NiFi and NiFi Registry resources:

$ helm install cert-manager jetstack/cert-manager \
  --namespace cert-manager \
  --create-namespace \
  --set installCRDs=true

You will want to configure cert-manager with a CA certificate of your choice to sign the provisioned certificates. You can also use a self-signed CA certificate for proof-of-concept and testing. Once that is done, and once you have downloaded the cfmctl CLI provided by Cloudera, you can easily install the operator:

$ kubectl create namespace cfm-operator-system
namespace/cfm-operator-system created
$ kubectl create secret docker-registry docker-pull-secret \
--namespace cfm-operator-system \
--docker-server container.repository.cloudera.com \
--docker-username ****** \
--docker-password ******
secret/docker-pull-secret created
$ ./cfmctl install --license <license file>

The commands above create a new namespace, cfm-operator-system, and a secret that is configured with your Cloudera credentials and will be used to pull the required container images. Once the namespace and secret are in place, all you have to do is run cfmctl and specify the license file, and it will install the operator on your cluster.

Deploying a NiFi cluster on Kubernetes

As described in the documentation, you can now write a YAML file that describes your desired NiFi and NiFi Registry deployments. While examples of such files can be found in the documentation, let's look at the specific parts that constitute a deployment file.

apiVersion: cfm.cloudera.com/v1alpha1
kind: Nifi
metadata:
  name: mynifi
  namespace: my-nifi-cluster

The section above defines the name that will be used for the NiFi cluster and the NiFi nodes, and it also specifies the namespace in which resources will be deployed. A single NiFi cluster should be deployed in a given namespace.

replicas: 3

The number of replicas defines the number of NiFi nodes in the NiFi cluster. Editing this value and submitting the updated file to the operator will effectively scale the NiFi cluster up or down. This is the manual approach to scaling the cluster.

image:
  repository: container.repository.cloudera.com/cloudera/cfm-nifi-k8s
  tag: 2.8.0-b10
  pullPolicy: IfNotPresent
  pullSecret: docker-pull-secret
tiniImage:
  repository: container.repository.cloudera.com/cloudera/cfm-tini
  tag: 2.8.0-b10
  pullPolicy: IfNotPresent
  pullSecret: docker-pull-secret

This section describes the images used for running NiFi. It provides a way to manually upgrade the NiFi version in an existing cluster, or to very quickly roll out NiFi clusters with new versions.

persistence:
  size: 10Gi
  storageClass: nifi-storage-class
  contentRepo:
    size: 10Gi
    storageClass: nifi-storage-class
  flowfileRepo:
    size: 10Gi
    storageClass: nifi-storage-class
  provenanceRepo:
    size: 10Gi
    storageClass: nifi-storage-class

This section specifies the storage that will be used for the NiFi repositories. It can be defined globally or overridden for specific repositories. The storage classes must be defined at the OpenShift level to match the IOPS expectations of your NiFi workloads. NiFi requires persistent volumes, and the storage class must support read and write operations.

security:
  ldap:
    authenticationStrategy: SIMPLE
    managerDN: "cn=admin,dc=example,dc=org"
    secretName: openldap-creds
    referralStrategy: FOLLOW
    connectTimeout: 3 secs
    readTimeout: 10 secs
    url: ldap://my.ldap.server.com:389
    userSearchBase: "dc=example,dc=org"
    userSearchFilter: "(uid={0})"
    identityStrategy: USE_USERNAME
    authenticationExpiration: 12 hours
    sync:
      interval: 1 min
      userObjectClass: inetOrgPerson
      userSearchScope: SUBTREE
      userIdentityAttribute: cn
      userGroupNameAttribute: ou
      userGroupNameReferencedGroupAttribute: ou
      groupSearchBase: "dc=example,dc=org"
      groupObjectClass: organizationalUnit
      groupSearchScope: OBJECT
      groupNameAttribute: ou
  initialAdminIdentity: nifiadmin
  nodeCertGen:
    issuerRef:
      name: self-signed-ca-issuer
      kind: ClusterIssuer

The security section contains the information that will be injected into NiFi's identity provider configuration file, as well as its authorizers configuration file. In this example, we retrieve users and groups from an external LDAP server. The user nifiadmin is the initial admin of NiFi: this user can log into the NiFi UI and set the appropriate policies so that other users and groups can access the NiFi UI and use the NiFi deployment. Don't forget to create the secret storing the password of the user accessing LDAP:

kubectl -n my-nifi-cluster create secret generic openldap-creds --from-literal=managerPassword=Not@SecurePassw0rd

Finally, the security section also references the cert-manager issuer that should be used to issue certificates for all the provisioned resources.

hostName: mynifi.openshiftapps.com
uiConnection:
  type: Route
  serviceConfig:
    sessionAffinity: ClientIP
  routeConfig:
    tls:
      termination: passthrough

The section above defines how users access the NiFi UI; with the CFM Kubernetes Operator on OpenShift, a Route is used.

configOverride:
  nifiProperties:
    upsert:
      nifi.cluster.load.balance.connections.per.node: "1"
      nifi.cluster.load.balance.max.thread.count: "4"
      nifi.cluster.node.connection.timeout: "60 secs"
      nifi.cluster.node.read.timeout: "60 secs"
      nifi.cluster.leader.election.implementation: "KubernetesLeaderElectionManager"
  bootstrapConf:
    upsert:
      java.arg.2: -Xms2g
      java.arg.3: -Xmx2g
      java.arg.13: -XX:+UseConcMarkSweepGC
  stateManagement:
    clusterProvider:
      id: kubernetes-provider
      class: org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider

The section above provides a way to customize the NiFi deployment by overriding properties defined in the NiFi properties configuration file, as well as in the bootstrap configuration file, which here configures the amount of memory allocated to the NiFi heap. This example also includes the configuration specifying that Kubernetes should handle leader election and state management.

Finally, we can specify the resources (CPU and memory) that should be allocated to the NiFi containers:

resources:
  nifi:
    requests:
      cpu: "2"
      memory: 4Gi
    limits:
      cpu: "2"
      memory: 4Gi
  log:
    requests:
      cpu: 50m
      memory: 128Mi

Cloudera recommends a 1:1:2 ratio between the number of cores, the amount of memory allocated to the NiFi heap, and the amount of memory for the NiFi containers. In the example above, there are 2 cores, 2 GB of memory for the NiFi heap, and 4 GB of memory for the NiFi container.

For automatic scaling based on resource consumption, configure a Horizontal Pod Autoscaler:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: nifi-hpa
spec:
  maxReplicas: 3
  minReplicas: 1
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 75
  scaleTargetRef:
    apiVersion: cfm.cloudera.com/v1alpha1
    kind: Nifi
    name: mynifi

Submit the deployment file to Kubernetes to deploy and/or update the NiFi cluster:

$ kubectl create namespace my-nifi-cluster
namespace/my-nifi-cluster created
$ kubectl apply -f nifi.yaml
nifi.cfm.cloudera.com/mynifi created
$ kubectl apply -f nifi-hpa.yaml

The NiFi cluster is now deployed, and the initial administrator can access the NiFi UI.

Deploying a NiFi Registry instance on Kubernetes

Deploying a NiFi Registry instance is very similar to deploying a NiFi cluster. The file below describes a NiFi Registry deployment:

apiVersion: cfm.cloudera.com/v1alpha1
kind: NifiRegistry
metadata:
  name: mynifiregistry
  namespace: nifi-registry
spec:
  image:
    repository: container.repository.cloudera.com/cloudera/cfm-nifiregistry-k8s
    tag: 2.8.0-b10
  tiniImage:
    repository: container.repository.cloudera.com/cloudera/cfm-tini
    tag: 2.8.0-b10
  hostName: mynifiregistry.openshiftapps.com
  uiConnection:
    type: Route
    routeConfig:
      tls:
        termination: passthrough
  security:
    initialAdminIdentity: nifiadmin
    nodeCertGen:
      issuerRef:
        name: self-signed-ca-issuer
        kind: ClusterIssuer
    ldap:
      authenticationStrategy: SIMPLE
      managerDN: "cn=admin,dc=example,dc=org"
      secretName: secret-openldap
      referralStrategy: FOLLOW
      connectTimeout: 3 secs
      readTimeout: 10 secs
      url: ldap://my.ldap.server.com:389
      userSearchBase: "dc=example,dc=org"
      userSearchFilter: "(uid={0})"
      identityStrategy: USE_USERNAME
      authenticationExpiration: 12 hours
      sync:
        interval: 1 min
        userObjectClass: inetOrgPerson
        userSearchScope: SUBTREE
        userIdentityAttribute: cn
        userGroupNameAttribute: ou
        userGroupNameReferencedGroupAttribute: ou
        groupSearchBase: "dc=example,dc=org"
        groupObjectClass: organizationalUnit
        groupSearchScope: OBJECT
        groupNameAttribute: ou

Similarly, the nifiadmin user is the initial admin who can access the NiFi Registry UI and set the proper policies for users and groups. Additionally, the admin should create proxy policies for the nodes of the NiFi clusters that interact with the NiFi Registry instance, by adding a user named CN=<your CR name>, O=Cluster Node for every cluster via the NiFi Registry UI and granting proxy permissions to each such user.

$ kubectl create namespace nifi-registry
namespace/nifi-registry created
$ kubectl -n nifi-registry create secret generic secret-openldap --from-literal=managerPassword=Not@SecurePassw0rd
$ kubectl apply -f nifiregistry.yaml
nifiregistry.cfm.cloudera.com/mynifiregistry created

The screenshot below shows the user and permissions required for NiFi to successfully interact with the NiFi Registry instance:

Create this user and the associated policies in the NiFi Registry to enable NiFi requests

Putting everything together

The following screenshots show access to the NiFi cluster, where a process group can be version-controlled in the NiFi Registry instance:

The NiFi canvas with a version-controlled process group running on Kubernetes

NiFi Registry running on Kubernetes, with one bucket where flows are stored and versioned

When scaling down the NiFi cluster, the operator executes all of the required steps in the right order to ensure no data is lost and the data is properly offloaded from the removed nodes onto the remaining nodes.

Conclusion

The Cloudera Flow Management Kubernetes Operator makes it easy to deploy NiFi clusters and NiFi Registry instances on an OpenShift cluster. Running your data flows at scale on Kubernetes is now only a few commands away. For a demonstration, watch our release webinar and visit the product documentation to learn more.
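As a footnote to the sizing guidance given earlier (the 1:1:2 ratio between cores, heap, and container memory), the arithmetic can be sketched as a tiny helper. This is a hypothetical illustration, not part of the operator or its CLI:

```python
def nifi_sizing(cores):
    """Apply the 1:1:2 guidance: N cores, N GB of NiFi heap, 2N GB of container memory."""
    heap_gb = cores           # heap matches core count (1:1)
    container_gb = 2 * cores  # container memory is twice the heap (1:2)
    return {
        "cpu": str(cores),
        "java_args": [f"-Xms{heap_gb}g", f"-Xmx{heap_gb}g"],
        "memory": f"{container_gb}Gi",
    }

# The 2-core example from the post: 2 GB heap, 4 GB container memory.
print(nifi_sizing(2))
```

Plugging the resulting values into the bootstrapConf and resources sections keeps the deployment consistent with the recommendation.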
05-28-2024 05:51 AM
Apparently the GitHub repo has a folder with base-level processor examples. It might not cover everything yet, but it can be a good place to solve basic issues. I had trouble building a custom relationship as well, and then I saw this example, which helped: Official Python Processor Examples
01-12-2023 09:49 AM
Apache Flink Upgrade

Deployments can now upgrade from Flink 1.14 to 1.15.1. This update includes 40 bug fixes and a number of other enhancements. To learn more about what has been fixed, check out the release notes.

SQL Stream Builder UI

The Streaming SQL Console (UI) of SQL Stream Builder has been completely reworked with new design elements. The new design gives users better access to artifacts that are commonly used or already created as part of a project, simplifying navigation and saving the user time.

Software Development Lifecycle support with Projects

Projects for SQL Stream Builder improve upon the software development lifecycle needs of developers and analysts writing applications, allowing them to group related artifacts together and sync them to GitHub for versioning and CI/CD management. Until now, when users created SQL jobs, functions, and other artifacts, there was no effective way to migrate them to another environment for use (i.e., from dev to prod). Artifacts were typically migrated by copying and pasting code between environments; this lived outside of code repositories and the typical CI/CD process many companies use, took additional hands-on-keyboard time, and allowed errors to be introduced while copying or updating environment-specific configurations. These issues are solved with SQL Stream Builder Projects. Users simply create a new project, give it a name, and link a GitHub repository to it as part of creation; from that point onward, any artifacts created in the project can be pushed to the GitHub repository with the click of a button. For environment-specific needs, a parameterized key-value configuration can be used: instead of editing configurations that change between deployments, jobs reference generic properties that are set differently in each environment.
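To make the parameterization idea concrete, here is a hedged sketch of what such a job might look like. The variable name and the ${...} substitution syntax are illustrative assumptions, not the documented SSB syntax; the table definition itself is standard Flink SQL:

```sql
-- Hypothetical SSB job referencing a per-environment Kafka address via a
-- project configuration property instead of a hard-coded broker list.
CREATE TABLE orders (
  order_id STRING,
  amount   DOUBLE
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '${kafka.brokers}',  -- set per environment
  'topic' = 'orders',
  'format' = 'json'
);
```

In dev, kafka.brokers would point at the dev cluster; in prod, the same project artifact runs unchanged against the prod brokers.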
Job Notifications

Job notifications help you detect failed jobs without checking the UI, which can save a lot of time. This feature is especially useful when you have numerous jobs running and keeping track of their state would be hard without notifications. Notifications can be sent to a single user or a group of users, either over email or via a webhook.

Summary

In this post, we looked at some of the new features in Cloudera on cloud 7.2.16: Flink 1.15.1, which comes with many bug fixes, a brand-new UI for SQL Stream Builder, the ability to monitor jobs for failures and send notifications, and new software development lifecycle capabilities with Projects. For more details, read the latest release notes. Give Cloudera Streaming Analytics 7.2.16 for Datahub a try today and check out all the great new features!
12-14-2022 08:22 AM · 1 Kudo
You can find the release notes and the download links in the documentation.

Key features for this release

- Rebase against NiFi 1.18, bringing the latest and greatest of Apache NiFi, with a ton of improvements and new features.
- Reset of the end-of-life policy: CFM 2.1.5 will be supported until August 2025 to match the CDP 7.1.7 LTS policy. This is particularly important as HDF and CFM 1.x are close to end of life.
- Parameter Providers: we are introducing the concept of Parameter Providers, allowing users to fetch the values of parameters from external locations. In addition to a better separation of duties, this also makes CI/CD better and easier. With this release, we support the following Parameter Providers: AWS Secrets Manager, GCP Secret Manager, HashiCorp Vault, Database, Environment Variables, and External file.
- Registry Client to connect to a DataFlow Catalog. The registry endpoint is now an extension point in NiFi, meaning it is no longer limited to accessing a NiFi Registry instance. With this release, we are adding an implementation that lets users connect NiFi to their DataFlow Catalog and use it just like they would NiFi Registry. For hybrid customers, this means they can easily check out and version flow definitions in the same place for both on-prem and cloud usage. It also means that on-prem customers can access the ReadyFlows gallery, assuming they have a public cloud tenant.
- Iceberg processor (Tech Preview): we are making available a PutIceberg processor in Technical Preview, allowing users to push data into Iceberg using NiFi, in both batch and streaming (micro-batch) fashion.
- Snowflake ingest with Snowpipe (Tech Preview): until now, only JDBC could be used to push data into Snowflake with NiFi. We are now making available a set of processors leveraging Snowpipe to push data into Snowflake more efficiently.
- New components:
  - ConsumeTwitter
  - Processors to interact with Box, Dropbox, Google Drive, SMB
  - Processors to interact with HubSpot, Shopify, Zendesk, Workday, Airtable
  - PutBigQuery (leveraging the new API)
  - ListenBeats is now Cloudera supported
  - UpdateDatabaseTable to manage updates to a table's schema (for example, adding columns)
  - AzureEventHubRecordSink and UDPEventRecordSink
  - CiscoEmblemSyslogMessageReader to make it easy to ingest logs from Cisco systems such as ASA VPNs
  - ConfluentSchemaRegistry is now Cloudera supported
  - Iceberg and Snowflake components, as mentioned above
- Replay last event: with this release, we add the possibility to replay the last event at the processor level (right-click on the processor, then Replay last event). This makes it super easy to replay the last flow file, instead of going to the provenance events, finding the last event, and clicking replay. Very useful when developing flows!
- And, as usual: bug fixes, security patches, performance improvements, etc.
09-16-2021 02:40 AM · 1 Kudo
With the release of CDP 7.2.11, it is now super easy to deploy your custom components on your Flow Management DataHub clusters by dropping them into a bucket of your cloud provider. Until now, when building custom components for NiFi, you had to SSH to all of your NiFi nodes to deploy your components and make them available for use in your flow definitions. This added operational overhead and also caused issues when scaling up clusters. From now on, it is easy to configure your NiFi clusters to automatically fetch custom components from an external location in the object store of the cloud provider where NiFi is running. All of your nodes will fetch the components after you drop them into the configured location. You can find more information in the documentation.
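Under the hood, vanilla Apache NiFi already exposes a NAR auto-load directory that is polled at runtime for new components; the DataHub feature builds on the same idea, delivering NARs from object storage into such a location. A sketch of the standard nifi.properties entry (the directory value shown is just the NiFi default, for illustration):

```properties
# nifi.properties: directory polled at runtime for new NARs to auto-load
nifi.nar.library.autoload.directory=./extensions
```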
09-16-2021 02:31 AM · 1 Kudo
With the release of CDP 7.2.11, you can now scale both your light duty and heavy duty Flow Management clusters up and down on all cloud providers. You can find more information in the documentation.
09-07-2016 11:00 PM · 4 Kudos
The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with NiFi 1.0.0 (a three-node cluster with three embedded ZooKeeper instances). One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data, but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.
OK, let's start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least 3 nodes (to maintain a majority, also called a quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.

I have 3 VM instances (minimal CentOS 7) that are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.56.101 node-1
192.168.56.102 node-2
192.168.56.103 node-3

I deploy the binaries on my three instances and unzip them. I now have a NiFi directory on each of my nodes.

The first thing to do is configure the list of ZK (ZooKeeper) instances in the configuration file ./conf/zookeeper.properties. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in ./conf/nifi.properties. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you can notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell each server which one it is. To do that, you need to create a file named myid and place it in ZK's data directory. The content of this file should be the index of the server as previously specified by the server.<number> property. On node-1, I'll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don't forget to change the ID). If you don't do this, you may see the following kind of exception in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we move on to the clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary port 9999 for the communication with the elected Cluster Coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the Cluster Coordinator; now let's move on to the exchanges between the nodes themselves (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring, and I choose the arbitrary port 9998 for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies to all the nodes (just change the host property to the correct FQDN).

It is also important to set the FQDN for the web server property; otherwise, we may get strange behavior, with all nodes identified as 'localhost' in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that's all! Easy, isn't it? OK, let's start our nodes and tail the logs to see what's going on!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets
elected as the Cluster Coordinator, and then you should see heartbeats created by the three nodes and sent to the Cluster Coordinator every 5 seconds. You can connect to the UI using any node you want (you can have multiple users connected to different nodes; modifications will be applied on each node). Let's go to:

http://node-2:8080/nifi

Here is what it looks like: As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go to the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes: We see that my node-2 has been elected as Cluster Coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency), and in this case we will want them to run "On primary node" (example below). We can display details on a specific node ("information" icon on the left).

OK, let's add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a unique node if we don't want to duplicate data. So, for the scheduling strategy, we will choose "On primary node". This way, we don't duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won't lose data; the workflow will still be executed. Then I can connect my processor to a PutFile processor to save the tweets as JSON by setting a local directory (/tmp/twitter). If I run this flow, all my JSON tweets will be stored on the primary node; the data won't be balanced.
To balance the data, I need to use an RPG (Remote Process Group); the RPG will exchange with the coordinator to evaluate the load of each node and balance the data across the nodes. It gives us the following flow: I have added an input port called "RPG", then I have added a Remote Process Group that I connected to http://node-2:8080/nifi, and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then, in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor. When running the flow, I now have balanced data across all my nodes (I can check the local directory /tmp/twitter on each node).

Note: this article is adapted from this one.
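The per-node differences in the configuration above boil down to three host-specific values (cluster address, Site-to-Site host, web host); everything else is identical across nodes. A small Python sketch (a hypothetical helper, not part of NiFi) can generate the overrides for each node:

```python
# Hypothetical helper: build the host-specific nifi.properties overrides
# described above for an unsecured 3-node NiFi 1.0.0 cluster.
def node_overrides(fqdn):
    return {
        "nifi.cluster.is.node": "true",
        "nifi.cluster.node.address": fqdn,         # cluster protocol host
        "nifi.cluster.node.protocol.port": "9999",
        "nifi.remote.input.host": fqdn,            # Site-to-Site host
        "nifi.remote.input.socket.port": "9998",
        "nifi.remote.input.secure": "false",
        "nifi.web.http.host": fqdn,                # web UI host
    }

# Print the override lines for each node of the cluster.
for host in ("node-1", "node-2", "node-3"):
    for key, value in sorted(node_overrides(host).items()):
        print(f"{host}: {key}={value}")
```

Feeding these values into each node's ./conf/nifi.properties (by hand or with a templating tool) avoids the copy-paste mistakes that are easy to make across three files.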
02-27-2017 02:59 PM
Hi! Nice example of NiFi integration. I encountered a problem doing the same thing with the HDP 2.5 sandbox. I get this error message in Zeppelin:

<console>:11: error: identifier expected but ',' found.
time struct<millis:bigint>,

Do you import more libraries before?