Member since 02-09-2016 | 559 Posts | 422 Kudos Received | 98 Solutions
10-03-2016
10:19 PM
5 Kudos
Objectives
Upon completion of this tutorial, you should have a 3-node NiFi cluster running on CentOS 7.2 using Vagrant and VirtualBox.
Prerequisites
You should already have installed VirtualBox 5.1.x. Read more here: VirtualBox
You should already have installed Vagrant 1.8.6. Read more here: Vagrant
NOTE: Version 1.8.6 fixes an annoying bug with permissions of the authorized_keys file and SSH. I highly recommend you upgrade to 1.8.6.
You should already have installed the vagrant-vbguest plugin. This plugin will keep the VirtualBox Guest Additions software current as you upgrade your kernel and/or VirtualBox versions. Read more here: Vagrant vbguest plugin
You should already have installed the vagrant-hostmanager plugin. This plugin will automatically manage the /etc/hosts file on your local Mac and in your virtual machines. Read more here: Vagrant hostmanager plugin. The install commands for both plugins are shown below for reference.
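If either plugin is missing, both can be installed with Vagrant's plugin command (shown here as a reminder; you may get newer plugin versions than the ones listed in the Scope section):
$ vagrant plugin install vagrant-vbguest
$ vagrant plugin install vagrant-hostmanager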
Scope
This tutorial was tested in the following environment:
Mac OS X 10.11.6 (El Capitan)
VirtualBox 5.1.6
Vagrant 1.8.6
vagrant-vbguest plugin 0.13.0
vagrant-hostmanager plugin 1.8.5
Apache NiFi 1.0.0
Steps
Create Vagrant project directory
Before we get started, determine where you want to keep your Vagrant project files. Each Vagrant project should have its own directory. I keep my Vagrant projects in my ~/Development/Vagrant directory. You should also use a helpful name for each Vagrant project directory you create.
$ cd ~/Development/Vagrant
$ mkdir centos7-nifi-cluster
$ cd centos7-nifi-cluster
We will be using a CentOS 7.2 Vagrant box, so I include centos7 in the Vagrant project name to differentiate it from a CentOS 6 project. The project is for NiFi, so I include that in the name. And this project will have a cluster of machines. Thus we have a project directory name of centos7-nifi-cluster.
Create Vagrantfile
The Vagrantfile tells Vagrant how to configure your virtual machines. The name of the file is
Vagrantfile and it should be created in your Vagrant project directory. You can choose to copy/paste my Vagrantfile below or you can download it from the attachments to this article.
# -*- mode: ruby -*-
# vi: set ft=ruby :

# Using yaml to load external configuration files
require 'yaml'

Vagrant.configure("2") do |config|
  # Using the hostmanager vagrant plugin to update the host files
  config.hostmanager.enabled = true
  config.hostmanager.manage_host = true
  config.hostmanager.manage_guest = true
  config.hostmanager.ignore_private_ip = false

  # Loading in the list of commands that should be run when the VM is provisioned.
  commands = YAML.load_file('commands.yaml')
  commands.each do |command|
    config.vm.provision :shell, inline: command
  end

  # Loading in the VM configuration information
  servers = YAML.load_file('servers.yaml')
  servers.each do |servers|
    config.vm.define servers["name"] do |srv|
      srv.vm.box = servers["box"] # Specify the name of the Vagrant box to use
      srv.vm.hostname = servers["name"] # Set the hostname of the VM
      srv.vm.network "private_network", ip: servers["ip"], adapter: 2 # Add a second adapter with a specified IP
      srv.vm.network :forwarded_port, guest: 22, host: servers["port"] # Add a port forwarding rule
      srv.vm.provision :shell, inline: "sed -i'' '/^127.0.0.1\t#{srv.vm.hostname}\t#{srv.vm.hostname}$/d' /etc/hosts"
      srv.vm.provider :virtualbox do |vb|
        vb.name = servers["name"] # Name of the VM in VirtualBox
        vb.cpus = servers["cpus"] # How many CPUs to allocate to the VM
        vb.memory = servers["ram"] # How much memory to allocate to the VM
        vb.customize ["modifyvm", :id, "--cpuexecutioncap", "33"] # Limit the VM to 33% of available CPU
      end
    end
  end
end
Create a servers.yaml file
The servers.yaml file contains the configuration information for our virtual machines. The name of the file is
servers.yaml and it should be created in your Vagrant project directory. This file is loaded from the Vagrantfile. You can choose to copy/paste my servers.yaml file below or you can download it from the attachments to this article.
---
- name: nifi01
  box: bento/centos-7.2
  cpus: 2
  ram: 2048
  ip: 192.168.56.101
  port: 10122
- name: nifi02
  box: bento/centos-7.2
  cpus: 2
  ram: 2048
  ip: 192.168.56.102
  port: 10222
- name: nifi03
  box: bento/centos-7.2
  cpus: 2
  ram: 2048
  ip: 192.168.56.103
  port: 10322
Create a commands.yaml file
The commands.yaml file contains a list of commands that should be run on each virtual machine when it is first provisioned. The name of the file is
commands.yaml and it should be created in your Vagrant project directory. This file is loaded from the Vagrantfile and allows us to automate configuration tasks that would otherwise be tedious and/or repetitive. You can choose to copy/paste my commands.yaml file below or you can download it from the attachments to this article.
- "sudo yum -y install net-tools ntp wget java-1.8.0-openjdk java-1.8.0-openjdk-devel"
- "sudo systemctl enable ntpd && sudo systemctl start ntpd"
- "sudo systemctl disable firewalld && sudo systemctl stop firewalld"
- "sudo sed -i --follow-symlinks 's/^SELINUX=.*/SELINUX=disabled/g' /etc/sysconfig/selinux"
Start the virtual machines
Once you have created the 3 files in your Vagrant project directory, you are ready to start your cluster. Creating the cluster for the first time and starting it every time after that uses the same command:
$ vagrant up
Once the process is complete, you should have 3 servers running. You can verify this in the VirtualBox UI, where you should see 3 virtual machines running, named nifi01, nifi02, and nifi03:
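You can also verify the machines from the command line inside the project directory (output trimmed; the exact wording varies with your Vagrant version):
$ vagrant status
nifi01    running (virtualbox)
nifi02    running (virtualbox)
nifi03    running (virtualbox)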
Connect to each virtual machine
You are able to login to each of the virtual machines via ssh using the
vagrant ssh command. You must specify the name of the virtual machine you want to connect to.
$ vagrant ssh nifi01
Verify that you can log in to each of the virtual machines: nifi01, nifi02, and nifi03.
Download Apache NiFi
We need to download the NiFi distribution file so that we can install it on each of our nodes. Instead of downloading it 3 times, we will download it once on our Mac. We'll copy the file to our Vagrant project directory where each of our virtual machines can access the file via the /vagrant mount point.
$ cd ~/Development/Vagrant/centos7-nifi-cluster
$ curl -O http://mirror.cc.columbia.edu/pub/software/apache/nifi/1.0.0/nifi-1.0.0-bin.tar.gz
NOTE: You may want to use a different mirror if you find your download speeds are too slow.
Create nifi user
We will be running NiFi as the
nifi user. So we need to create that account on each server.
$ vagrant ssh nifi01
$ sudo useradd nifi -d /home/nifi
Repeat the process for nifi02 and nifi03. I recommend having 3 terminal windows open from this point forward, one for each of the NiFi servers.
Verify Host Files
Now that you are logged into each server, you should verify the /etc/hosts file on each server. You should notice that the Vagrant hostmanager plugin has updated the /etc/hosts file with the IP addresses and hostnames of the 3 servers; a sketch of a correct file is shown after these notes.
NOTE: If you see 127.0.0.1 nifi01 (or nifi02, nifi03) at the top of the /etc/hosts file, delete that line. It will cause issues. The only entry with 127.0.0.1 should be the one for localhost.
UPDATE: If you use the updated Vagrantfile with the "sed" command, the extraneous entry at the top of the hosts file is removed automatically. The following line was added to the Vagrantfile to fix the issue:
srv.vm.provision :shell, inline: "sed -i'' '/^127.0.0.1\t#{srv.vm.hostname}\t#{srv.vm.hostname}$/d' /etc/hosts"
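A correct /etc/hosts on each node should look roughly like this (the IP addresses come from servers.yaml; the exact localhost lines depend on the base box):
127.0.0.1       localhost localhost.localdomain
192.168.56.101  nifi01
192.168.56.102  nifi02
192.168.56.103  nifi03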
Extract nifi archive
We will be running NiFi from our /opt directory, which is where we will extract the archive. You should already be connected to the server from the previous step.
$ cd /opt
$ sudo tar xvfz /vagrant/nifi-1.0.0-bin.tar.gz
$ sudo chown -R nifi:nifi /opt/nifi-1.0.0
Repeat the process for nifi02 and nifi03.
Edit nifi.properties file
We need to modify the nifi.properties file to setup clustering. The nifi.properties file is the main configuration file and is located at <nifi install>/conf/nifi.properties. In our case it should be located at
/opt/nifi-1.0.0/conf/nifi.properties.
$ sudo su - nifi
$ cd /opt/nifi-1.0.0/conf
$ vi nifi.properties
You should edit the following lines in the file on each of the servers:
nifi.web.http.host=nifi01
nifi.state.management.embedded.zookeeper.start=true
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi01
nifi.cluster.node.protocol.port=9999
nifi.zookeeper.connect.string=nifi01:2181,nifi02:2181,nifi03:2181
nifi.remote.input.host=nifi01
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
NOTE: Make sure you enter the hostname value that matches the name of the host you are on.
You have the option to specify any port for nifi.cluster.node.protocol.port as long as there are no conflicts on the server and the value matches the other server configurations. Likewise, you have the option to specify any port for nifi.remote.input.socket.port under the same conditions. The nifi.cluster.node.protocol.port and nifi.remote.input.socket.port values should be different.
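For example, on nifi02 the host-specific properties would use that node's hostname, while the port values stay the same on every node:
nifi.web.http.host=nifi02
nifi.cluster.node.address=nifi02
nifi.remote.input.host=nifi02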
If you used
vi to edit the file, press the following key sequence to save the file and exit vi:
:wq
Edit zookeeper.properties file
We need to modify the zookeeper.properties file on each of the servers. The zookeeper.properties file is the configuration file for zookeeper and is located at <nifi install>/conf/zookeeper.properties. In our case it should be located at
/opt/nifi-1.0.0/conf/zookeeper.properties. We are providing the list of known zookeeper servers.
$ cd /opt/nifi-1.0.0/conf
$ vi zookeeper.properties
Delete the line at the bottom of the file:
server.1=
Add these three lines at the bottom of the file:
server.1=nifi01:2888:3888
server.2=nifi02:2888:3888
server.3=nifi03:2888:3888
If you used
vi to edit the file, press the following key sequence to save the file and exit vi:
:wq
Create zookeeper state directory
Each NiFi server is running an embedded ZooKeeper server. Each ZooKeeper instance needs a unique id, which is stored in the <nifi home>/state/zookeeper/myid file. In our case, that location is
/opt/nifi-1.0.0/state/zookeeper/myid. For each of the hosts, you need to create the myid file. The ids for each server are: nifi01 is 1, nifi02 is 2, and nifi03 is 3.
$ cd /opt/nifi-1.0.0
$ mkdir -p state/zookeeper
$ echo 1 > state/zookeeper/myid
Remember that on nifi02 you echo 2 and on nifi03 you echo 3.
Start NiFi
Now we should have everything in place to start NiFi. On each of the three servers run the following command:
$ cd /opt/nifi-1.0.0
$ bin/nifi.sh start
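NiFi can take a minute or two to finish starting. You can check whether the process is up using the status command:
$ bin/nifi.sh status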
Monitor NiFi logs
You can monitor the NiFi logs by using the tail command:
$ tail -f logs/nifi-app.log
Once the servers connect to the cluster, you should notice log messages similar to this:
2016-09-30 13:22:59,260 INFO [Clustering Tasks Thread-2] org.apache.nifi.cluster.heartbeat Heartbeat created at 2016-09-30 13:22:59,253 and sent to nifi01:9999 at 2016-09-30 13:22:59,260; send took 5 millis
Access NiFi UI
Now you can access the NiFi web UI. You can log into any of the 3 servers using this URL:
http://nifi01:8080/nifi
You should see something similar to this:
Notice the cluster indicator in the upper left shows 3/3 which means that all 3 of our nodes are in the cluster. Notice the upper right has a post-it note icon. This icon gives you recent messages and will be colored red. You can see this screenshot showing a message about receiving a heartbeat.
Try accessing the nifi02 and nifi03 web interfaces.
Shut down the cluster
To shut down the cluster, you only need to run one vagrant command:
$ vagrant halt
Restarting the cluster
When you restart the cluster, you will need to log into each server and start NiFi, as it is not configured to start automatically.
Review
If you successfully worked through the tutorial, you should have a Vagrant configuration that you can bring up at any time using vagrant up and bring down using vagrant halt. You should also have Apache NiFi configured on 3 servers to run in a clustered configuration.
10-01-2016
01:12 PM
1 Kudo
@Maikel Alderhout The docker load command does take some time, as the image file is ~14 GB in size. However, it looks like you are trying to load the tar.gz file in your example above. Before running the docker load command, you must first gunzip the HDP_2.5_docker.tar.gz file:
$ gunzip HDP_2.5_docker.tar.gz
$ docker load < HDP_2.5_docker.tar
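Once the load completes, you can confirm the image is available locally (the image name and tag will depend on the sandbox release you downloaded):
$ docker images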
09-29-2016
08:19 PM
@Marc Caubet Glad I could help!
09-29-2016
04:27 PM
@Ankit Jain Sometimes you have to delete the connection between two processors before you can delete one of the processors. Try that to see if you can delete your processor.
09-29-2016
03:41 PM
@Praneender Vuppala Hive does support UTF-8 encoding of data. As @jk has shown, you can create the table using the LazySimpleSerDe. You can read more about Hive's UTF-8 support in the Hive User FAQ:
You can use Unicode strings in data and comments, but you cannot use them for database/table/column names.
You can use UTF-8 encoding for Hive data. However, other encodings are not supported (HIVE-7142 introduced encoding support for LazySimpleSerDe; however, the implementation is not complete and does not address all cases).
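As a rough sketch of that kind of table definition (the table and column names here are placeholders, not taken from @jk's answer), LazySimpleSerDe lets you declare the encoding explicitly via a SerDe property:
CREATE TABLE utf8_example (id INT, description STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.encoding'='UTF-8')
STORED AS TEXTFILE;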
09-29-2016
01:59 AM
11 Kudos
Overview
This tutorial is intended to walk you through the process of creating a Process Group in NiFi to feed multiple Twitter streams to Elasticsearch.
This tutorial is the second part of a two part series. The first part can be found here: HCC Article. In this part of the series, we will create a process group which contains multiple Twitter feeds funneled to a single Elasticsearch instance. This allows you to have multiple feeds of data with different processing needs prior to pushing to Elasticsearch. We will be able to query Elasticsearch to see data from both of our example streams.
Admittedly this is a contrived example. However, the concept is fundamentally useful across a variety of NiFi use cases.
NOTE: The only required software components are NiFi and Elasticsearch, which can be run in just about any Linux environment. However, I recommend deploying these as part of your HDP sandbox or test cluster, allowing for a broader integration of tools and capabilities such as Pig, Hive, Zeppelin, etc.
Prerequisites
You should already have completed the Using NiFi GetTwitter, UpdateAttributes and ReplaceText processors tutorial and associated prerequisites: HCC Article
Scope
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
HDP 2.5 Tech Preview on Hortonworks Sandbox, although it should work for any HDP 2.5 deployments
Apache NiFi 1.0.0 (Read more here: Apache NiFi)
Elasticsearch 2.4.0, although it should work for any Elasticsearch version > 2.x (Read more here: Elasticsearch)
Steps
We are picking up from where the last tutorial left off. We currently have a single dataflow. Our GetTwitter processor combines multiple filters. We would like to clearly define two GetTwitter processors, each with their own filter.
This is what our current data flow looks like:
Create a process group
The first thing we are going to do is to add a Process Group to our NiFi canvas. To do this, drag the Process Group icon from the menu bar to the canvas area. Here is a screenshot showing the Process Group icon:
Once you drag the process group icon to the canvas, the Add Process Group dialog will be displayed. It should look similar to this:
Give the process group a meaningful name. In our case, we will call it Twitter Feed. Click the ADD button. The process group will be added to the canvas. You should see something similar to this:
You should drag the process group so that it is easier to see. You should see something similar to this:
Copy data flow to process group
Now we want to copy our existing flow to the process group we just created. Select all 4 of the processors in our current flow. Press the COMMAND-C (or CTRL-C on Windows) to copy the selected components. Now double click on the Twitter Feed process group. This will open the process group. You should see something similar to this:
Notice the canvas is now blank? You should also notice the bread crumb navigation in the lower left of the screen. NiFi Flow >> Twitter Feed is your indication that you are inside the process group.
Now we can paste the copied processors onto the canvas. Press COMMAND-V (CTRL-V on Windows) to paste the flow into the process group. You should see something similar to this:
You should see the 4 processors copied to the canvas. You should also notice the connections are missing. We need to reestablish the connections. Before doing that, we are going to delete the PutElasticsearch processor. It already exists outside of the process group and we don't need a copy inside.
Delete PutElasticsearch processor
Inside your process group (the missing connections and the breadcrumb in the lower left will confirm this), select the PutElasticsearch processor by clicking on it. Now you can delete it by pressing the delete key. You should see something similar to this:
Create connections between processors
Now we are going to create connections between the 3 processors. Drag the circle arrow icon from the GetTwitter processor to the UpdateAttribute processor. You don't need to change anything; click the ADD button. Drag the circle arrow icon from the UpdateAttribute processor to the ReplaceText processor. You don't need to change anything; click the ADD button. You should see something similar to this:
You should notice a red triangle in the upper left of the ReplaceText processor. That is because we haven't connected it to anything yet. We'll get to that shortly.
Edit GetTwitter processor
This first dataflow will be for our elasticsearch related tweets. We need to edit the GetTwitter processor to filter only on elasticsearch. Right click on the processor and select configure. Click the PROPERTIES tab. Click on the Terms to Filter On value field to edit the value. Enter elasticsearch as the only term. Click the OK button to save the change. You should see something similar to this:
Click the APPLY button to save the change.
Copy data flow
We need a similar data flow within this process group. The second data flow should be filtering on the term solr. To do that, select all 3 processors and press the COMMAND-C keys. Now press the COMMAND-V keys to paste a copy of the processors. You should see something similar to this:
The processors you copied should still be selected. Let's move them so it's easier to see the two flows. Drag the selected processors to the right. You should see something similar to this:
Edit GetTwitter processor
We need to edit the GetTwitter processor for the second data flow. Follow the same procedure we did the first time, only this time use the term solr . You should have something that looks like this:
Create connections between processors
As we did before, create the connections between the processors in the second flow. Drag the circle arrow icon from the GetTwitter processor to the UpdateAttribute processor. You don't need to change anything; click the ADD button. Drag the circle arrow icon from the UpdateAttribute processor to the ReplaceText processor. You don't need to change anything; click the ADD button. You should see something similar to this:
Create Output Port
We need the data flow from this process group to be sent outside of the group to enable connections to our Elasticsearch processor. To enable this, we are going to add an Output Port. Drag the Output Port icon from the menu bar to the canvas area. Here is a screenshot showing the Output Port icon:
An Add Port dialog should be displayed. You should see something similar to this:
Enter a user-friendly name for the port that will be created. We'll call our port From Twitter Feed. Click the ADD button to add the port. You should see something similar to this:
You should notice a red triangle in the upper left of our From Twitter Feed Output Port. This is because there is no connection defined yet.
Create connections to Output Port
Now we need to create a connection from each of the ReplaceText processors to the Output Port. To do this, drag the circle arrow icon from the ReplaceText processor to the Output Port. A Create Connection dialog will be displayed. Select the success relationship. Click the ADD button to create the connection. Do this for both ReplaceText processors. Now you should see something similar to this:
Create connection between Process Group and PutElasticsearch processor
Now we are ready to create the connection between our Process Group and our PutElasticsearch processor. Using the bread crumb navigation in the lower left, click on the NiFi Flow link to go up a level.
You should see something similar to this:
We no longer need the GetTwitter, UpdateAttribute and ReplaceText processors on the main canvas. Select each of the connections between the processors and delete the connections with the delete key. You should see something similar to this:
Now delete the GetTwitter, UpdateAttribute and ReplaceText processors from the main canvas. We want to keep the PutElasticsearch processor. You should see something similar to this:
Create a connection between the process group and the PutElasticsearch processor by dragging the circle arrow icon from the process group to the PutElasticsearch processor. A Create Connection dialog will be displayed. You don't need to change any options, so click the ADD button to create the connection. You should have something that looks similar to this:
If you look inside your process group now, you should notice the red triangle is gone for the Output Port. That is because a connection exists now.
Start processors
Now we can start our processors to test our flow. If you click on the process group and then click the start arrow icon, that will start all of the processors inside the process group. You should notice the count next to the start arrow icon on the process group goes from 0 to 7 and the count next to the stop square icon goes from 7 to 0.
Because we are filtering on specific terms, it may take 20 or 30 minutes before any matching tweets are pulled in. Be patient. Once tweets start coming in you should see something similar to this:
You should notice the tweets are queuing up. We have not yet started our PutElasticsearch processor. Go ahead and do that now. Click on the PutElasticsearch processor and click on the start arrow icon. You should see something similar to this:
You should notice the queued tweets have been processed and are now in Elasticsearch.
Query Elasticsearch
We can now query Elasticsearch using the custom field we created, twitterFilterAttribute. If you let the data flow run long enough, you should have at least a few tweets for each GetTwitter processor.
In your browser window, query Elasticsearch using the following URL: http://sandbox.hortonworks.com:9200/twitter_new/_search?pretty. You should see something similar to this:
{
"took" : 26,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 252831,
"max_score" : 1.0,
"hits" : [ {
"_index" : "twitter_new",
"_type" : "default",
"_id" : "0827ce3c-21ab-4dfa-9d17-0ba90c116142",
"_score" : 1.0,
"_source" : {
"created_at" : "Thu Sep 15 13:56:06 +0000 2016",
"id" : 776419323955048448,
"id_str" : "776419323955048448",
"text" : "RT @cymia: I have the biggest heart I swear.",
"source" : "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
"truncated" : false,
"in_reply_to_status_id" : null,
"in_reply_to_status_id_str" : null,
"in_reply_to_user_id" : null,
"in_reply_to_user_id_str" : null,
"in_reply_to_screen_name" : null,
"user" : {
"id" : 997413793,
"id_str" : "997413793",
"name" : "#######",
"screen_name" : "#######",
"location" : "Future In The Present",
"url" : null,
"description" : "~~_Trust No Bitch⚔ Y'all Opinions Doesn't Define Who I Am ✨ C/o '17 \uD83D\uDC10\uD83C\uDF93",
"protected" : false,
"verified" : false,
"followers_count" : 453,
"friends_count" : 391,
"listed_count" : 0,
"favourites_count" : 1448,
"statuses_count" : 6803,
"created_at" : "Sat Dec 08 15:41:26 +0000 2012",
"utc_offset" : -14400,
"time_zone" : "#######",
"geo_enabled" : false,
"lang" : "#######",
"contributors_enabled" : false,
"is_translator" : false,
"profile_background_color" : "BADFCD",
"profile_background_image_url" : "#######",
"profile_background_image_url_https" : "#######",
"profile_background_tile" : false,
"profile_link_color" : "FF0000",
"profile_sidebar_border_color" : "F2E195",
"profile_sidebar_fill_color" : "FFF7CC",
"profile_text_color" : "0C3E53",
"profile_use_background_image" : true,
"profile_image_url" : "#######",
"profile_image_url_https" : "#######",
"profile_banner_url" : "#######",
"default_profile" : false,
"default_profile_image" : false,
"following" : null,
"follow_request_sent" : null,
"notifications" : null
},
...
You should notice that you have a large number of tweets; in my case I have 252831. Now let's query against our new field. In your browser, enter the following URL: http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:elasticsearch&pretty. You should get a much smaller number of tweets. In my case I got 2 documents back. Here is my output:
{
"took" : 142,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 12.341856,
"hits" : [ {
"_index" : "twitter_new",
"_type" : "default",
"_id" : "a4226ef1-5bfe-4aff-84aa-dd357e874356",
"_score" : 12.341856,
"_source" : {
"created_at" : "Tue Sep 27 21:56:45 +0000 2016",
"id" : 780888938483425288,
"id_str" : "780888938483425288",
"text" : "Build a Search Engine with Node.js and Elasticsearch#######",
"source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"truncated" : false,
"in_reply_to_status_id" : null,
"in_reply_to_status_id_str" : null,
"in_reply_to_user_id" : null,
"in_reply_to_user_id_str" : null,
"in_reply_to_screen_name" : null,
"user" : {
"id" : 35983221,
"id_str" : "35983221",
"name" : "#######",
"screen_name" : "#######",
"location" : "#######",
"url" : "#######",
"description" : "Full stack web developer (PHP, Java, Rails), Docker enthusiast, gamer, also addicted to eletronic, photography and technology.",
"protected" : false,
"verified" : false,
"followers_count" : 398,
"friends_count" : 660,
"listed_count" : 84,
"favourites_count" : 348,
"statuses_count" : 2532,
"created_at" : "Tue Apr 28 04:04:57 +0000 2009",
"utc_offset" : -14400,
"time_zone" : "#######",
"geo_enabled" : true,
"lang" : "#######",
"contributors_enabled" : false,
"is_translator" : false,
"profile_background_color" : "5D7382",
"profile_background_image_url" : "#######",
"profile_background_image_url_https" : "#######",
"profile_background_tile" : false,
"profile_link_color" : "CC0000",
"profile_sidebar_border_color" : "000000",
"profile_sidebar_fill_color" : "EFEFEF",
"profile_text_color" : "333333",
"profile_use_background_image" : true,
"profile_image_url" : "#######",
"profile_image_url_https" : "https://pbs.twimg.com/profile_images/760303804092968965/9mekDmQy_normal.jpg",
"profile_banner_url" : "#######",
"default_profile" : false,
"default_profile_image" : false,
"following" : null,
"follow_request_sent" : null,
"notifications" : null
},
"geo" : null,
"coordinates" : null,
"place" : null,
"contributors" : null,
"is_quote_status" : false,
"retweet_count" : 0,
"favorite_count" : 0,
"entities" : {
"hashtags" : [ ],
"urls" : [ {
"url" : "#######",
"expanded_url" : "#######",
"display_url" : "sitepoint.com/search-engine-…",
"indices" : [ 53, 76 ]
} ],
"user_mentions" : [ ],
"symbols" : [ ]
},
"favorited" : false,
"retweeted" : false,
"possibly_sensitive" : false,
"filter_level" : "low",
"lang" : "en",
"timestamp_ms" : "1475013405806",
"twitterFilterAttribute" : "elasticsearch"
}
...
Notice the new field is present in the data and it contains elasticsearch as the value? Now let's query for solr. Type the following URL in your browser: http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:solr&pretty. You should get a similarly small number of results. In my case I got 2 documents returned. Here is my output:
{
"took" : 28,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 12.341865,
"hits" : [ {
"_index" : "twitter_new",
"_type" : "default",
"_id" : "b8b618db-99d8-4ac8-910b-84a20fa58396",
"_score" : 12.341865,
"_source" : {
"created_at" : "Tue Sep 27 21:56:15 +0000 2016",
"id" : 780888813157711872,
"id_str" : "780888813157711872",
"text" : "RT @shalinmangar: #Docker image for @ApacheSolr 6.2.1 is now available. https://t.co/lrakkMMhJn #solr",
"source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"truncated" : false,
"in_reply_to_status_id" : null,
"in_reply_to_status_id_str" : null,
"in_reply_to_user_id" : null,
"in_reply_to_user_id_str" : null,
"in_reply_to_screen_name" : null,
"user" : {
"id" : 362698158,
"id_str" : "362698158",
"name" : "#######",
"screen_name" : "#######",
"location" : "#######",
"url" : "#######",
"description" : "Digital Photography, Information Retrieval, Data Warehousing, Big Data, Cloud Computing. Solutions Engineer @ Hortonworks.",
"protected" : false,
"verified" : false,
"followers_count" : 310,
"friends_count" : 732,
"listed_count" : 48,
"favourites_count" : 108,
"statuses_count" : 5003,
"created_at" : "Fri Aug 26 20:53:04 +0000 2011",
"utc_offset" : null,
"time_zone" : null,
"geo_enabled" : true,
"lang" : "en",
"contributors_enabled" : false,
"is_translator" : false,
"profile_background_color" : "C6E2EE",
"profile_background_image_url" : "#######",
"profile_background_image_url_https" : "#######",
"profile_background_tile" : false,
"profile_link_color" : "1B95E0",
"profile_sidebar_border_color" : "C6E2EE",
"profile_sidebar_fill_color" : "DAECF4",
"profile_text_color" : "663B12",
"profile_use_background_image" : true,
"profile_image_url" : "#######",
"profile_image_url_https" : "#######",
"profile_banner_url" : "#######,
"default_profile" : false,
"default_profile_image" : false,
"following" : null,
"follow_request_sent" : null,
"notifications" : null
},
"geo" : null,
"coordinates" : null,
"place" : null,
"contributors" : null,
"retweeted_status" : {
"created_at" : "Tue Sep 27 12:45:52 +0000 2016",
"id" : 780750304060899328,
"id_str" : "780750304060899328",
"text" : "#Docker image for @ApacheSolr 6.2.1 is now available. https://t.co/lrakkMMhJn #solr",
"source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"truncated" : false,
"in_reply_to_status_id" : null,
"in_reply_to_status_id_str" : null,
"in_reply_to_user_id" : null,
"in_reply_to_user_id_str" : null,
"in_reply_to_screen_name" : null,
"user" : {
"id" : 7057932,
"id_str" : "7057932",
"name" : "#######",
"screen_name" : "#######",
"location" : "#######",
"url" : "#######",
"description" : "Engineer at Lucidworks, Committer on Apache Lucene/Solr, ex-AOLer",
"protected" : false,
"verified" : false,
"followers_count" : 1431,
"friends_count" : 388,
"listed_count" : 106,
"favourites_count" : 903,
"statuses_count" : 3758,
"created_at" : "Sun Jun 24 22:50:00 +0000 2007",
"utc_offset" : 19800,
"time_zone" : "New Delhi",
"geo_enabled" : true,
"lang" : "en",
"contributors_enabled" : false,
"is_translator" : false,
"profile_background_color" : "EDECE9",
"profile_background_image_url" : "#######",
"profile_background_image_url_https" : "#######",
"profile_background_tile" : false,
"profile_link_color" : "088253",
"profile_sidebar_border_color" : "D3D2CF",
"profile_sidebar_fill_color" : "E3E2DE",
"profile_text_color" : "634047",
"profile_use_background_image" : false,
"profile_image_url" : "#######",
"profile_image_url_https" : "#######",
"default_profile" : false,
"default_profile_image" : false,
"following" : null,
"follow_request_sent" : null,
"notifications" : null
},
"geo" : null,
"coordinates" : null,
"place" : null,
"contributors" : null,
"is_quote_status" : false,
"retweet_count" : 8,
"favorite_count" : 9,
"entities" : {
"hashtags" : [ {
"text" : "Docker",
"indices" : [ 0, 7 ]
}, {
"text" : "solr",
"indices" : [ 78, 83 ]
} ],
"urls" : [ {
"url" : "#######",
"expanded_url" : "#######",
"display_url" : "#######",
"indices" : [ 54, 77 ]
} ],
"user_mentions" : [ {
"screen_name" : "ApacheSolr",
"name" : "Apache Solr",
"id" : 22742048,
"id_str" : "22742048",
"indices" : [ 18, 29 ]
} ],
"symbols" : [ ]
},
"favorited" : false,
"retweeted" : false,
"possibly_sensitive" : false,
"filter_level" : "low",
"lang" : "en"
},
"is_quote_status" : false,
"retweet_count" : 0,
"favorite_count" : 0,
"entities" : {
"hashtags" : [ {
"text" : "Docker",
"indices" : [ 18, 25 ]
}, {
"text" : "solr",
"indices" : [ 96, 101 ]
} ],
"urls" : [ {
"url" : "#######",
"expanded_url" : "#######",
"display_url" : "h#######",
"indices" : [ 72, 95 ]
} ],
"user_mentions" : [ {
"screen_name" : "shalinmangar",
"name" : "Shalin Mangar",
"id" : 7057932,
"id_str" : "7057932",
"indices" : [ 3, 16 ]
}, {
"screen_name" : "ApacheSolr",
"name" : "Apache Solr",
"id" : 22742048,
"id_str" : "22742048",
"indices" : [ 36, 47 ]
} ],
"symbols" : [ ]
},
"favorited" : false,
"retweeted" : false,
"possibly_sensitive" : false,
"filter_level" : "low",
"lang" : "en",
"timestamp_ms" : "1475013375926",
"twitterFilterAttribute" : "solr"
}
...
Look for the twitterFilterAttribute field. You should see it has the value solr .
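If you prefer the command line to a browser, the same queries can be run with curl (the hostname and index name assume the setup used throughout this tutorial series):
$ curl "http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:elasticsearch&pretty"
$ curl "http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:solr&pretty"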
Review
If you were able to successfully work through the tutorial, you should have a good understanding of how to create multiple flows within a process group and how to feed that data to an output port. In this tutorial, we created 2 feeds for different Twitter filters which added a new field called twitterFilterAttribute to the Twitter JSON data. This field is now searchable within Elasticsearch to easily filter sources of data using a single index.
Next Steps
For next steps, you could try using the RouteOnAttribute processor to direct the flow to different Elasticsearch processors which write to different indexes.
09-27-2016
02:34 PM
3 Kudos
@Robbert Naastepad It looks like there is a data type mismatch according to the error:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 0: Output Location Validation Failed for: 'riskfactor More info to follow: Pig 'double' type in column 2(0-based) cannot map to HCat 'BIGINT'type. Target filed must be of HCat type {DOUBLE} Details at logfile: /hadoop/yarn/local/usercache/maria_dev/appcache/application_1474973150203_0003/container_1474973150203_0003_01_000002/pig_1474977081603.log
2016-09-27 11:51:28,746 [main] INFO org.apache.pig.Main - Pig script completed in 7 seconds and 330 milliseconds (7330 ms
The log indicates that Pig is attempting to store a DOUBLE into a target column that is defined as BIGINT. It says "in column 2(0-based)", so the problem is with totmiles.
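Since the target column is BIGINT but Pig is producing a double, one possible fix (a sketch only; the relation and field names below are placeholders based on the standard riskfactor tutorial, not taken from your script) is to cast the value before storing it, or alternatively to change the totmiles column in the Hive table to DOUBLE:
-- cast the Pig double down to a long so it maps to the BIGINT column
final_data = FOREACH joined_data GENERATE driverid, events, (long) totmiles AS totmiles;
STORE final_data INTO 'riskfactor' USING org.apache.hive.hcatalog.pig.HCatStorer();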
09-27-2016
02:09 PM
@Marc Caubet Call the API to see what configuration is coming back for the repositories:
curl -u username:password http://ambariserver:8080/api/v1/stacks/HDP/versions/2.4/operating_systems/redhat7/repositories
Is it returning the configuration you modified in repoinfo.xml? Did you restart Ambari Server after making those changes? According to the docs, you can specify a repo like this:
PUT /api/v1/stacks/:stack/versions/:stackVersion/operating_systems/:osType/repositories/:repoId
{
  "Repositories" : {
    "base_url" : "<CUSTOM_REPO_BASE_URL>",
    "verify_base_url" : true
  }
}
Documentation here: https://cwiki.apache.org/confluence/display/AMBARI/Blueprints#Blueprints-Step4:SetupStackRepositories(Optional)
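As a concrete example of that call (a sketch only: the admin:admin credentials and the HDP-2.4 repo id are placeholders for your own values, and Ambari expects an X-Requested-By header on write requests):
curl -u admin:admin -H "X-Requested-By: ambari" -X PUT -d '{"Repositories": {"base_url": "<CUSTOM_REPO_BASE_URL>", "verify_base_url": true}}' http://ambariserver:8080/api/v1/stacks/HDP/versions/2.4/operating_systems/redhat7/repositories/HDP-2.4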
09-25-2016
03:46 PM
2 Kudos
@Arkaprova Saha
I'm a huge fan of Python myself. I think you'll find it is usable across a broad range of scenarios in and outside of Big Data. Python is very popular among Data Scientists. If you search Google for "python data science", you will find there are a number of online courses available to you. While Scala is native for Spark, Python is very well supported. It's also a very good language for data manipulation. You can use Python in NiFi. You can use Python with Hive and Pig for UDFs (User Defined Functions). Python is used for Ambari service scripts.
09-22-2016
01:51 PM
7 Kudos
Objective
This tutorial is intended to walk you through the process of using the GetTwitter, UpdateAttribute, ReplaceText and PutElasticsearch processors in Apache NiFi to modify Twitter JSON data before sending it to Elasticsearch.
This tutorial is the first part of a two part series. In this part of the series, we will create a single data flow that adds an additional field to the JSON data called twitterFilterAttribute using the ReplaceText processor. This will allow us to query Elasticsearch using a fielded query like q=twitterFilterAttribute:elasticsearch. The second part of the series will build on this example to create a process group with two GetTwitter feeds: one with elasticsearch term filter and the other with a solr term filter.
Admittedly this is a contrived example. However, the concept is fundamentally useful across a variety of NiFi use cases.
NOTE: The only required software components are NiFi and Elasticsearch, which can be run in just about any Linux environment. However, I recommend deploying these as part of your HDP sandbox or test cluster, allowing for a broader integration of tools and capabilities such as Pig, Hive, Zeppelin, etc.
Prerequisites
You should already have completed the NiFi + Twitter + Elasticsearch tutorial and associated prerequisites: HCC Article
Scope
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
HDP 2.5 Tech Preview on Hortonworks Sandbox, although it should work for any HDP 2.5 deployment
Apache NiFi 1.0.0 (Read more here: Apache NiFi)
Elasticsearch 2.4.0, although it should work for any Elasticsearch version > 2.x (Read more here: Elasticsearch)
Steps
Stop NiFi processors
If your NiFi workflow from the previous tutorial is running, then you should stop your GetTwitter and PutElasticsearch processors. Your NiFi data flow should look similar to this (NOTE: my processors are running in this screenshot):
As you can see, we have two NiFi processors. This is a very simple data flow.
Remove Connection
Before we add the UpdateAttribute processor, we are going to remove the connection between the GetTwitter and PutElasticsearch processors. Click on the connection between the two processors to select the connection. Now press the delete or backspace key to delete the connection.
NOTE: You must have both processors stopped before you can delete the connection.
You should now see something similar to this:
Add UpdateAttribute Processor
Now we are going to add the UpdateAttribute processor. Drag the processor icon from the NiFi menu bar to the data flow canvas. You will see the Add Processor dialog. Type updateattr in the filter box to filter the list of processors. You should see something similar to this:
Select the UpdateAttribute processor and click the ADD button. You should see something similar to this:
Rearrange the processors on the canvas to make it easier to follow/trace connections later. You should have something similar to this:
Configure UpdateAttribute Processor
We are now going to configure the UpdateAttribute processor. Right click on the UpdateAttribute processor and select the Configure menu option. Click on the PROPERTIES tab. You should see something similar to this:
We are going to add a new property. Click on the + (plus) icon. You should see something similar to this:
For the Property Name, enter twitterFilterAttribute. This will add a property called twitterFilterAttribute to the flow files coming through this processor. Now click the OK button and you should see something similar to this:
For the Value, enter elasticsearch. This is the value that will be added to the twitterFilterAttribute property. Now click the OK button and then the APPLY button.
Add Connection Between GetTwitter and UpdateAttribute Processors
We need to add a connection between the GetTwitter and UpdateAttribute processors. You do this by hovering over the GetTwitter processor until you see the circle-arrow icon. Drag the icon to the UpdateAttribute processor. You should see something similar to this:
You do not need to change any settings here. Click the ADD button to add the connection.
Add ReplaceText Processor
We are now going to add the ReplaceText processor. Drag the processor icon from the NiFi menu bar to the data flow canvas. You will see the Add Processor dialog. Type replace in the filter box to filter the list of processors. You should see something similar to this:
Select the ReplaceText processor and click the ADD button.
Configure ReplaceText Processor
We are now going to configure the ReplaceText processor. We want to change the JSON message data and we are going to do it using Regular Expressions, which is enabled with the ReplaceText processor. You can read more on regular expressions here: Wikipedia.
Here is what the message looks like coming in:
{
...
"filter_level" : "low",
"lang" : "en",
"timestamp_ms" : "1473786418611"
}
Here is what the message should look like going out:
{
...
"filter_level" : "low",
"lang" : "en",
"timestamp_ms" : "1473786418611",
"twitterFilterAttribute"" : "elasticsearch"
}
We need to add a , and our new field twitterFilterAttribute with our value after the last entry in the JSON, but before the last } character.
Right click on the ReplaceText processor and select the Configure menu option. Click on the PROPERTIES tab. You should see something similar to this:
We need to change the Search Value and Replacement Value settings. Click on the Value box for the Search Value line. You should see something similar to this:
The value in this box is a regular expression. We are going to replace the entire value with: (?s:(^.*)}$) This regular expression looks for anything from the beginning of the line to a } character at the end of the line. Anything it matches is captured into a regular expression group by the () characters. We are looking for a } at the end of the line because that is the last part of the Twitter JSON message data. You will notice that we don't include the } in the () group. This is because we need to add a value before the closing }, which we'll do in the Replacement Value section.
This regular expression will match everything up to the last line of the incoming message data:
...
"timestamp_ms" : "1473786418611"
}
Once you have entered the regular expression, click the OK button. Now we are going to change the Replacement Value setting. Click on the Value box for the Replacement Value line. You should see something similar to this:
The value in this box is a regular expression group. We are going to replace the entire value with: $1,"twitterFilterAttribute":"${twitterFilterAttribute}"} This will replace the entire text of the incoming data with the first matching group, which is all of the JSON twitter text without the last }. We then add a , because each JSON node needs to be separated by a comma. The "twitterFilterAttribute" text is a literal string. The ${} in the second part of that string is NiFi Expression Language. This adds the value of the attribute twitterFilterAttribute to the string.
Once you have entered the regular expression, click the OK button. You should see something similar to this:
You don't need to change any other settings. Click the APPLY button.
NOTE: Be careful using copy/paste as sometimes smart quotes will be inserted instead of standard quotes. This will cause Elasticsearch JSON parsing to have issues.
Add Connection Between UpdateAttribute and ReplaceText Processors
We need to add a connection between the UpdateAttribute and ReplaceText processors. The process is the same as before. You do this by hovering over the UpdateAttribute processor until you see the circle-arrow icon. Drag the icon to the ReplaceText processor. You should see something similar to this:
You do not need to change any settings here. Click the ADD button to add the connection.
Add Connection Between ReplaceText and PutElasticsearch Processors
We need to add a connection between the ReplaceText and PutElasticsearch processors. The process is similar to before. You do this by hovering over the ReplaceText processor until you see the circle-arrow icon. Drag the icon to the PutElasticsearch processor. You should see something similar to this:
You should notice this dialog doesn't look exactly the same as before. The For Relationships section gives you both success and failure options. The last two times we did this, you only had the success option. For this connection, we are going to check the success box. You do not need to change any other settings here. Click the ADD button to add the connection.
Now we need to go back to the ReplaceText processor and make a change. You should notice a red triangle icon on this processor. That is because there is a failure relationship that we haven't handled. Right click on the processor and click the Configure option. Click the SETTINGS tab. For the Auto Terminate Relationships setting, check the failure option. You should see something similar to this:
This setting will drop any records where the ReplaceText processor was not successful. The connection to PutElasticsearch only accepts the successful replacement attempts. Click the APPLY button to save the settings.
Your final data flow should look similar to this:
Turn On processors
Now we can turn on all of our processors to make sure everything works. Make sure you have started Elasticsearch.
You can select all of the processors by pressing the CMD-A (CTRL-A if you are on Windows) keys. You should see something similar to this:
Then you can click the play arrow icon to start the flow.
Verify New Field
Now we should be able to query Elasticsearch and verify the new field exists. You can type the following into a browser window to query Elasticsearch:
http://sandbox.hortonworks.com:9200/twitter/default/_search?q=twitterFilterAttribute:elasticsearch&pretty
You should get results from Elasticsearch using this query.
Troubleshooting
If you do not get any results when querying Elasticsearch, verify the query above. With the default schema, it may be case sensitive. In other words, twitterFilterAttribute is not the same as twitterfilterattribute.
If you experience any errors writing to Elasticsearch, the problem is likely one of two things: 1) you have not started Elasticsearch or 2) you have a copy/paste issue with smart quotes in your ReplaceText processor settings. Here are the kinds of messages you may see if you have a smart quote issue:
,"twitterFilterAttribute":"elasticsearch”} ]}
MapperParsingException[failed to parse]; nested: JsonEOFException[Unexpected end-of-input in VALUE
_STRING
at [Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@681b0c9a; line: 1, column: 4443]];
at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:156)
Review
We have modified our existing, simple data flow to create a new field in our Twitter JSON data. This field is being captured in Elasticsearch as twitterFilterAttribute and allows us to query Elasticsearch based on the values stored in this field.
Next Steps
Look for the next article in the series, which will use process groups in NiFi with multiple Twitter streams using different filters and values for twitterFilterAttribute being written to Elasticsearch.