This tutorial is intended to walk you through the process of creating a Process Group in NiFi to feed multiple Twitter streams to Elasticsearch.
This tutorial is the second part of a two-part series. The first part can be found here: HCC Article. In this part of the series, we will create a process group that contains multiple Twitter feeds funneled to a single Elasticsearch instance. This allows you to have multiple feeds of data with different processing needs prior to pushing to Elasticsearch. We will then query Elasticsearch to see data from both of our example streams.
Admittedly, this is a contrived example. However, the concept is fundamentally useful across a variety of NiFi use cases.
NOTE: The only required software components are NiFi and Elasticsearch, which can be run in just about any Linux environment. However, I recommend deploying them as part of your HDP sandbox or test cluster, which allows for broader integration with tools such as Pig, Hive, and Zeppelin.
This tutorial was tested using the following environment and components:
We are picking up where the last tutorial left off. We currently have a single dataflow whose GetTwitter processor combines multiple filter terms. We would like to define two separate GetTwitter processors, each with its own filter.
This is what our current data flow looks like:
The first thing we are going to do is to add a Process Group to our NiFi canvas. To do this, drag the Process Group icon from the menu bar to the canvas area. Here is a screenshot showing the Process Group icon:
Once you drag the process group icon to the canvas, the Add Process Group dialog will be displayed. It should look similar to this:
Give the process group a meaningful name. In our case, we will call it Twitter Feed. Click the ADD button. The process group will be added to the canvas. You should see something similar to this:
Drag the process group to a spot on the canvas where it is easier to see. You should see something similar to this:
Now we want to copy our existing flow into the process group we just created. Select all 4 processors in our current flow. Press COMMAND-C (or CTRL-C on Windows) to copy the selected components. Now double-click the Twitter Feed process group to open it. You should see something similar to this:
Notice that the canvas is now blank. You should also notice the breadcrumb navigation in the lower left of the screen: NiFi Flow >> Twitter Feed is your indication that you are inside the process group.
Now we can paste the copied processors onto the canvas. Press COMMAND-V (CTRL-V on Windows) to paste our flow into the process group. You should see something similar to this:
You should see the 4 processors copied to the canvas. You should also notice the connections are missing. We need to reestablish the connections. Before doing that, we are going to delete the PutElasticsearch processor. It already exists outside of the process group and we don't need a copy inside.
Inside your process group (the missing connections and the breadcrumb in the lower left confirm this), select the PutElasticsearch processor by clicking it, then press the Delete key to remove it. You should see something similar to this:
Now we are going to create connections between the 3 processors. Drag the circle arrow icon from the GetTwitter processor to the UpdateAttribute processor. You don't need to change anything; click the ADD button. Drag the circle arrow icon from the UpdateAttribute processor to the ReplaceText processor. You don't need to change anything; click the ADD button. You should see something similar to this:
You should notice a red triangle in the upper left of the ReplaceText processor. That is because we haven't connected it to anything yet. We'll get to that shortly.
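The end result of each GetTwitter, UpdateAttribute, ReplaceText flow is a tweet whose JSON carries an extra twitterFilterAttribute field, which is what the Elasticsearch queries later in this tutorial rely on. The exact UpdateAttribute and ReplaceText settings come from the first part of the series and are not repeated here, so the Python sketch below only mimics that end result on one tweet. It assumes the field is added as a top-level key holding the filter term; tag_tweet is a hypothetical name, and this is not how NiFi itself rewrites FlowFile content.

```python
import json


def tag_tweet(tweet_json: str, filter_term: str) -> str:
    """Hypothetical helper: return the tweet JSON with a twitterFilterAttribute field added.

    Assumption: this only mimics the end result of the UpdateAttribute +
    ReplaceText steps; NiFi performs the real change on the FlowFile content.
    """
    tweet = json.loads(tweet_json)
    # Same field name that the Elasticsearch queries in this tutorial search on.
    tweet["twitterFilterAttribute"] = filter_term
    return json.dumps(tweet)


if __name__ == "__main__":
    raw = '{"id_str": "780888938483425288", "text": "Build a Search Engine with Node.js and Elasticsearch"}'
    print(tag_tweet(raw, "elasticsearch"))
```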
This first dataflow will be for our elasticsearch-related tweets. We need to edit the GetTwitter processor to filter only on elasticsearch. Right-click the processor and select Configure. Click the PROPERTIES tab. Click the Terms to Filter On value field to edit the value. Enter elasticsearch as the only term. Click the OK button to save the change. You should see something similar to this:
Click the APPLY button to save the change.
We need a similar data flow within this process group. The second data flow should filter on the term solr. To do that, select all 3 processors and press COMMAND-C. Now press COMMAND-V to paste a copy of the processors. You should see something similar to this:
The processors you copied should still be selected. Let's move them so it's easier to see the two flows. Drag the selected processors to the right. You should see something similar to this:
We need to edit the GetTwitter processor for the second data flow. Follow the same procedure as the first time, only this time use the term solr. You should have something that looks like this:
As we did before, create the connections between the processors in the second flow. Drag the circle arrow icon from the GetTwitter processor to the UpdateAttribute processor. You don't need to change anything; click the ADD button. Drag the circle arrow icon from the UpdateAttribute processor to the ReplaceText processor. You don't need to change anything; click the ADD button. You should see something similar to this:
We need the data flow from this process group to be sent outside of the group to enable connections to our Elasticsearch processor. To enable this, we are going to add an Output Port. Drag the Output Port icon from the menu bar to the canvas area. Here is a screenshot showing the Output Port icon:
An Add Port dialog should be displayed. You should see something similar to this:
Enter a user-friendly name for the port that will be created. We'll call our port From Twitter Feed. Click the ADD button to add the port. You should see something similar to this:
You should notice a red triangle in the upper left of our From Twitter Feed Output Port. This is because no connection is defined yet.
Now we need to create a connection from each of the ReplaceText processors to the Output Port. To do this, drag the circle arrow icon from a ReplaceText processor to the Output Port. A Create Connection dialog will be displayed. Select the success relationship, then click the ADD button to create the connection. Do this for both ReplaceText processors. Now you should see something similar to this:
Now we are ready to create the connection between our process group and our PutElasticsearch processor. Using the breadcrumb navigation in the lower left, click the NiFi Flow link to go up a level.
You should see something similar to this:
We no longer need the GetTwitter, UpdateAttribute and ReplaceText processors on the main canvas. Select each of the connections between the processors and delete the connections with the delete key. You should see something similar to this:
Now delete the GetTwitter, UpdateAttribute and ReplaceText processors from the main canvas. We want to keep the PutElasticsearch processor. You should see something similar to this:
Create a connection between the process group and the PutElasticsearch processor by dragging the circle arrow icon from the process group to the PutElasticsearch processor. A Create Connection dialog will be displayed. You don't need to change any options, so click the ADD button to create the connection. You should have something that looks similar to this:
If you look inside your process group now, you should notice the red triangle is gone for the Output Port. That is because a connection exists now.
Now we can start our processors to test the flow. If you click the process group and then click the start arrow icon, all of the components inside the process group will start. You should notice the running count next to the start arrow icon on the process group go from 0 to 7, and the stopped count next to the stop square icon go from 7 to 0.
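If you would rather script this step, NiFi 1.x exposes a REST endpoint that schedules every component in a process group at once: PUT /nifi-api/flow/process-groups/{id} with a state of RUNNING or STOPPED. The Python sketch below only builds that request with the standard library. It assumes an unsecured NiFi reachable at http://sandbox.hortonworks.com:9090 and uses a placeholder process group id; both are assumptions, so check the REST API documentation for your NiFi version before relying on it.

```python
import json
import urllib.request

# Assumption: unsecured NiFi reachable on the sandbox at port 9090.
NIFI_API = "http://sandbox.hortonworks.com:9090/nifi-api"


def build_schedule_request(group_id: str, state: str) -> urllib.request.Request:
    """Build a PUT request that starts (RUNNING) or stops (STOPPED)
    every component inside the given process group."""
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")
    body = json.dumps({"id": group_id, "state": state}).encode("utf-8")
    return urllib.request.Request(
        url=f"{NIFI_API}/flow/process-groups/{group_id}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Placeholder id: copy the real one from the process group's details in the NiFi UI.
    req = build_schedule_request("00000000-0000-0000-0000-000000000000", "RUNNING")
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```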
Because we are filtering on specific terms, it may take 20 or 30 minutes before any matching tweets are pulled in. Be patient. Once tweets start coming in you should see something similar to this:
You should notice the tweets are queuing up because we have not yet started our PutElasticsearch processor. Go ahead and do that now: click the PutElasticsearch processor and click the start arrow icon. You should see something similar to this:
You should notice the queued tweets have been processed and are now in Elasticsearch.
We can now query Elasticsearch using the custom field we created, twitterFilterAttribute. If you let the data flow run long enough, you should have at least a few tweets for each GetTwitter processor.
In your browser window, query Elasticsearch using the following URL: http://sandbox.hortonworks.com:9200/twitter_new/_search?pretty. You should see something similar to this:
<code>{ "took" : 26, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 252831, "max_score" : 1.0, "hits" : [ { "_index" : "twitter_new", "_type" : "default", "_id" : "0827ce3c-21ab-4dfa-9d17-0ba90c116142", "_score" : 1.0, "_source" : { "created_at" : "Thu Sep 15 13:56:06 +0000 2016", "id" : 776419323955048448, "id_str" : "776419323955048448", "text" : "RT @cymia: I have the biggest heart I swear.", "source" : "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>", "truncated" : false, "in_reply_to_status_id" : null, "in_reply_to_status_id_str" : null, "in_reply_to_user_id" : null, "in_reply_to_user_id_str" : null, "in_reply_to_screen_name" : null, "user" : { "id" : 997413793, "id_str" : "997413793", "name" : "#######", "screen_name" : "#######", "location" : "Future In The Present", "url" : null, "description" : "~~_Trust No Bitch⚔ Y'all Opinions Doesn't Define Who I Am :sparkles: C/o '17 \uD83D\uDC10\uD83C\uDF93", "protected" : false, "verified" : false, "followers_count" : 453, "friends_count" : 391, "listed_count" : 0, "favourites_count" : 1448, "statuses_count" : 6803, "created_at" : "Sat Dec 08 15:41:26 +0000 2012", "utc_offset" : -14400, "time_zone" : "#######", "geo_enabled" : false, "lang" : "#######", "contributors_enabled" : false, "is_translator" : false, "profile_background_color" : "BADFCD", "profile_background_image_url" : "#######", "profile_background_image_url_https" : "#######", "profile_background_tile" : false, "profile_link_color" : "FF0000", "profile_sidebar_border_color" : "F2E195", "profile_sidebar_fill_color" : "FFF7CC", "profile_text_color" : "0C3E53", "profile_use_background_image" : true, "profile_image_url" : "#######", "profile_image_url_https" : "#######", "profile_banner_url" : "#######", "default_profile" : false, "default_profile_image" : false, "following" : null, "follow_request_sent" : null, "notifications" : null }, ...
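The same searches can be run from a script instead of a browser. The Python sketch below uses only the standard library and assumes the sandbox host and the twitter_new index used in this tutorial; the helper names are illustrative. Passing a query string adds Elasticsearch's q URI-search parameter, which is the same mechanism the field queries in the next steps use.

```python
import json
import urllib.parse
import urllib.request

ES_URL = "http://sandbox.hortonworks.com:9200"  # host and port used in this tutorial
INDEX = "twitter_new"


def build_search_url(query=None, index=INDEX, base=ES_URL):
    """Return a _search URL; an optional Lucene query string goes in the q parameter."""
    params = {"pretty": "true"}
    if query is not None:
        params["q"] = query
    # urlencode percent-encodes characters such as ':' in the query string.
    return f"{base}/{index}/_search?{urllib.parse.urlencode(params)}"


def search(query=None):
    """Run the search against Elasticsearch and return the decoded JSON response."""
    with urllib.request.urlopen(build_search_url(query)) as resp:
        return json.load(resp)


if __name__ == "__main__":
    print(build_search_url())
    print(build_search_url("twitterFilterAttribute:elasticsearch"))
```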
You should see a large number of tweets. In my case, I have 252831. Now let's query against our new field. In your browser, enter the following URL: http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:elasticsearch&prett... You should get a much smaller number of tweets. In my case, I got 2 documents back. Here is my output:
<code>{ "took" : 142, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 12.341856, "hits" : [ { "_index" : "twitter_new", "_type" : "default", "_id" : "a4226ef1-5bfe-4aff-84aa-dd357e874356", "_score" : 12.341856, "_source" : { "created_at" : "Tue Sep 27 21:56:45 +0000 2016", "id" : 780888938483425288, "id_str" : "780888938483425288", "text" : "Build a Search Engine with Node.js and Elasticsearch#######", "source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>", "truncated" : false, "in_reply_to_status_id" : null, "in_reply_to_status_id_str" : null, "in_reply_to_user_id" : null, "in_reply_to_user_id_str" : null, "in_reply_to_screen_name" : null, "user" : { "id" : 35983221, "id_str" : "35983221", "name" : "#######", "screen_name" : "#######", "location" : "#######", "url" : "#######", "description" : "Full stack web developer (PHP, Java, Rails), Docker enthusiast, gamer, also addicted to eletronic, photography and technology.", "protected" : false, "verified" : false, "followers_count" : 398, "friends_count" : 660, "listed_count" : 84, "favourites_count" : 348, "statuses_count" : 2532, "created_at" : "Tue Apr 28 04:04:57 +0000 2009", "utc_offset" : -14400, "time_zone" : "#######", "geo_enabled" : true, "lang" : "#######", "contributors_enabled" : false, "is_translator" : false, "profile_background_color" : "5D7382", "profile_background_image_url" : "#######", "profile_background_image_url_https" : "#######", "profile_background_tile" : false, "profile_link_color" : "CC0000", "profile_sidebar_border_color" : "000000", "profile_sidebar_fill_color" : "EFEFEF", "profile_text_color" : "333333", "profile_use_background_image" : true, "profile_image_url" : "#######", "profile_image_url_https" : "https://pbs.twimg.com/profile_images/760303804092968965/9mekDmQy_normal.jpg", "profile_banner_url" : "#######", "default_profile" : false, "default_profile_image" : false, "following" : 
null, "follow_request_sent" : null, "notifications" : null }, "geo" : null, "coordinates" : null, "place" : null, "contributors" : null, "is_quote_status" : false, "retweet_count" : 0, "favorite_count" : 0, "entities" : { "hashtags" : [ ], "urls" : [ { "url" : "#######", "expanded_url" : "#######", "display_url" : "sitepoint.com/search-engine-…", "indices" : [ 53, 76 ] } ], "user_mentions" : [ ], "symbols" : [ ] }, "favorited" : false, "retweeted" : false, "possibly_sensitive" : false, "filter_level" : "low", "lang" : "en", "timestamp_ms" : "1475013405806", "twitterFilterAttribute" : "elasticsearch" } ...
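To check the new field programmatically rather than by eye, the hypothetical helper below pulls hits.total and each returned document's twitterFilterAttribute out of a search response. It assumes the response shape shown above, where hits.total is a plain number; newer Elasticsearch releases (7.x and later) return an object with a value key instead, which the sketch also accepts. The sample response is a trimmed, illustrative stand-in, not real output.

```python
def summarize_hits(response: dict):
    """Return (total, [twitterFilterAttribute of each returned document])."""
    total = response["hits"]["total"]
    if isinstance(total, dict):  # Elasticsearch 7+ format: {"value": n, "relation": "eq"}
        total = total["value"]
    values = [hit["_source"].get("twitterFilterAttribute")
              for hit in response["hits"]["hits"]]
    return total, values


if __name__ == "__main__":
    # Trimmed-down, illustrative response in the same shape as the output above.
    sample = {"hits": {"total": 2, "hits": [
        {"_source": {"twitterFilterAttribute": "elasticsearch"}},
        {"_source": {"twitterFilterAttribute": "elasticsearch"}},
    ]}}
    print(summarize_hits(sample))
```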
Notice that the new field is present in the data and contains elasticsearch as its value. Now let's query for solr. Type the following in your browser: http://sandbox.hortonworks.com:9200/twitter_new/_search?q=twitterFilterAttribute:solr&pretty. You should get a similarly small number of results. In my case, I got 2 documents returned. Here is my output:
<code>{ "took" : 28, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 12.341865, "hits" : [ { "_index" : "twitter_new", "_type" : "default", "_id" : "b8b618db-99d8-4ac8-910b-84a20fa58396", "_score" : 12.341865, "_source" : { "created_at" : "Tue Sep 27 21:56:15 +0000 2016", "id" : 780888813157711872, "id_str" : "780888813157711872", "text" : "RT @shalinmangar: #Docker image for @ApacheSolr 6.2.1 is now available. https://t.co/lrakkMMhJn #solr", "source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>", "truncated" : false, "in_reply_to_status_id" : null, "in_reply_to_status_id_str" : null, "in_reply_to_user_id" : null, "in_reply_to_user_id_str" : null, "in_reply_to_screen_name" : null, "user" : { "id" : 362698158, "id_str" : "362698158", "name" : "#######", "screen_name" : "#######", "location" : "#######", "url" : "#######", "description" : "Digital Photography, Information Retrieval, Data Warehousing, Big Data, Cloud Computing. 
Solutions Engineer @ Hortonworks.", "protected" : false, "verified" : false, "followers_count" : 310, "friends_count" : 732, "listed_count" : 48, "favourites_count" : 108, "statuses_count" : 5003, "created_at" : "Fri Aug 26 20:53:04 +0000 2011", "utc_offset" : null, "time_zone" : null, "geo_enabled" : true, "lang" : "en", "contributors_enabled" : false, "is_translator" : false, "profile_background_color" : "C6E2EE", "profile_background_image_url" : "#######", "profile_background_image_url_https" : "#######", "profile_background_tile" : false, "profile_link_color" : "1B95E0", "profile_sidebar_border_color" : "C6E2EE", "profile_sidebar_fill_color" : "DAECF4", "profile_text_color" : "663B12", "profile_use_background_image" : true, "profile_image_url" : "#######", "profile_image_url_https" : "#######", "profile_banner_url" : "#######", "default_profile" : false, "default_profile_image" : false, "following" : null, "follow_request_sent" : null, "notifications" : null }, "geo" : null, "coordinates" : null, "place" : null, "contributors" : null, "retweeted_status" : { "created_at" : "Tue Sep 27 12:45:52 +0000 2016", "id" : 780750304060899328, "id_str" : "780750304060899328", "text" : "#Docker image for @ApacheSolr 6.2.1 is now available. 
https://t.co/lrakkMMhJn #solr", "source" : "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>", "truncated" : false, "in_reply_to_status_id" : null, "in_reply_to_status_id_str" : null, "in_reply_to_user_id" : null, "in_reply_to_user_id_str" : null, "in_reply_to_screen_name" : null, "user" : { "id" : 7057932, "id_str" : "7057932", "name" : "#######", "screen_name" : "#######", "location" : "#######", "url" : "#######", "description" : "Engineer at Lucidworks, Committer on Apache Lucene/Solr, ex-AOLer", "protected" : false, "verified" : false, "followers_count" : 1431, "friends_count" : 388, "listed_count" : 106, "favourites_count" : 903, "statuses_count" : 3758, "created_at" : "Sun Jun 24 22:50:00 +0000 2007", "utc_offset" : 19800, "time_zone" : "New Delhi", "geo_enabled" : true, "lang" : "en", "contributors_enabled" : false, "is_translator" : false, "profile_background_color" : "EDECE9", "profile_background_image_url" : "#######", "profile_background_image_url_https" : "#######", "profile_background_tile" : false, "profile_link_color" : "088253", "profile_sidebar_border_color" : "D3D2CF", "profile_sidebar_fill_color" : "E3E2DE", "profile_text_color" : "634047", "profile_use_background_image" : false, "profile_image_url" : "#######", "profile_image_url_https" : "#######", "default_profile" : false, "default_profile_image" : false, "following" : null, "follow_request_sent" : null, "notifications" : null }, "geo" : null, "coordinates" : null, "place" : null, "contributors" : null, "is_quote_status" : false, "retweet_count" : 8, "favorite_count" : 9, "entities" : { "hashtags" : [ { "text" : "Docker", "indices" : [ 0, 7 ] }, { "text" : "solr", "indices" : [ 78, 83 ] } ], "urls" : [ { "url" : "#######", "expanded_url" : "#######", "display_url" : "#######", "indices" : [ 54, 77 ] } ], "user_mentions" : [ { "screen_name" : "ApacheSolr", "name" : "Apache Solr", "id" : 22742048, "id_str" : "22742048", "indices" : [ 18, 29 ] } ], "symbols" : [ ] }, 
"favorited" : false, "retweeted" : false, "possibly_sensitive" : false, "filter_level" : "low", "lang" : "en" }, "is_quote_status" : false, "retweet_count" : 0, "favorite_count" : 0, "entities" : { "hashtags" : [ { "text" : "Docker", "indices" : [ 18, 25 ] }, { "text" : "solr", "indices" : [ 96, 101 ] } ], "urls" : [ { "url" : "#######", "expanded_url" : "#######", "display_url" : "h#######", "indices" : [ 72, 95 ] } ], "user_mentions" : [ { "screen_name" : "shalinmangar", "name" : "Shalin Mangar", "id" : 7057932, "id_str" : "7057932", "indices" : [ 3, 16 ] }, { "screen_name" : "ApacheSolr", "name" : "Apache Solr", "id" : 22742048, "id_str" : "22742048", "indices" : [ 36, 47 ] } ], "symbols" : [ ] }, "favorited" : false, "retweeted" : false, "possibly_sensitive" : false, "filter_level" : "low", "lang" : "en", "timestamp_ms" : "1475013375926", "twitterFilterAttribute" : "solr" } ...
Look for the twitterFilterAttribute field. You should see that it has the value solr.
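Because both flows write to one index, comparing the two filters comes down to one field:term query per filter term. The sketch below assumes a fetch callable that takes a query string and returns the decoded JSON response (for example, a wrapper around an HTTP GET of the _search?q= URL used above); injecting it keeps the function testable without a running Elasticsearch. count_per_term and the fake fetch are hypothetical names, and the fake counts are placeholders rather than real results.

```python
from typing import Callable, Dict, Iterable


def count_per_term(fetch: Callable[[str], dict], terms: Iterable[str],
                   field: str = "twitterFilterAttribute") -> Dict[str, int]:
    """Run one field:term query per term and return the hit count for each term."""
    counts = {}
    for term in terms:
        total = fetch(f"{field}:{term}")["hits"]["total"]
        # Elasticsearch 7+ wraps the count as {"value": n, ...}; older versions use a number.
        counts[term] = total["value"] if isinstance(total, dict) else total
    return counts


if __name__ == "__main__":
    # Stand-in for a real HTTP call: pretend every query matched two documents.
    def fake_fetch(query: str) -> dict:
        return {"hits": {"total": 2, "hits": []}}

    print(count_per_term(fake_fetch, ["elasticsearch", "solr"]))
```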
If you were able to work through the tutorial successfully, you should have a good understanding of how to create multiple flows within a process group and how to feed that data to an output port. In this tutorial, we created 2 feeds for different Twitter filters, each of which adds a new field called twitterFilterAttribute to the Twitter JSON data. This field is searchable within Elasticsearch, so you can easily filter sources of data within a single index.
For next steps, you could try using the RouteOnAttribute processor to direct the flow to different Elasticsearch processors which write to different indexes.
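RouteOnAttribute is configured in the NiFi UI rather than in code, but the routing decision it would make here is easy to picture. The Python sketch below only simulates that decision: each tweet goes to an index chosen from its twitterFilterAttribute value, and anything else falls through to an unmatched destination, similar in spirit to RouteOnAttribute's unmatched relationship. The index names are hypothetical.

```python
# Hypothetical mapping from filter term to a per-term Elasticsearch index.
ROUTES = {
    "elasticsearch": "twitter_elasticsearch",
    "solr": "twitter_solr",
}
UNMATCHED = "unmatched"


def route(attributes: dict) -> str:
    """Pick a destination from the twitterFilterAttribute attribute, else 'unmatched'."""
    return ROUTES.get(attributes.get("twitterFilterAttribute"), UNMATCHED)


if __name__ == "__main__":
    for attrs in ({"twitterFilterAttribute": "solr"},
                  {"twitterFilterAttribute": "hadoop"},
                  {}):
        print(attrs, "->", route(attrs))
```

In NiFi itself, the equivalent would be one dynamic property per route on RouteOnAttribute, using an Expression Language predicate such as ${twitterFilterAttribute:equals('solr')}, with each resulting relationship connected to its own PutElasticsearch processor.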
Created on 12-01-2017 05:55 AM
What's the difference between entering (elasticsearch,solr) in Terms to Filter On in the GetTwitter processor and creating a process group as in this article?