The purpose of this tutorial is to walk you through the process of using NiFi to pull data from Twitter and push it to Elasticsearch. I also show an example Zeppelin dashboard which queries the Twitter data in Elasticsearch.
This is the second of two articles covering Elasticsearch on HDP. The first article covers manually creating Movie data in Zeppelin and pushing that data to Elasticsearch. You can find that article here: HCC Article
Note: The Zeppelin Elasticsearch interpreter is a community-provided interpreter. It is not yet considered GA by Hortonworks and should only be used for development and testing purposes.
Note: While not required, I recommend using Vagrant to manage multiple versions of the Sandbox. Follow my tutorial here to set that up: HCC Article
This tutorial was tested using the following environment and components:
We need to download Elasticsearch. The current version at the time of writing is 2.4.0. You can read more about Elasticsearch here: Elasticsearch Website
You can use curl to download Elasticsearch to your sandbox.
$ cd ~
$ curl -O https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.0/elasticsearch-2.4.0.tar.gz
Next we need to extract Elasticsearch to the /opt directory, which is where we'll run it.
$ cd /opt
$ sudo tar xvfz ~/elasticsearch-2.4.0.tar.gz
We need to make a couple of changes to the Elasticsearch configuration file /opt/elasticsearch-2.4.0/config/elasticsearch.yml.
$ cd /opt/elasticsearch-2.4.0/config
$ vi elasticsearch.yml
We need to set the cluster.name setting to "elasticsearch". This is the default that Zeppelin expects; however, you can change this value in the Zeppelin configuration, as noted below.
cluster.name: elasticsearch
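Note: for reference, the matching settings on the Zeppelin side live in the Elasticsearch interpreter configuration. The property names below are from the community interpreter; verify them on your Zeppelin Interpreter page:

elasticsearch.cluster.name: elasticsearch
elasticsearch.host: sandbox.hortonworks.com
elasticsearch.port: 9300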
We need to set the network.host setting to our sandbox hostname or IP. Elasticsearch defaults to binding to 127.0.0.1, which won't allow us to easily access it from outside of the sandbox.
network.host: sandbox.hortonworks.com
Make sure you have removed the # character at the start of the line for these two settings. Once you have completed these two changes, save the file:
Press the esc key, then type :wq and press enter.
We are going to create an elastic user to run the application.
$ sudo useradd elastic -d /home/elastic
We are going to change the ownership of the Elasticsearch directory to the elastic user:
$ sudo chown -R elastic:elastic /opt/elasticsearch-2.4.0
We want to run Elasticsearch as the elastic user so first we'll switch to that user.
$ sudo su - elastic
Now change to the Elasticsearch directory and start Elasticsearch:
$ cd /opt/elasticsearch-2.4.0
$ bin/elasticsearch
You will see something similar to:
$ bin/elasticsearch
[2016-09-02 19:44:34,905][WARN ][bootstrap ] unable to install syscall filter: seccomp unavailable: CONFIG_SECCOMP not compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed
[2016-09-02 19:44:35,168][INFO ][node ] [Skyhawk] version[2.4.0], pid[22983], build[ce9f0c7/2016-08-29T09:14:17Z]
[2016-09-02 19:44:35,168][INFO ][node ] [Skyhawk] initializing ...
[2016-09-02 19:44:35,807][INFO ][plugins ] [Skyhawk] modules [lang-groovy, reindex, lang-expression], plugins [], sites []
[2016-09-02 19:44:35,856][INFO ][env ] [Skyhawk] using [1] data paths, mounts [[/ (/dev/mapper/vg_sandbox-lv_root)]], net usable_space [26.2gb], net total_space [42.6gb], spins? [possibly], types [ext4]
[2016-09-02 19:44:35,856][INFO ][env ] [Skyhawk] heap size [990.7mb], compressed ordinary object pointers [true]
[2016-09-02 19:44:35,856][WARN ][env ] [Skyhawk] max file descriptors [4096] for elasticsearch process likely too low, consider increasing to at least [65536]
[2016-09-02 19:44:38,032][INFO ][node ] [Skyhawk] initialized
[2016-09-02 19:44:38,032][INFO ][node ] [Skyhawk] starting ...
[2016-09-02 19:44:38,115][INFO ][transport ] [Skyhawk] publish_address {172.28.128.4:9300}, bound_addresses {172.28.128.4:9300}
[2016-09-02 19:44:38,119][INFO ][discovery ] [Skyhawk] elasticsearch/31d3OvlZT5WRnqYUW-GJwA
[2016-09-02 19:44:41,157][INFO ][cluster.service ] [Skyhawk] new_master {Skyhawk}{31d3OvlZT5WRnqYUW-GJwA}{172.28.128.4}{172.28.128.4:9300}, reason: zen-disco-join(elected_as_master, [0] joins received)
[2016-09-02 19:44:41,206][INFO ][http ] [Skyhawk] publish_address {172.28.128.4:9200}, bound_addresses {172.28.128.4:9200}
[2016-09-02 19:44:41,207][INFO ][node ] [Skyhawk] started
[2016-09-02 19:44:41,223][INFO ][gateway ] [Skyhawk] recovered [0] indices into cluster_state
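Note: the "max file descriptors" warning in the output above can safely be ignored on a sandbox. If you want to raise the limit, one common approach (shown here as a sketch; adjust for your environment) is to add the following lines to /etc/security/limits.conf as root and then log in as the elastic user again:

elastic soft nofile 65536
elastic hard nofile 65536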
Using your web browser, verify you get a response from Elasticsearch by using the following address: http://sandbox.hortonworks.com:9200/
You should see something similar to:
Alternatively, you can use curl:
$ curl -XGET http://sandbox.hortonworks.com:9200
You will see a JSON response similar to this:
$ curl -XGET http://sandbox.hortonworks.com:9200
{
  "name" : "Echo",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "2.4.0",
    "build_hash" : "ce9f0c7394dee074091dd1bc4e9469251181fc55",
    "build_timestamp" : "2016-08-29T09:14:17Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}
NiFi 1.0.0 requires JDK 1.8. You can install it on the sandbox using:
$ sudo yum install java-1.8.0-openjdk
$ sudo yum install java-1.8.0-openjdk-devel
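You can verify that the JDK installed correctly; the output should report a 1.8.0 version:

$ java -version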
We need to download NiFi. The latest version at the time of writing is 1.0.0. You can read more about NiFi here: NiFi Website
You can use curl to download NiFi to your sandbox.
$ cd ~
$ curl -O http://mirrors.ibiblio.org/apache/nifi/1.0.0/nifi-1.0.0-bin.tar.gz
Note: You may want to use a mirror location closest to you by visiting: Apache Mirrors
We need to extract NiFi to the /opt directory, which is where we'll run it.
$ cd /opt
$ sudo tar xvfz ~/nifi-1.0.0-bin.tar.gz
We need to change the web port of NiFi. The default port is 8080, which will conflict with Ambari. We will change the port to 9090.
$ cd nifi-1.0.0/conf
$ vi nifi.properties
Edit the nifi.web.http.port property to change the default port.
nifi.web.http.port=9090
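Note: if the UI is not reachable from outside of the sandbox, also check the nifi.web.http.host property in the same file. Leaving it blank (the default) typically binds NiFi to all interfaces:

nifi.web.http.host=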
Save the file:
Press the esc key, then type :wq and press enter.
We are going to create a nifi user to run the application.
$ sudo useradd nifi -d /home/nifi
We are going to change the ownership of the nifi directories to the nifi user:
$ sudo chown -R nifi:nifi /opt/nifi-1.0.0
We want to run NiFi as the nifi user so first we'll switch to that user.
$ sudo su - nifi
$ cd /opt/nifi-1.0.0
$ bin/nifi.sh start
You should see something similar to this:
$ ./nifi.sh start
Java home: /usr/lib/jvm/java
NiFi home: /opt/nifi-1.0.0
Bootstrap Config File: /opt/nifi-1.0.0/conf/bootstrap.conf
2016-09-03 02:45:47,909 INFO [main] org.apache.nifi.bootstrap.Command Starting Apache NiFi...
2016-09-03 02:45:47,909 INFO [main] org.apache.nifi.bootstrap.Command Working Directory: /opt/nifi-1.0.0
2016-09-03 02:45:47,909 INFO [main] org.apache.nifi.bootstrap.Command Command: /usr/lib/jvm/java/bin/java -classpath /opt/nifi-1.0.0/./conf:/opt/nifi-1.0.0/./lib/jul-to-slf4j-1.7.12.jar:/opt/nifi-1.0.0/./lib/nifi-documentation-1.0.0.jar:/opt/nifi-1.0.0/./lib/logback-core-1.1.3.jar:/opt/nifi-1.0.0/./lib/nifi-runtime-1.0.0.jar:/opt/nifi-1.0.0/./lib/slf4j-api-1.7.12.jar:/opt/nifi-1.0.0/./lib/nifi-properties-loader-1.0.0.jar:/opt/nifi-1.0.0/./lib/jcl-over-slf4j-1.7.12.jar:/opt/nifi-1.0.0/./lib/log4j-over-slf4j-1.7.12.jar:/opt/nifi-1.0.0/./lib/bcprov-jdk15on-1.54.jar:/opt/nifi-1.0.0/./lib/nifi-framework-api-1.0.0.jar:/opt/nifi-1.0.0/./lib/nifi-nar-utils-1.0.0.jar:/opt/nifi-1.0.0/./lib/nifi-properties-1.0.0.jar:/opt/nifi-1.0.0/./lib/nifi-api-1.0.0.jar:/opt/nifi-1.0.0/./lib/commons-lang3-3.4.jar:/opt/nifi-1.0.0/./lib/logback-classic-1.1.3.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/opt/nifi-1.0.0/./conf/nifi.properties -Dnifi.bootstrap.listen.port=43343 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/opt/nifi-1.0.0/logs org.apache.nifi.NiFi
Note: It will take a couple of minutes for NiFi to start up.
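You can check on the startup progress with the status command, or by watching the application log:

$ bin/nifi.sh status
$ tail -f /opt/nifi-1.0.0/logs/nifi-app.log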
The NiFi web UI should be available via http://sandbox.hortonworks.com:9090/nifi. You should see the default canvas:
You add processors to NiFi by dragging the Processor icon from the left icon bar to the canvas. This screenshot shows the Processor icon:
Once you drag the icon to the canvas, you will see the Add Processor dialog box. This screenshot shows the dialog box:
Drag the Processor icon to the canvas. In the Add Processor dialog, enter "twitter" in the Filter box. This will filter the list of processors to show you matching processor names. You should see something similar to this:
Select the GetTwitter processor and click the ADD button. This will add the processor to the canvas. Your canvas should look similar to this:
Right click on the GetTwitter processor to display the context menu. It should look similar to this:
Click the Configure menu option. This will display the Configure Processor dialog box. It should look similar to this:
Click the Properties tab. You should set the following settings:
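At a minimum, you need the OAuth credentials from your own Twitter developer application. The GetTwitter properties look like this; the credential values below are placeholders, not working values:

Twitter Endpoint: Sample Endpoint
Consumer Key: <your consumer key>
Consumer Secret: <your consumer secret>
Access Token: <your access token>
Access Token Secret: <your access token secret>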
Once the settings are correct, click the APPLY button.
Drag the Processor icon to the canvas. In the Add Processor dialog, enter "elastic" in the Filter box. This will filter the list of processors to show you matching processor names. You should see something similar to this:
Select the PutElasticsearch processor and click the ADD button. This will add the processor to the canvas. Your canvas should look similar to this:
Right click on the PutElasticsearch processor to display the context menu. It should look similar to this:
Click the Configure menu option. This will display the Configure Processor dialog box. It should look similar to this:
Under Auto Terminate Relationships, check the success box. Leave the failure and retry boxes unchecked; those relationships will be handled via a connection later.
Now click the Properties tab. You should see something similar to this:
Set the following settings:
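These values match the rest of this tutorial (the twitter index is queried later, and 9300 is the Elasticsearch transport port; Type and Identifier Attribute are reasonable choices rather than requirements, and uuid is the standard FlowFile attribute):

Cluster Name: elasticsearch
ElasticSearch Hosts: sandbox.hortonworks.com:9300
Identifier Attribute: uuid
Index: twitter
Type: tweet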
Once the settings are correct, click the APPLY button.
Now we need to create a connection between our processors. Hover over the GetTwitter processor. You should see a dark circle with a white arrow in the middle. Drag this icon down onto the PutElasticsearch processor. The Create Connection dialog should open. It should look similar to this:
You can click the ADD button, as you don't need to make any changes.
Now hover over the PutElasticsearch processor. Drag the arrow icon out past the processor, then back over it and release. This will display the Create Connection dialog. This self-loop connection handles the failure and retry operations. Select the failure and retry options under Relationships. It should look similar to this:
Click the ADD button.
Right click on the GetTwitter processor to display the context menu and click the Start option. Do the same for the PutElasticsearch processor. This will start both processors.
The two processors should be running. You should see something similar to this:
You can verify that tweets are being written to Elasticsearch by entering the following address in a browser window:
http://sandbox.hortonworks.com:9200/twitter/_search?pretty
You should see something similar to this:
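Alternatively, you can use curl (quote the URL so the shell doesn't interpret the ? character):

$ curl -XGET 'http://sandbox.hortonworks.com:9200/twitter/_search?pretty'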
Now you can create a Zeppelin notebook to query Elasticsearch. If you follow the Zeppelin article I linked in the Prerequisites, you should be able to use the %elasticsearch interpreter. Here is an example dashboard I created against Twitter data.
You should notice that I'm using the Elasticsearch DSL to run aggregation queries against the data. Aggregations are ideally suited for charts and graphs.
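For example, a terms aggregation that counts tweets by language looks something like this in a Zeppelin paragraph (a minimal sketch; lang is a standard field in the tweet JSON, but verify it exists in your index):

%elasticsearch
search /twitter { "size": 0, "aggs": { "tweets_by_lang": { "terms": { "field": "lang" } } } }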
If you see a red icon in the upper right of your processors, that indicates there is a problem. If you hover over the icon, you should see relevant error message information. Here is an example screenshot where I incorrectly set the port to 9200 instead of 9300 on the PutElasticsearch processor (PutElasticsearch talks to the Elasticsearch transport port, 9300, not the HTTP port, 9200):
This tutorial walked you through installing Apache NiFi and Elasticsearch. You made the necessary configuration changes so that NiFi and Elasticsearch would run. You created a NiFi workflow using the GetTwitter and PutElasticsearch processors. The processors should have successfully pulled data from Twitter and pushed data to Elasticsearch. Finally, you should have been able to query Elasticsearch using Zeppelin.