02-11-2016
09:36 PM
19 Kudos
The purpose of this article is to explain what Process Groups and Remote Process Groups (RPGs) are and how input and output ports are used to move FlowFiles between them. Process groups are a valuable addition to any complex dataflow. They give DataFlow Managers (DFMs) the ability to group a set of processors onto their own embedded canvas. Remote process groups allow a DFM to treat another NiFi instance or cluster as just another process group in the larger dataflow picture. Simply being able to build flows on different canvases is nice, but what if I need to move NiFi FlowFiles between these canvases? This is where input and output ports come into play. They allow you to move FlowFiles between canvases that are either local to a single NiFi or belong to completely different NiFi instances/clusters.
Embedded Process Groups:
Let's start by talking about the simplest use of multiple embedded canvases through process groups. When you start NiFi for the very first time you are given a blank canvas. This blank canvas is nothing more than a process group in itself; it is referred to as the root process group.
From there you are able to add additional process groups to that top-level canvas. These added process groups allow you to drill down into them, giving you additional blank canvases to build dataflows on. When you enter a process group you will see the hierarchy represented just above the canvas in the UI (NiFi Flow >> Process Group 1). NiFi does not restrict the number of process groups you can create or the depth you can go with them. You could compare the process group hierarchy to a Windows directory structure. So if you added another process group inside one that you already created, you would now be two layers deep (NiFi Flow >> Process Group 1 >> Process Group 2).
The hierarchy represented above your canvas allows you to quickly jump up one or more layers, all the way to the root level, by simply clicking on the name of a process group. While you can add any number of process groups at the same embedded level, the hierarchy is only shown from the root down to the process group you are currently in.
Now that we understand how to add embedded process groups, let's talk about how we move data in and out of them. This is where input and output ports come into play. Input and output ports exist to move FlowFiles between a process group and ONE LEVEL UP from that process group. Input ports accept FlowFiles coming from one level up, and output ports allow FlowFiles to be sent one level up. If I have a process group added to my canvas, I cannot drag a connection to it until at least one input port exists inside that process group. I also cannot drag a connection off of that process group until at least one output port exists inside the process group. You can only move FlowFiles up or down one level at a time. Given the example of a process group within another process group, FlowFiles would need to be moved from the deepest level up to the middle layer before finally being able to be moved to the root canvas. In the above example I have a small flow pushing FlowFiles into an embedded process group (Process Group 1) and also pulling data from the same embedded process group. As you can see, I have created an input and an output port inside Process Group 1. This allowed me to draw a connection to and from the process group on the root canvas layer. You can have as many input and output ports inside any process group as you like. When you draw a connection to a process group, you will be able to select which input port to send the FlowFiles to. When you draw a connection from a process group to another processor, you will be able to pick which output port to pull FlowFiles from. Every input and output port within a single process group must have a unique name; NiFi validates port names to prevent duplicates.
Remote Process Groups:
We refer to the ability to send FlowFiles between different NiFi instances as Site-to-Site. Site-to-Site is configured very much like moving FlowFiles between embedded process groups on a single NiFi instance, except that we are moving FlowFiles between different NiFi instances or clusters. If a DFM reaches a point in their dataflow where they want to send data to another NiFi instance or cluster, they add a Remote Process Group (RPG). Remote Process Groups are not configured with unique system port numbers; instead they all utilize the same Site-to-Site port number configured in each instance's nifi.properties file. I will not be covering the specific NiFi configuration needed to enable Site-to-Site in this article. For information on how to enable and configure Site-to-Site on a NiFi instance, see the Site-to-Site Properties section of the Admin Guide.
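As a rough sketch, the relevant Site-to-Site entries in nifi.properties on the target instance might look like the following (the port number is an arbitrary example; property names reflect the 0.x-era releases this article covers, so verify against your version):

nifi.remote.input.socket.port=10443
nifi.remote.input.secure=false

Every node in a target cluster exposes the same Site-to-Site port, which is what lets an RPG treat the whole cluster as a single destination.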
one level up from the process group they are created in. At the top level of your
canvas (root process group level) adding input or output ports provides the
ability for that NiFi to receive (input port) FlowFiles from another NiFi
instance or have another NiFi pull files from (output port) that NiFi. We refer
to input and output ports added the top level as remote input or output ports. While
the same input and output icon in the UI is used to add both remote and
embedded input and output ports, you will notice that they are rendered
differently when added to the canvas. If your NiFi has been configured to be secure (HTTPS) using
server certificates, the remote input/output port’s configuration windows will
have an “Access Control” tab where you must authorize which remote NiFI systems
are allowed to see and access these ports. If not running secure, all remote
ports are exposed and accessible by any other NiFi instance.
In a single NiFi instance you send data to an input port inside a process group by dragging a connection to the process group and selecting the name of the input port from the selection menu provided. Provided that the remote NiFi instance has input ports exposed to your NiFi instance, you can drag a connection to the RPG much in the same way you previously dragged a connection to the embedded process groups within a single instance of NiFi. You can also hover over the RPG and drag a connection off of it, which allows you to pull data from an available output port on the target NiFi. The source NiFi (standalone or cluster) can have as many RPGs as a DFM would like. You can have multiple RPGs in different areas of your dataflows that all connect to the same remote instance. The target NiFi contains the input and output ports (only input and output ports added to the root-level process group can be used for Site-to-Site FlowFile transfers). When sending data between two standalone NiFi instances, the setup of your RPG is fairly straightforward. When adding the RPG, simply provide the URL for the target instance. The source RPG will communicate with that URL to get the Site-to-Site port to use for FlowFile transfer.
When sending FlowFiles via Site-to-Site to a NiFi cluster, we want the data going to every node in the cluster. The Site-to-Site protocol handles this for you, with some additional load-balancing benefits built in. The RPG is added and configured to point at the URL of the NCM:
1. The NCM responds with its Site-to-Site port.
2. The source connects to the Site-to-Site port of the NCM, which responds with the URLs, Site-to-Site port numbers, and current loads of every connected node.
3. The source NiFi then load-balances FlowFile delivery across those nodes, giving fewer FlowFiles to nodes that are under heavier load.
The following diagram illustrates these three steps. A DFM may choose to use Site-to-Site to redistribute data arriving on a single node in a cluster to every node in that same cluster by adding an RPG that points back at the NCM for that cluster. In this case the source NiFi instance is also one of the target NiFi instances.
02-09-2016
01:05 PM
4 Kudos
NiFi supports compression, which can decrease the size of files being transferred across the network. NiFi can also split large files into smaller files that can be reassembled back into the original larger files by a NiFi on the other side of the transfer. Those split files can be sent via multiple concurrent threads, and if a network issue occurs, the entire file transfer does not start over, just that one small piece. NiFi can also be used to remove unneeded portions of the content that do not need to be transferred (think of system logs where some log lines have no value; those lines could be removed from the larger log file, reducing its size before transfer).
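As an illustrative sketch (the processor names are real NiFi processors, but this exact flow is an assumption), a split-compress-transfer-reassemble pipeline might look like:

Sending NiFi:   SegmentContent (e.g., 100 MB segments) -> CompressContent (compress, gzip) -> Remote Process Group
Receiving NiFi: Input Port -> CompressContent (decompress) -> MergeContent (Merge Strategy: Defragment)

MergeContent's Defragment strategy uses the fragment attributes written by SegmentContent to reassemble the original file.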
02-05-2016
04:53 PM
2 Kudos
HDF has processors for connecting to HDFS (ListHDFS, FetchHDFS, GetHDFSSequenceFile, PutHDFS, and GetHDFS).
02-01-2016
05:50 PM
4 Kudos
You could use the InvokeHTTP processor to connect to a REST API to pull data. Once NiFi has the data, you could do things like extract parts of the content into NiFi FlowFile attributes using the ExtractText processor.
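A minimal sketch of that pattern (the URL and regular expression are made-up examples):

InvokeHTTP:  HTTP Method = GET, Remote URL = https://example.com/api/status
ExtractText: add a dynamic property, e.g. status.code = "code":\s*(\d+)

ExtractText evaluates each dynamic property's regular expression against the FlowFile content and writes the first capture group to an attribute of that name.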
01-27-2016
01:13 PM
2 Kudos
Just to add to what Aldrin said above: if you do not have nifi.web.http.host populated in the nifi.properties file on each of your nodes with an IP or hostname that is reachable from the NCM, it is likely that your nodes are resolving that value to localhost. In that case the app log on the NCM would contain log lines showing localhost in the URL string it uses for sending messages to the nodes. That will of course fail, and the result would be the error you are seeing. Editing anything in the nifi.properties file requires a restart to take effect.
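For example, on each node the entry might look like the following (the hostname is a placeholder; the only requirement is that it resolves from the NCM):

nifi.web.http.host=node1.example.com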
01-11-2016
08:40 PM
By default, NiFi will run as whatever user starts NiFi.
NiFi can, however, be configured to start as a specific user by setting the run.as= property inside the bootstrap.conf file.
*** Note that the user configured in this property will need sudo privileges to the java executable on Linux-based deployments. This could interfere with some processors that depend on the java process being owned by a particular user, since it will be owned by root. *** Setting the run.as user allows you to set up NiFi as a service that can be configured to start as part of the OS starting.
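In bootstrap.conf that would look like the following (the username is an example):

run.as=nifi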
01-08-2016
01:51 PM
9 Kudos
**** This article only pertains to HDF 1.x or Apache NiFi 0.x baseline versions. HDF 2.x+ and Apache NiFi 1.x went through a major infrastructure change in the NiFi core. ****
Converting a Standalone NiFi Instance into a NiFi Cluster
The purpose of this article is to cover the process of taking a standalone NiFi instance and converting it into a NiFi cluster. Often we start simple, creating a new dataflow on a single installed instance of NiFi. We may find ourselves doing this to save on resources during the build and test phase, or because at the time a single server or VM provided all the resources required. Later we want to add the fault tolerance that a NiFi cluster provides, or resources become constrained as our dataflow grows, requiring the additional resources that a NiFi cluster also provides. It is possible to take all the work done on that single instance and convert it for use in a NiFi cluster. Your single NiFi instance could easily become one of the attached nodes of that cluster as well. The "work" I am referring to is the actual dataflow design you implemented on the NiFi graph/canvas, any controller services or reporting tasks you created, and all the templates you created. There are some additional considerations, also covered in this article, that deal with file/resource dependencies of your NiFi installation. This article will not cover how to set up either NiFi nodes or the NiFi Cluster Manager (NCM). It will simply explain what steps need to be done to prepare these items for installation into a NiFi cluster you have already set up but not yet started.
The biggest difference between an NCM and a node or standalone instance is where each stores its flow. Both standalone instances and cluster nodes rely on a flow.xml.gz file and a templates directory. The NCM, on the other hand, relies only on a flow.tar file. Since the NCM is the manager of the NiFi cluster, it needs to have its flow.tar created before it is started, or you will end up starting up with nothing on your graph/canvas.
The flow.xml.gz file:
Your existing standalone NiFi instance will have a flow.xml.gz file located in the conf directory (default configuration) of your NiFi software installation. If it is not found there, you can verify the location it was changed to by looking in the nifi.properties file. This file is by far the most important file in your NiFi instance. The flow.xml.gz file includes everything that you have configured/added to the NiFi graph/canvas. It also includes any and all of the configured controller services and reporting tasks created for your NiFi. The flow.xml.gz file is nothing more than a compressed xml file. The NCM of your NiFi cluster does not use a flow.xml.gz file but rather relies on the existence of a flow.tar file. So you can't just copy the flow.xml.gz file from your standalone instance into the conf directory of your NCM and have everything work. The flow.tar consists of several files:
The good news is that while we do need to create the flow.tar, we do not need to create all of the files that go inside of it. NiFi will create the missing pieces for you. All we need to do is get your flow.xml inside a tar named flow.tar. You can do this by following the procedure below:
1. Get a copy of your flow.xml.gz file from your standalone instance.
2. Uncompress it using the gunzip command: gunzip flow.xml.gz
3. Tar it up using the tar command: tar -cvf flow.tar flow.xml
Now you have the flow.tar needed for your NiFi cluster NCM.
What about any templates I created on my standalone instance? How do I get them loaded into my NCM's flow.tar file?
The TEMPLATES directory:
If you created templates on your standalone instance, you will need to import them into your cluster after it is running and has at least one connected node. If you do not have any templates, this section of the guide can be skipped. On your standalone NiFi instance there is a templates directory located inside the conf directory (default configuration). It contains an xml file ending in .template for every template you created. These files cannot simply be placed inside the flow.tar file like we did with the flow.xml. The templates.xml file inside the flow.tar is not actually xml but rather a binary file. You can either use a custom java jar to create that binary templates.xml file or you can import the templates manually after your cluster is up and running. So make sure you save off a copy of your templates directory, since you will need it no matter which path you choose.
Option 1: (Use custom java code to create the binary templates.xml file)
1. Copy the attached jar archive (templates.jar.tar.gz) to the system containing the saved-off templates directory from your standalone instance.
2. Unpack the file using the command: tar -xvzf templatejar.tar.gz
3. Run the following command to create the templates.xml file: java -cp template.jar combine <path to templates dir>/*
   * Note the star; it needs a list of files, not just a directory.
   ** It will create a templates.xml file in the current directory.
4. Take the templates.xml file created and add it to the flow.tar you already created using the following command: tar -rvf flow.tar templates.xml
5. Place this flow.tar file in the conf directory of your NCM instance.
Option 2: (Manually add templates after the cluster is running)
Once you have your NiFi cluster up and running, you can import each template from the saved-off templates directory just like you would import any other template shared with you:
1. Click on the templates icon in the upper right corner of the UI.
2. Select the button in the upper right.
3. Select one of the xxxx.template files you saved off.
4. Select the button to import the file. The file will be put inside the flow.tar and placed in the templates directory on each of your nodes.
5. Repeat steps 2 - 4 for every template file.
What about the controller services and reporting tasks I created on my standalone instance?
Controller-services and Reporting-tasks:
At the beginning I mentioned how your flow.xml.gz file contains any controller services and/or reporting tasks you may have created through the UI. Then a little later I showed you that the flow.tar has separate controller-services.xml and reporting-tasks.xml files in it. The flow.xml used to create your flow.tar still has this information; in fact, that is where it still needs to be. These two xml files in the flow.tar are where any controller services or reporting tasks you create in the UI for the "Cluster Manager" specifically will be kept. All the reporting tasks and controller services in the flow.xml will run on every node in your cluster. There may be controller services that you originally created on your standalone instance that make better sense running on the NCM now that you are clustered. You can create new "Cluster Manager" controller services and delete the "node" controller services after your NiFi cluster is running. The following controller services should be run on the "Cluster Manager":
- DistributedMapCacheServer
- DistributedSetCacheServer
There are also controller services that may need to run on both the nodes and your NCM. The following is a good example:
- StandardSSLContextService
Some other controller services that run on the NCM may need this service to function if they are configured to run securely using SSL. The bottom line here is that nothing needs to be done before your NiFi cluster is started with regards to either controller services or reporting tasks.
Now for the good news: how does my existing standalone instance differ from a node in my new NiFi cluster?
Standalone NiFi versus Cluster Node:
You now have everything needed to get your NCM up and running with the templates and flow from your standalone NiFi. What do I do with my standalone instance now? Structurally, there is no difference between a standalone NiFi instance and a NiFi node. A standalone instance can easily become just another node in your cluster by configuring the clustering properties at the bottom of its nifi.properties file and restarting it. In fact, we could just use this standalone instance to replicate as many nodes as you want.
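As a rough sketch, the clustering properties involved look like the following (property names are from the 0.x baseline; the hostnames and ports are made-up examples, so treat this as an assumption to verify against your version's nifi.properties):

On each node:
nifi.cluster.is.node=true
nifi.cluster.node.address=node1.example.com
nifi.cluster.node.protocol.port=9081
nifi.cluster.node.unicast.manager.address=ncm.example.com
nifi.cluster.node.unicast.manager.protocol.port=9090

On the NCM:
nifi.cluster.is.manager=true
nifi.cluster.manager.address=ncm.example.com
nifi.cluster.manager.protocol.port=9090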
There are a few things that you need to remember when doing this. Some apply no matter what, while others depend on your specific configuration.
The things that always matter:
1. Every node must have its own copy of every file declared in any processor, controller service, or reporting task. For example, some processors like MergeContent allow you to configure a demarcation file located somewhere on your system. That file would need to exist in the same directory location on every node.
2. Every node must have its nifi.properties file updated so that the following properties are populated with unique values for each node: nifi.web.http.host= or nifi.web.https.host= (if secure)
3. Every node must have the same value as the NCM for the following nifi.properties property: nifi.sensitive.props.key=
*** This value must match what was used on your standalone instance when the flow was created. This value was used to encrypt all passwords in your processors. NiFi will not start if it cannot decrypt those passwords.
4. If the NCM exists on the same system as one of your nodes (common practice, since the NCM is a lightweight NiFi instance), the NCM and node on that machine cannot share any port numbers. The ports must be unique in the two nifi.properties files.
The things that might matter:
If your NiFi is configured to run securely (https), or you are using the StandardSSLContextService, there are some additional considerations, since the same identical dataflow runs on every node.
SSL based controller services:
Every server/VM used for a NiFi cluster is typically assigned its own server certificate that is used for SSL authentication. We have to understand that in a cluster every node is running the exact same dataflows and controller services. So, for example, if you have a StandardSSLContextService configured to use a keystore located at /opt/nifi/certs/server1.jks with password ABC, every node will expect to find a server1.jks file in the same place on each server, with the password ABC. Since each system gets a unique certificate, how do we work around this issue? We cannot work around the password issue, so every certificate and keystore must be created using the same password. However, every key can be unique. We can create a symbolic link that maps a generic keystore name to the actual keystore on each system:
System1: keystore.jks -> /opt/nifi/certs/server1.jks
System2: keystore.jks -> /opt/nifi/certs/server2.jks
And so on. We can now configure our StandardSSLContextService to use keystore.jks and the matching password that every keystore/certificate was created with.
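Creating those links is a one-liner per system (the paths here are the example paths from above):

On System1: ln -s /opt/nifi/certs/server1.jks /opt/nifi/certs/keystore.jks
On System2: ln -s /opt/nifi/certs/server2.jks /opt/nifi/certs/keystore.jks

The StandardSSLContextService would then point at /opt/nifi/certs/keystore.jks on every node.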
Modify security settings in each node's nifi.properties file:
The nifi.properties file on every node will need to have the keystore and truststore properties modified to point to that particular node's keystores and truststores. In addition, the authority provider will need to be changed. In a standalone NiFi instance, the authority provider specified by the property nifi.security.user.authority.provider= is set to file-provider. In a secure cluster, the NCM should be set to cluster-ncm-provider, while each of the nodes should have it set to cluster-node-provider. Making these changes will also require modifying the associated authority-providers.xml file.
Modify the authority-providers.xml file:
The authority-providers.xml file is where you configure both the cluster-ncm-provider and the cluster-node-provider. There will already be commented-out sections in this file for both. The cluster-ncm-provider will need to be configured to bind to a port and told where to find your authorized-users.xml file (this file should have been copied over from your standalone instance). The cluster-node-provider simply needs to be configured with the same port number used for the ncm provider above.
Summary:
The above process may sound very involved, but it really isn't that bad. We can sum up these steps in the following high-level checklist:
1. Save off all the templates you may have created on your standalone instance.
2. Create flow.tar using the flow.xml.gz from your standalone instance.
3. If you have templates, use the provided templates.jar to convert them for inclusion in the above flow.tar file.
4. Configure the security properties section of the nifi.properties file on the NCM and all nodes with the same value for nifi.sensitive.props.key, unique keystores, and the proper nifi.security.user.authority.provider.
5. Make sure all processor, controller-service, and reporting-task file dependencies exist in the same directory paths on every node.
6. Use symbolic links so that all keystores/truststores have identical filenames on every node. (If using SSL-enabled controller services or processors)
7. Use the same keystore/truststore passwords on every node. (If using SSL-enabled controller services or processors)
8. After the cluster is running, import each of the saved-off templates. (Only necessary if you did not use templates.jar to create the templates.xml file)
01-04-2016
05:08 PM
12 Kudos
This is part 2 of the fault tolerance tutorial. Part 1 can be found here:
https://community.hortonworks.com/articles/8607/ho...
--------------------------------------------------------------------------------------------------------------------------------------
Multi-Cluster Fault Tolerance (One cluster in each zone) (All clusters active all the time)
Multiple clusters can be set up across multiple zones with all of them active at the same time. This option requires some external process to keep flows in sync across all clusters. Let's take a look at what this setup might look like if we had 3 zones all running a NiFi cluster.
We can see right away that there is a lot more involved with the S2S configuration between the source systems and these multiple zones. Towards the beginning of this guide we showed what a typical NiFi S2S setup might look like on a source NiFi. With this configuration the sources would need to be set up differently. Rather than delivering data to the nodes of a single cluster, as we have done in all the other configurations so far, we need to distribute delivery across all three of our clusters. Each of the clusters operates independently of the others. They do not share status or FlowFile information between them. In this configuration, data will only stop flowing to one of the clusters if one of the following is true:
1. The NCM for that cluster reports no nodes available.
   a. This can happen if all nodes are down, but the NCM is still up.
   b. Or the NCM has marked all nodes as disconnected due to lack of heartbeats.
2. None of the nodes in the cluster can be communicated with over the S2S port to deliver data.
3. None of the nodes are accepting data because the destination is full or backpressure has disabled the input ports on all nodes.
The following image shows how a source NiFi would be set up to do this load distribution to three separate clusters running in your different zones. The DistributeLoad processor would be configured as follows: the "Number of Relationships" value should be set to the number of clusters you will be load-balancing your data to. Each relationship is then used to establish the connection with one of those clusters.
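A sketch of that configuration for three zones (the property names match the DistributeLoad processor; round robin is its default strategy, and the zone layout is this article's example):

DistributeLoad: Number of Relationships = 3, Distribution Strategy = round robin
  relationship 1 -> connection feeding the Zone A RPG
  relationship 2 -> connection feeding the Zone B RPG
  relationship 3 -> connection feeding the Zone C RPG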
If set up correctly, data will be spread across the NiFi clusters running in all three zones. Should any zone become completely unreachable (no nodes reachable), data will queue on the connection to that RPG until the backpressure object threshold configured on that connection is reached. At that point all new data will be distributed only to the connections still accepting files for the other zones. Let's take a look at a couple of different scenarios:
Scenario 1: Zone Partial Outage
In this scenario we will take a look at what happens when connectivity to the NCM and one node in Zone B is lost. In the above scenario the NCM went down before it marked the first node as disconnected, so the source systems will still try to push data to that down node. That will of course fail, so the source will try a different node within Zone B. Had the node been marked disconnected before the NCM went down, the source systems would only know about the two still-connected nodes and would only try delivering to them. The remaining reachable nodes in Zone B's cluster now take on more data because of the lost node. Data distribution to the nodes in the clusters located in the remaining zones is unaffected. What if you lost two nodes? The last node in Zone B's cluster would then receive all the data for that cluster (33.3% to Zone A, 33.3% to Zone B, and 33.3% to Zone C). Remember that the clusters in each of the zones do not talk to one another, so the distribution of data from the sources does not change. We can, however, design a flow to make sure this use case does not cause us problems. Immediately following the input port (input ports are used to receive data via S2S) on the dataflow in every one of our zones' clusters, we need to add a ControlRate processor.
*** The penalty duration on the ControlRate processor should be changed from 30 sec to 1 sec. In the NiFi version (0.4.1) available at the time of writing, there is a bug that can affect the processor's configured rate (reported in https://issues.apache.org/jira/browse/NIFI-1329). Changing the penalty duration gets us around this bug.
Remember that because this is a cluster, the same flow exists on every node. We can then configure this ControlRate processor to allow only a certain amount of data to pass through it per time interval. So if it were set to 1000 FlowFiles/sec, the 3-node cluster would allow a max of 3000 FlowFiles/sec to flow through the processors. If we then set backpressure on the connection between the input port and this ControlRate processor, we can effectively stop the input port if the volume suddenly becomes too high. This will then cause data to start to queue up on our source. If you remember, backpressure was also set on those source connections. That forces the sender to send more data to the other two clusters, keeping this degraded cluster from becoming overwhelmed.
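A sketch of the ControlRate settings described above (the numbers are the example values from this paragraph; property names follow the 0.x ControlRate processor, so verify against your version):

ControlRate: Rate Control Criteria = flowfile count
             Maximum Rate = 1000
             Time Duration = 1 sec
             Penalty Duration = 1 sec (see the NIFI-1329 note above)

With backpressure set on the connection from the input port into this processor, each node admits at most 1000 FlowFiles per second before the port stops accepting data.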
We can see below what that would look like on the sending NiFi if the Zone B cluster became degraded. Backpressure is being applied at times on Zone B's connection, so more data is going to Zone A and Zone C. Every time backpressure is applied, more data goes to the Zone A and Zone C clusters. Backpressure kicks in on Zone B's connection when the input port on the Zone B cluster nodes stops accepting data because of the ControlRate configuration. You can see that data is still flowing to Zone B, but at a lower rate.
Scenario 2: Complete Outage of a Zone
In this scenario we have a complete outage of one of our zones. The overall behavior is very close to what we saw in scenario 1. Let's assume we lost Zone C completely. As we have already learned, depending on the order in which the NiFi cluster in Zone C went down, the source NiFi(s) may or may not still be trying to send data to the nodes in this cluster via S2S. Either way, with a complete outage no data will be sent, and data will begin to queue on the connection feeding the RPG for Zone C on the sending NiFis. Backpressure being set on that connection is very important to make sure the queue does not grow unbounded. There is no failover mechanism in S2S when all nodes are down that would cause this queued data to redirect somewhere else, such as another RPG. So this data will remain in this queue until either the queue itself is manually redirected or the down cluster is returned to service. As we can see in the above, no files are being sent to Zone C and backpressure has kicked in on the connection feeding the Zone C RPG. As a result, all data is being load-balanced to Zone A and Zone B, with the exception of the 100 files stuck in Zone C's connection. Manual intervention would be required by a DFM to re-route these 100 files. They will of course transfer to Zone C on their own as soon as it comes back online.
Just like the first two configurations, this configuration has its pros and cons.
PROs:
1. Losing one zone does not impact access to the NCMs of the other two zones, so monitoring of the dataflow on the running clusters is still available.
2. Cluster communications between nodes and the NCM are not affected by cross-zone network latency.
3. Each zone can scale up or down without needing to interface with the source NiFis sending data via S2S.
4. No DNS changes are required.
CONs:
1. Some burden of setting up load balancing falls back on the source NiFis when using S2S. Source NiFis will need a flow designed to spread data across each of the destination clusters.
2. There is no automatic failover of data delivery from one zone to another. Manual intervention is required to redirect queues feeding down zones.
3. Dataflows must be managed independently on the cluster in each zone. There is no manager of managers that can make sure a dataflow is kept in sync across multiple clusters.
4. Changes are made directly on each cluster; the flow.tar (on the NCM) and flow.xml.gz (on the nodes) are not interchangeable between clusters. They may appear the same; however, the component IDs for each of the processors and connections will differ.
This type of deployment is best suited for when the flow is static across all clusters. In order to keep the dataflow static, it would typically be built on a development system and then deployed to each cluster. In this configuration, changes to the dataflow directly on the production clusters would not be common practice.
01-04-2016
04:54 PM
15 Kudos
Using NiFi to provide fault tolerance across data centers (The Multi-Zone approach)
The purpose of this document is to show how to take a NiFi cluster and create additional fault tolerance using multiple zones/regions. You can think of these zones/regions as physically separated data centers (they may not even be geographically co-located). The document will cover a few different configurations: one large cluster that spans multiple zones; two independent clusters (each located in a different zone) with only one zone active at a time; and finally multiple clusters (each located in a different zone) all active at the same time. There are pros and cons to each configuration that will be covered. Depending on your deployment model, one configuration may be better suited than another.
A NiFi cluster consists of a single NiFi Cluster Manager (NCM) and 1 to N NiFi nodes. NiFi does not set a limit on the number of nodes that can exist in a cluster. While a NiFi cluster does offer a level of fault tolerance through multiple nodes, this document dives deeper into having fault tolerance via a multiple-zone deployment approach. The role of the NCM is to provide a single point of management for all the nodes in a given cluster. Changes a Dataflow Manager (DFM) makes on the NiFi canvas/graph, controller services, or reporting tasks are pushed out to every node. DFMs are the people responsible for, and with the permission to, construct dataflows within NiFi by stringing together multiple NiFi processors. The connected nodes all operate independently of each other. The nodes all communicate with the NCM via heartbeat messages. These heartbeat messages allow the NCM to gather together everything each node is doing and report it back to the user in a single place. While the NCM never handles the actual data itself, it is a pivotal part of the cluster. The NCM plays an important role when it comes to using Remote Process Groups (RPGs) to send/pull FlowFiles from one NiFi to another. The process of sending data between NiFi instances using RPGs is known as Site-to-Site (S2S). We will see later how the biggest consideration in setting up fault tolerance revolves around dealing with S2S communications. Since the NCM (at the time of writing) serves as a single point of failure for access to a NiFi cluster for monitoring and configuration, we will also discuss recovery of a lost NCM in this document. The loss of an NCM does not inhibit the nodes from continuing to process their data. Its loss simply means the user has lost the ability to make changes and monitor the cluster until it is restored.
Let's take a quick high-level look at how S2S works when talking to a NiFi cluster. Source NiFi systems (source systems could be standalone instances of NiFi or nodes in a NiFi cluster) add an RPG to their graph. The RPG requires a URL to the target NiFi system. Since we are talking to a NiFi cluster in this case, the URL provided would be that of the NCM. Why the NCM? After all, it does not want any of your data, nor will it accept it. Well, the NCM knows all about its cluster (how many nodes are currently connected, their addresses on the network, how much data they have queued, etc.). This is a great source of information that the source system uses for smart, load-balanced delivery of data. So the target NiFi NCM provides this information back to the source NiFi instance(s). The source system(s) then smartly send data directly to those nodes via the S2S port configured on each of them. So why is S2S so important when talking about fault tolerance? Since the source NiFi instances all communicate with a single NCM to find out about all the nodes that data can be sent to, we have a single point of failure. How bad is that? In order to add an RPG to the graph of any source NiFi, that source NiFi must be able to communicate with the target NCM. During that time, and on a regular interval thereafter, the target NCM updates the source NiFi(s) with node status. Should the target NCM become unreachable, the source NiFi(s) will continue to use the last status received to keep sending data to the nodes. Provided those nodes have not also gone down or become unreachable, dataflow will continue. It is important to understand that since the source is doing smart data distribution to the target cluster's nodes based on the status updates it gets, during a target NCM outage the load-balanced behavior is no longer that smart. What do I mean by this? The source will continue to distribute data to the target nodes under the assumption that the last known strategy is still the best strategy. It will continue to make that assumption until the target NCM can be reached for new node statuses. OK, great, my data is still flowing even without the NCM.
Here is what the dataflow for delivery via S2S would look like on a source NiFi: some NiFi processor(s) connected to an RPG. This RPG would then load-balance and smartly deliver data to one or more input ports on each of the nodes in the destination NiFi cluster. This single point of failure (the NCM) also affects the DFM(s) of the target NiFi, because without the NCM the UI cannot be reached. No changes can be made to the graph, no monitoring can be done directly, and no provenance queries can be executed until the NCM is brought back online.
Does fault tolerance exist outside of S2S? That depends on the design of the overall dataflow. The nodes in a cluster provide a level of fault tolerance. Source systems that are not NiFis, or NiFis that exchange data without using S2S, would need to be configured to load-balance data to all the nodes directly. The NCM has no role in negotiating those dataflow path(s). The value added with S2S is that scaling a cluster up or down is made easy: source systems are automatically made aware of what nodes are available to them at any given time. For transfer methods other than S2S, the infrastructure needs to support having these non-NiFi sources fail over data delivery between nodes in the one or more NiFi clusters. Here is an example of one such load-balanced delivery directly to a target cluster's nodes with failover: you can see here that if one node should become unreachable, the data will route to failure and get redistributed to the other nodes. The examples that follow only cover the S2S data transport mechanism, because it is used to transfer data (specifically NiFi FlowFiles) between NiFi instances/clusters and the NCM plays an important role in that process. The above non-S2S delivery approach does not interface with the NCM at all.
Single NCM with nodes spanning multiple zones
The first configuration we will discuss consists of a single NCM with nodes spanning multiple zones. This will be our active-active setup, since the nodes in each zone are always processing data. To provide fault tolerance, each zone should be able to handle the entire data load should the other zone(s) go down. In a two-zone scenario during normal operations, the load in either zone should not exceed 50% system resource utilization. The NCM cannot exist in more than one zone in this setup, so we must place the NCM in one zone or the other. What about S2S, where the NCM plays a more important role? Here is a quick look at how S2S would work for our multi-zone NiFi cluster:
Let's take a look at a couple of possible scenarios with this type of configuration.
Scenario 1: Zone A Down
Here we explore what would happen if we were to lose Zone A. The NCM is still reachable in Zone B, so the source NiFi(s) will still be able to reach it and get status on the remaining connected nodes, also in Zone B. Zone B is now processing 100% of the data until Zone A is brought back online. Once Zone A is back online, the NCM will again receive heartbeats from the Zone A nodes and report those nodes as available to the source NiFi(s). It is also possible that not all NiFi nodes in Zone A are down. If that is the case, the NCM will still report those nodes as available and data will still be sent to them.
Scenario 2: Zone B Down
In this scenario the source NiFi(s) can no longer talk to the NCM to get an updated list of available nodes or status on each of the connected nodes. The source NiFi(s) will continue to try to send data based on the last successful status they received before Zone B went down. While the source systems will continue to try to send data to Zone B nodes, those attempts will fail, and that data will fail over to the Zone A nodes. In this scenario Zone A takes on 100% of the data until Zone B is back online.
Based on what we have learned from the preceding examples of a single NCM with nodes spanning multiple zones, what are the pros and cons of this configuration?
PROs:
1. All nodes are always running the current version of the dataflow in every zone.
2. Since data is flowing through all nodes in every zone, end-to-end functionality of the dataflows can be verified within each zone. This avoids the surprises that can accompany bringing backup systems online.
3. Provides better dataflow change management for clusters with constantly changing dataflows. No need to back up and transfer the flow.tar to a separate backup cluster every time a change is made.
4. While no zone failures exist, extra resource headroom is available to support spikes in processing load.
5. For S2S usage, there is no complex dataflow design to manage on source NiFis.
6. Data from source NiFis to target NiFis using S2S will only stop if all nodes in all zones become unavailable. (No manual intervention on source NiFis is needed to get data transferring to target nodes that are still online.)
CONs:
1. DFM(s) lose access to the cluster if the zone containing the NCM goes down.
2. If the zone that is lost contained the NCM, source NiFi(s) will continue trying to send data to all nodes in every zone because they are no longer receiving node statuses from that NCM.
3. The original NCM must be returned to service to restore full cluster operations.
4. Standing up a new NCM requires reconfiguring all nodes and restarting them so they connect to it (see the next section for how to stand up a new NCM).
5. Network latency between zones can affect the rate at which changes can be made to dataflows.
6. Network latency between zones can affect how long it takes to return queries made against the cluster (provenance).
This configuration is typically used when dataflows are continuously changing (a development-in-production model). It allows you to make real-time changes to dataflows across multiple zones. The dataflows on each node are kept identical (not just in design; all the component IDs for all processors and connections are identical as well).
Multi-Cluster Fault Tolerance (One cluster in each zone) (One cluster active at a time)
In this configuration you have two or more completely separate NiFi clusters. One cluster serves as your active cluster while the other remains off in a standby state. This type of configuration relies on some process external to, and not provided by, NiFi to facilitate failover between clusters. DNS plays an important role in this type of configuration because of how S2S works; we will explain why as we walk through the scenario. Here is how a two-zone setup would look: here we have all the source systems sending data to a NiFi cluster running in Zone B. We'll refer to this as our active or primary cluster for now. The diagram above shows the use of S2S to send data from source NiFis into this cluster.
Scenario 1: Partial Outage on Active Cluster
Let's look at the simplest case, where connectivity to the NCM or just a few nodes in Zone B is lost. The loss of the NCM only affects S2S, so we will only discuss that mode of data exchange between systems here. If any of the source systems using S2S lose connectivity to the NCM in Zone B, those source systems will continue to use the last known state of the nodes to keep sending data to them. It is also possible that, in addition to losing connectivity with the NCM, connectivity to one or more of the nodes is lost. In that case, the source systems will continue to try to send data to the last list of active nodes received from the NCM. Here we see loss of connectivity with the NCM and one node. If the NCM went down before the node, the list of nodes the source systems use for delivery will still include that down node. Attempts to send data to that down node will fail, causing the sending systems to try to send it to one of the other nodes from the list they are working from. In this scenario the active cluster is in a degraded state: the nodes that are still operating are carrying the full workload.
Scenario 2: Active Cluster Completely Down
Let's say there was a complete outage of Zone B. In that case the sending NiFi(s) would be unable to send any data and would start to queue it, creating a backlog. In order to continue processing while Zone B is down, the NiFi cluster in Zone A will need to be started, but first we need to discuss how Zone A should already have been set up:
1. The NCM configuration in Zone A must be identical to that of the NCM in Zone B, with the exception of the IP address. That identical configuration must include the publicly resolvable hostname.
   a. Why is this important? The RPGs set up on the NiFi sources are trying to communicate with a specific NCM. If the NCM in Zone A also uses the same hostname, no changes need to be made to the flow on any of the various source NiFi(s).
2. There is no need for the nodes to be identical in either hostname or IP to those in the other zone. Once source NiFi(s) connect to the new NCM, they will get a current listing of the nodes in Zone A.
3. All nodes should be in a stopped state with no flow.xml.gz file or templates directory located in their conf directories.
4. The flow.tar file located on the Zone B NCM should be copied to the Zone A NCM after any changes to the conf directory of the NCM in Zone A. How often depends on how often changes are made to the dataflow in Zone B.
So Zone A is now set up so that it is ready to take over running your dataflow. Let's walk through the process that would need to occur in this scenario, where the complete loss of Zone B has occurred. This can be done manually, but you may benefit from automating this process:
1. Zone B is determined to be down, or the NiFi(s) running in Zone B are unreachable.
2. Zone A's NCM is started using the most recent version of the flow.tar from Zone B.
3. Zone A's nodes are started. These nodes connect to the NCM and obtain the flow.xml.gz and templates from the NCM.
4. The DNS entry must be changed to resolve Zone B's NCM hostname to the IP of Zone A's NCM. (Reminder: the Zone A NCM should be using the same hostname as the Zone B NCM, only with a different IP address.)
*** This DNS change is necessary so that none of the source systems sending data via S2S need to change their configuration.
1.Standby cluster can be turned on quickly
returning full access to the users so that monitoring and changes to the
dataflow can be done. 2.Being able to modify dataflows is not affected
by nodes down in other zone. Any modifications made would need to be copied to
zone B, which is now the new standby. 3.After switching to the standby zone, S2S is not full
functionality is restored and is not degraded by the lack of a NCM. 4.Cluster communications between nodes and the NCM
are not affected by network latency between zones. CONs: 1.To avoid breaking S2S, DNS must be used redirect
traffic to new NCM IP. This process occurs outside of NiFi. 2.The process of detecting a complete zone outage
and switching from one Zone to another is not built in to NiFi. 3.Some downtime in dataflow will exist while
transitioning form one zone to the other. 4.Zone B, while now the standby, will still need
to be brought back online to work off any data it had in its queue when it went
offline. This configuration is typically
used when high latency exists between zones. The latency does not affect
dataflow, but can affect the amount of time it takes for changes to be made on
the graph and the amount of time it takes for provenance and status queries to
return. External scripts/cron jobs are
needed to keep the backup zone in sync with the active zone. -------------------------------------------------------------------------------------------- Part 2 of this tutorial can be found here: https://community.hortonworks.com/articles/8631/ho... Part 2 covers Multi-Cluster Fault Tolerance (One cluster in each zone with All Clusters Active all the time)
10-30-2015
01:13 AM
We need to make sure we get the terminology correct first. A "FlowFile" in NiFi is the combination of metadata (a map of key/value pair attribute strings) and the associated content. Are we asking how to take an attribute that is part of the FlowFile and add it to the content portion of the FlowFile? If so, Joe's answer above covers that.