Member since
04-11-2016
471
Posts
325
Kudos Received
118
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2070 | 03-09-2018 05:31 PM |
| | 2632 | 03-07-2018 09:45 AM |
| | 2529 | 03-07-2018 09:31 AM |
| | 4388 | 03-03-2018 01:37 PM |
| | 2468 | 10-17-2017 02:15 PM |
01-20-2017
03:05 PM
3 Kudos
@Michal R You can use an UpdateAttribute processor to change the filename. However, setting a static value would give every file the same filename. Assuming each input filename is unique apart from sharing the .csv extension, you could do the following:
This would essentially replace the .csv extension with .avro while leaving the rest of the original filename unchanged.
Thanks, Matt
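A minimal sketch of the rename logic described above, written in Python rather than NiFi Expression Language. The function name `csv_to_avro_name` is hypothetical and not from the original post. In UpdateAttribute, the equivalent is probably a `filename` property with a value along the lines of `${filename:replaceAll('\.csv$', '.avro')}`, but that exact expression is an assumption and should be checked against your NiFi version.

```python
def csv_to_avro_name(filename: str) -> str:
    """Swap a trailing .csv extension for .avro, leaving the rest untouched.

    Hypothetical helper for illustration only; it mirrors what the
    UpdateAttribute rule is meant to achieve.
    """
    suffix = ".csv"
    if filename.lower().endswith(suffix):
        # Drop only the extension, keep the unique part of the name.
        return filename[: -len(suffix)] + ".avro"
    # Not a .csv name: leave it unchanged.
    return filename


if __name__ == "__main__":
    for name in ["orders_2017-01-20.csv", "report.final.csv", "readme.txt"]:
        print(name, "->", csv_to_avro_name(name))
```

Because only the extension is replaced, two inputs with different base names can never end up with the same output filename.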
03-11-2018
12:43 PM
Hi,

Kylo provides the functionality you are looking for. Together with NiFi, it has a concept called a reusable flow. This flow can be connected to any feed template, and in the end all feeds hook into the same reusable flow in NiFi.

In short: you create a flow in NiFi with an input port, create a template of this flow (the reusable flow), export it, and import the file as a reusable flow in Kylo. Create your feed template flows in NiFi and create templates of them, import the templates from NiFi into Kylo, and connect the output port of your template to the input port of the reusable flow. Create a feed in Kylo from your template. Kylo will then generate a feed based on your feed template in NiFi and connect it to the reusable flow.

You can now modify your reusable flow and create a new template in NiFi with the same name as the previous version of the reusable flow, export it to a file, and import it into Kylo. All your feeds are then connected to the new version of your reusable flow.

If you do not want to use Kylo, there is an alternative. Create a group called "reusable flow". In this group, create a group called "reusable flow v1". Connect your feeds to the input port on the reusable flow group, and inside the reusable flow group connect that port to the v1 group's input port. When you want to make a new version of your reusable flow, copy the v1 group to v2 and make your changes inside v2. When you are satisfied, disconnect v1 and connect v2 instead. This makes it very easy to switch to a new version without having to change any of your feeds.

Below is a link to a YouTube video that describes the Kylo reusable flow. If you are only using NiFi, I recommend the groups-in-groups approach to maintain different versions, since you can easily switch back and forth without impacting all your feeds.

https://www.youtube.com/watch?v=Vj641MRJCd8
01-19-2017
10:49 AM
Hi @Pierre Villard. Thanks for the reply. That's unfortunate; I was following this tutorial: https://github.com/georgevetticaden/hdp/tree/master/reference-apps/iot-trucking-app. I will try a previous version. Thank you
09-21-2016
01:24 PM
OMG, stupid me 😄 Thanks @mclark , exactly that solved the issue, sorry for bothering
09-07-2016
11:00 PM
4 Kudos
The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with NiFi 1.0.0 (a three-node cluster with three embedded ZooKeeper instances).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

"Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster."
OK, let’s start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least 3 nodes (to maintain a majority, also called a quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.

I have 3 VM instances (minimal CentOS 7) that are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.56.101 node-1
192.168.56.102 node-2
192.168.56.103 node-3

I deploy the binaries archive on my three instances and unzip it. I now have a NiFi directory on each one of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘.
Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you may have noticed, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell each server which one it is. To do that, you need to create a file named myid and place it in ZK’s data directory. The content of this file should be the index of the server as previously specified by the server.<number> property. On node-1, I’ll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID). If you don’t do this, you may see the following kind of exception in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to the clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary port 9999 for communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator; now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring, and I choose the arbitrary port 9998 for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies to all the nodes (just change the host property to the correct FQDN).

It is also important to set the FQDN for the web server property; otherwise we may get strange behaviors, with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes; modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

As you can see in the top-left corner, there are 3 nodes in our cluster.
Besides, if we go to the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a single node (for data consistency), and in this case we will want them to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a single node if we don’t want to duplicate data. So, in the scheduling strategy, we will choose “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won’t lose data: the workflow will still be executed.

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

If I run this flow, all my JSON tweets will be stored on the primary node; the data won’t be balanced.
To balance the data, I need to use an RPG (Remote Process Group). The RPG will exchange with the coordinator to evaluate the load of each node and balance the data over the nodes. It gives us the following flow:

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to http://node-2:8080/nifi, and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then, in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

Note: this article is adapted from this one.
09-08-2016
01:02 PM
@Pierre Villard Hi, I went through your article posted at the link below:

https://pierrevillard.com/2016/04/12/oauth-1-0a-with-apache-nifi-twitter-api-example/

In our API guide, I found that the API uses the OAuth 2.0 standard to authenticate all requests:

The APIs are all accessed via REST invocations and return results in JSON format. The API uses the OAuth 2.0 standard to authenticate all requests. To authenticate to the API endpoint, we will need a token that we send with every API call. To retrieve a token, please follow the steps below:

1. Request an access token by sending your Client ID and Client Secret via HTTP Basic Authentication, using an HTTP POST request. The Client ID and Client Secret need to be encoded to Base64, using the UTF-8 character set, in the form of client_id:client_secret. A resource you can use for this purpose is https://www.base64encode.org/. This string is then passed as the Authorization header.
2. The API will respond with an access token. Note: Tokens are only valid for one hour. After one hour a new token is required.
3. Pass the token as the Authorization header to access the API resources:
4. API resource data is returned:

I assume some steps are missing in between that are needed to build the correct flow. Can you please guide me?
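The two header values described in the quoted API guide can be sketched as follows. This is an illustration only: the function names are hypothetical, the token endpoint is not shown because the guide excerpt does not give it, and the `Bearer` scheme for the second header is an assumption (the guide only says the token is passed as the Authorization header). In a NiFi flow, these strings would be what is sent as the Authorization header on the two HTTP calls.

```python
import base64


def basic_auth_header(client_id: str, client_secret: str) -> dict:
    """Header for the token request: Base64 of 'client_id:client_secret' (UTF-8)."""
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {encoded}"}


def token_auth_header(access_token: str, scheme: str = "Bearer") -> dict:
    """Header for API calls once a token has been obtained.

    The 'Bearer' scheme is an assumption; use whatever the API guide specifies.
    """
    return {"Authorization": f"{scheme} {access_token}"}


if __name__ == "__main__":
    # Placeholder credentials, for illustration only.
    print(basic_auth_header("my_client_id", "my_client_secret"))
    print(token_auth_header("token-returned-by-the-api"))
```

Since tokens expire after one hour according to the guide, the token request has to be repeated periodically and the second header rebuilt from the new token.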
09-01-2016
09:44 PM
🙂 Thanks, it's there. They should have removed it from the other place I was looking at.
09-02-2016
02:01 PM
Thanks Pierre, now it's beginning to make some sense, so the 5 GenerateFF processors are there to take care of the 5 countries, I guess. I want to read my own log file; which processor would I use? I want to start with a simple task: read my log file, parse out some values using regular expressions, and then save the parsed values to Hive.
12-22-2016
01:35 PM
@David Kjerrumgaard I found out the issue was with my CSV file. It had to do with the column names I was ingesting in the CSV file. Also, setting the Obtain Generated Keys property to false worked as well in my case.
12-30-2016
09:21 PM
In addition to @Pierre Villard's answer (which nicely gets the job done with ExecuteScript, I have a similar example here), since you are looking to do row-level operations (i.e. select columns from each row), you could use SplitText to split the large file into individual lines, then your ReplaceText above, then MergeContent to put the whole thing back together. I'm not sure which approach is faster per se; it would be an interesting exercise to try both.
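The split, transform, merge pattern described above can be sketched outside NiFi like this. It is only an analogy for the SplitText, ReplaceText, MergeContent chain, not NiFi code: the function names are hypothetical, and the column selection uses a naive comma split that assumes no quoted fields containing commas.

```python
def select_columns(line: str, keep: list, delimiter: str = ",") -> str:
    """Row-level operation: keep only the columns at the given indexes."""
    fields = line.split(delimiter)  # naive split, no quoted-field handling
    return delimiter.join(fields[i] for i in keep)


def split_transform_merge(content: str, keep: list) -> str:
    """Split into lines, transform each line, then merge the results back."""
    lines = content.splitlines()                                    # split step
    transformed = [select_columns(l, keep) for l in lines if l]     # per-row step
    return "\n".join(transformed)                                   # merge step


if __name__ == "__main__":
    sample = "id,name,city\n1,Ann,Oslo\n2,Bob,Lyon\n"
    print(split_transform_merge(sample, keep=[0, 2]))
```

Each row is handled independently, which is what makes the per-line approach easy to reason about; whether it is faster than a single script over the whole file would need to be measured.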