Created on 09-22-2016 01:51 PM - edited 08-17-2019 09:51 AM
This tutorial is intended to walk you through the process of using the GetTwitter, UpdateAttribute, ReplaceText and PutElasticsearch processors in Apache NiFi to modify Twitter JSON data before sending it to Elasticsearch.
This tutorial is the first part of a two-part series. In this part of the series, we will create a single data flow that adds an additional field called twitterFilterAttribute to the JSON data using the ReplaceText processor. This will allow us to query Elasticsearch using a fielded query like q=twitterFilterAttribute:elasticsearch. The second part of the series will build on this example to create a process group with two GetTwitter feeds: one with an elasticsearch term filter and the other with a solr term filter.
Admittedly, this is a contrived example. However, the concept is fundamentally useful across a variety of NiFi use cases.
NOTE: The only required software components are NiFi and Elasticsearch, which can be run in just about any Linux environment. However, I recommend deploying these as part of your HDP sandbox or test cluster, which allows for broader integration with tools such as Pig, Hive, Zeppelin, etc.
This tutorial was tested using the following environment and components:
If your NiFi workflow from the previous tutorial is running, stop your GetTwitter and PutElasticsearch processors. Your NiFi data flow should look similar to this:
NOTE: My processors are running in this screenshot:
 
As you can see, we have two NiFi processors. This is a very simple data flow.
Before we add the UpdateAttribute processor, we are going to remove the connection between the GetTwitter and PutElasticsearch processors. Click on the connection between the two processors to select the connection. Now press the delete or backspace key to delete the connection.
NOTE: You must have both processors stopped before you can delete the connection.
You should now see something similar to this:
 
Now we are going to add the UpdateAttribute processor. Drag the processor icon from the NiFi menu bar to the data flow canvas. You will see the Add Processor dialog. Type updateattr in the filter box to filter the list of processors. You should see something similar to this:
 
Select the UpdateAttribute processor and click the ADD button. You should see something similar to this:
 
Rearrange the processors on the canvas to make it easier to follow/trace connections later. You should have something similar to this:
 
We are now going to configure the UpdateAttribute processor. Right-click on the UpdateAttribute processor and select the Configure menu option. Click on the PROPERTIES tab. You should see something similar to this:
 
We are going to add a new property. Click on the + (plus) icon. You should see something similar to this:
 
For the Property Name, enter twitterFilterAttribute. This will add an attribute called twitterFilterAttribute to each flow file coming through this processor. Now click the OK button and you should see something similar to this:
 
For the Value, enter elasticsearch. This is the value that will be assigned to the twitterFilterAttribute attribute. Now click the OK button and then the APPLY button.
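It helps to keep in mind that UpdateAttribute only changes a flow file's attributes (its metadata); the tweet JSON in the flow file content is left as it was. That is why we still need ReplaceText later to copy the value into the JSON itself. The Python sketch below models this with a hypothetical FlowFile class and update_attribute function; both are illustrations only, not part of the NiFi API.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical toy model of a NiFi flow file: attributes plus content."""
    attributes: dict = field(default_factory=dict)
    content: bytes = b""


def update_attribute(ff: FlowFile, name: str, value: str) -> FlowFile:
    # Like UpdateAttribute with one dynamic property: copy the attributes,
    # set name=value, and pass the content through untouched.
    new_attrs = dict(ff.attributes)
    new_attrs[name] = value
    return FlowFile(attributes=new_attrs, content=ff.content)


tweet = FlowFile(content=b'{"lang" : "en", "timestamp_ms" : "1473786418611"}')
tagged = update_attribute(tweet, "twitterFilterAttribute", "elasticsearch")

print(tagged.attributes)                # {'twitterFilterAttribute': 'elasticsearch'}
print(tagged.content == tweet.content)  # True: the JSON has no new field yet
```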
We need to add a connection between the GetTwitter and UpdateAttribute processors. You do this by hovering over the GetTwitter processor until you see the circle-arrow icon. Drag the icon to the UpdateAttribute processor. You should see something similar to this:
 
You do not need to change any settings here. Click the ADD button to add the connection.
We are now going to add the ReplaceText processor. Drag the processor icon from the NiFi menu bar to the data flow canvas. You will see the Add Processor dialog. Type replace in the filter box to filter the list of processors. You should see something similar to this:
 
Select the ReplaceText processor and click the ADD button.
We are now going to configure the ReplaceText processor. We want to change the JSON message data, and we are going to do it using regular expressions, which the ReplaceText processor supports. You can read more about regular expressions on Wikipedia.
Here is what the message looks like coming in:
{
 ...
 "filter_level" : "low",
 "lang" : "en",
 "timestamp_ms" : "1473786418611"
}
Here is what the message should look like going out:
{
 ...
 "filter_level" : "low",
 "lang" : "en",
 "timestamp_ms" : "1473786418611",
 "twitterFilterAttribute" : "elasticsearch"
}
We need to add a comma, followed by our new twitterFilterAttribute field and its value, after the last entry in the JSON but before the final } character.
Right-click on the ReplaceText processor and select the Configure menu option. Click on the PROPERTIES tab. You should see something similar to this:
 
We need to change the Search Value and Replacement Value settings. Click on the Value box for the Search Value line. You should see something similar to this:
 
The value in this box is a regular expression. We are going to replace the entire value with:
(?s:(^.*)}$)
This regular expression matches everything from the beginning of the message up to a } character at the very end. The (?s: ...) wrapper turns on "dot-all" mode so that . also matches line breaks, and because .* is greedy, the match runs to the last } rather than stopping at an earlier one. Everything before that final } is captured in a regular expression group by the () characters. We are looking for a } at the end because that is the last character of the Twitter JSON message data. Notice that the } is not inside the () group. This is because we need to add a value before the closing }, which we'll do in the Replacement Value setting.
For the incoming message data, the match runs all the way to the closing } on the last line:
... "timestamp_ms" : "1473786418611" }
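If you want to check how the Search Value behaves before running the flow, the following Python sketch applies the same pattern to a trimmed copy of the sample message. It assumes Python 3.6 or later, whose re module accepts the scoped (?s:...) flag. NiFi itself uses Java regular expressions, which should treat this particular pattern the same way, but treat that as an assumption to re-test for more complex patterns. The last example shows the greedy .* skipping past an inner } from a nested object.

```python
import re

# Search Value from the tutorial. (?s:...) enables DOTALL inside the group,
# so "." also matches line breaks; the unescaped } is treated as a literal.
SEARCH_VALUE = r'(?s:(^.*)}$)'

# Trimmed copy of the incoming tweet JSON (the "..." fields are omitted).
incoming = '''{
 "filter_level" : "low",
 "lang" : "en",
 "timestamp_ms" : "1473786418611"
}'''

match = re.search(SEARCH_VALUE, incoming)
# The whole message matches, and group 1 is everything except the final }.
print(match.group(0) == incoming)       # True
print(match.group(1) == incoming[:-1])  # True

# Greedy .* backtracks only as far as the LAST }, so an inner } from a
# nested object stays inside group 1.
nested = '{"user": {"id": 1}, "lang": "en"}'
print(re.search(SEARCH_VALUE, nested).group(1))  # {"user": {"id": 1}, "lang": "en"
```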
Once you have entered the regular expression, click the OK button. Now we are going to change the Replacement Value setting. Click on the Value box for the Replacement Value line. You should see something similar to this:
 
The value in this box refers to a regular expression group. We are going to replace the entire value with:
$1,"twitterFilterAttribute":"${twitterFilterAttribute}"}
This will replace the entire text of the incoming data with the first matching group, which is all of the Twitter JSON text except the last }. We then add a , because each JSON field needs to be separated by a comma. The "twitterFilterAttribute" text is a literal string. The ${twitterFilterAttribute} in the second part of the string is NiFi Expression Language; it inserts the value of the flow file attribute twitterFilterAttribute that we set with UpdateAttribute. Finally, the trailing } puts back the closing brace that we left out of the group.
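To see the Search Value and Replacement Value working together, here is a rough Python emulation of what ReplaceText does with these settings. It is a sketch under simplifying assumptions: evaluate_el and replace_text are hypothetical helpers, evaluate_el only understands plain ${attribute} references (real NiFi Expression Language does much more), and $1 is expanded by hand because Python's re.sub uses \1 or \g<1> rather than $1 for back-references.

```python
import json
import re

SEARCH_VALUE = r'(?s:(^.*)}$)'
REPLACEMENT_VALUE = '$1,"twitterFilterAttribute":"${twitterFilterAttribute}"}'


def evaluate_el(template: str, attributes: dict) -> str:
    """Hypothetical stand-in for NiFi Expression Language.

    Only plain ${name} references are handled; a missing attribute
    becomes an empty string in this sketch.
    """
    return re.sub(r'\$\{([^}]+)\}', lambda m: attributes.get(m.group(1), ''), template)


def replace_text(content: str, attributes: dict) -> str:
    """Rough emulation of ReplaceText with the tutorial's settings."""
    # Step 1: resolve ${...} references against the flow file's attributes.
    replacement = evaluate_el(REPLACEMENT_VALUE, attributes)

    def expand(match):
        # Step 2: swap each $N for the text captured by group N.
        return re.sub(r'\$(\d+)', lambda ref: match.group(int(ref.group(1))), replacement)

    return re.sub(SEARCH_VALUE, expand, content, count=1)


incoming = '{"filter_level" : "low", "lang" : "en", "timestamp_ms" : "1473786418611"}'
outgoing = replace_text(incoming, {"twitterFilterAttribute": "elasticsearch"})
print(outgoing)

# The result still parses as JSON and now carries the new field.
doc = json.loads(outgoing)
print(doc["twitterFilterAttribute"])  # elasticsearch
```

One design caveat this exposes: the approach assumes the incoming object already has at least one field. For an empty object {}, the result would be {,"twitterFilterAttribute":"elasticsearch"}, which is not valid JSON.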
Once you have entered the replacement value, click the OK button. You should see something similar to this:
 
You don't need to change any other settings. Click the APPLY button.
NOTE: Be careful using copy/paste, as smart quotes are sometimes inserted instead of standard quotes. This will cause Elasticsearch to fail when parsing the JSON.
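One way to catch this before the data reaches Elasticsearch is to scan the pasted value for curly quote characters. The Python sketch below checks for the common Unicode smart quotes (U+201C, U+201D, U+2018, U+2019) and shows that Python's own JSON parser also rejects a string closed with a smart quote. The character list and the find_smart_quotes helper are assumptions for illustration, not an exhaustive check.

```python
import json

# Typographic quotes that often replace plain ASCII quotes on copy/paste.
SMART_QUOTES = {
    "\u201c": "LEFT DOUBLE QUOTATION MARK",
    "\u201d": "RIGHT DOUBLE QUOTATION MARK",
    "\u2018": "LEFT SINGLE QUOTATION MARK",
    "\u2019": "RIGHT SINGLE QUOTATION MARK",
}


def find_smart_quotes(text: str) -> list:
    """Return (index, character name) for every smart quote in text."""
    return [(i, SMART_QUOTES[ch]) for i, ch in enumerate(text) if ch in SMART_QUOTES]


good = '$1,"twitterFilterAttribute":"${twitterFilterAttribute}"}'
# Same Replacement Value, but the last quote was pasted as U+201D.
bad = '$1,"twitterFilterAttribute":"${twitterFilterAttribute}\u201d}'

print(find_smart_quotes(good))  # []
print(find_smart_quotes(bad))

# The JSON string value is never closed with an ASCII quote, so parsing fails.
try:
    json.loads('{"twitterFilterAttribute":"elasticsearch\u201d}')
    parse_failed = False
except json.JSONDecodeError as err:
    parse_failed = True
    print("JSON error:", err.msg)
```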
We need to add a connection between the UpdateAttribute and ReplaceText processors. The process is the same as before. You do this by hovering over the UpdateAttribute processor until you see the circle-arrow icon. Drag the icon to the ReplaceText processor. You should see something similar to this:
 
You do not need to change any settings here. Click the ADD button to add the connection.
We need to add a connection between the ReplaceText and PutElasticsearch processors. The process is similar to before. You do this by hovering over the ReplaceText processor until you see the circle-arrow icon. Drag the icon to the PutElasticsearch processor. You should see something similar to this:
 
Notice that this dialog doesn't look exactly the same as before. The For Relationships section gives you both success and failure options, whereas the last two times we did this, only the success option was available. For this connection, check the success box. You do not need to change any other settings here. Click the ADD button to add the connection.
Now we need to go back to the ReplaceText processor and make a change. You should notice a red triangle icon on this processor. That is because there is a failure relationship that we haven't handled. Right-click on the processor and click the Configure option. Click the SETTINGS tab. For the Auto Terminate Relationships setting, check the failure option. You should see something similar to this:
 
This setting will drop any flow files for which the ReplaceText processor was not successful. The connection to PutElasticsearch only accepts successful replacement attempts. Click the APPLY button to save the settings.
Your final data flow should look similar to this:
 
Now we can turn on all of our processors to make sure everything works. Make sure you have started Elasticsearch.
You can select all of the processors by pressing the CMD-A (CTRL-A if you are on Windows) keys. You should see something similar to this:
 
Then you can click the play arrow icon to start the flow.
Now we should be able to query Elasticsearch and verify the new field exists. You can type the following into a browser window to query Elasticsearch:
http://sandbox.hortonworks.com:9200/twitter/default/_search?q=twitterFilterAttribute:elasticsearch&p...
You should get results from Elasticsearch using this query.
If you do not get any results when querying Elasticsearch, double-check the query above. With the default mapping, field names are case sensitive. In other words, twitterFilterAttribute is not the same as twitterfilterattribute.
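You can also run the same query from a script. The Python sketch below uses only the standard library. The host, index (twitter) and type (default) come from the query URL above; the rest of that URL is truncated in this article, so only the q parameter is reproduced here. The build_search_url and search functions are hypothetical helpers, not part of any Elasticsearch client.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Host, index and type taken from the tutorial's query URL (sandbox environment).
BASE_URL = "http://sandbox.hortonworks.com:9200/twitter/default/_search"


def build_search_url(field: str, value: str) -> str:
    # Field names are case sensitive: pass twitterFilterAttribute exactly as written.
    return BASE_URL + "?" + urlencode({"q": f"{field}:{value}"})


def search(field: str, value: str) -> dict:
    """Hypothetical helper: run the URI search and decode the JSON response."""
    with urlopen(build_search_url(field, value), timeout=10) as resp:
        return json.load(resp)


print(build_search_url("twitterFilterAttribute", "elasticsearch"))
```

With Elasticsearch running, search("twitterFilterAttribute", "elasticsearch")["hits"]["hits"] should list the matching tweets. The colon in the q value is percent-encoded as %3A, which the server decodes like any other query-string character.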
If you experience any errors writing to Elasticsearch, the problem is likely one of two things: 1) you have not started Elasticsearch, or 2) you have a copy/paste issue with smart quotes in your ReplaceText processor settings. Here are the kinds of messages you may see if you have a smart quote issue:
,"twitterFilterAttribute":"elasticsearch”} ]}
MapperParsingException[failed to parse]; nested: JsonEOFException[Unexpected end-of-input in VALUE_STRING
 at [Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@681b0c9a; line: 1, column: 4443]];
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:156)
We have modified our existing, simple data flow to create a new field in our Twitter JSON data. This field is stored in Elasticsearch as twitterFilterAttribute and allows us to query Elasticsearch based on the values in this field.
Look for the next article in the series, which will use NiFi process groups to write multiple Twitter streams, with different filters and twitterFilterAttribute values, to Elasticsearch.
Created on 08-30-2018 09:00 PM
I have a similar case; however, Elasticsearch interprets timestamp_ms as a string.
Do you know how to fix it?
Many thanks
