Member since
07-30-2019
03-31-2022
03:25 PM
1 Kudo
Intro
Online shopping is on the rise as more of us stay at home and let our credit cards do the walking. Keeping pace with that trend is an unfortunate increase in credit card fraud.
It’s no surprise, really. According to Forbes, online fraud has been a growing problem for the past few years. And now, as consumers and businesses adapt to the worldwide pandemic and make more credit card transactions in the card-not-present (CNP) space, the resulting uptick in online shopping and e-commerce has opened up an even bigger playground for fraudsters to try out new tricks.
Fraud detection has long been a major concern for financial services and institutions, and artificial intelligence has enormous potential to help detect and prevent financial fraud.
This is the first in a series of articles about that topic and about how we can use Cloudera components to implement a complete credit card fraud detection solution. But first, let's begin with a simple way to implement it:
Keep It Simple
In this MVP, we will use Apache NiFi to ingest simulated data from a public API and transform it into the format expected by our fraud detection algorithm, publish that data to an Apache Kafka topic, and use Apache Flink's SQL console to run a simple fraud detection query. All of this gets even better with scalability, so the icing on the cake will be to move the ingest and transformation flow to the Cloudera DataFlow service on Kubernetes.
All of the components mentioned are available in CDF (Cloudera DataFlow) and CSA (Cloudera Streaming Analytics):
CLOUDERA DATA-IN-MOTION PLATFORM
Prerequisites
We will use CDP Public Cloud with the following CDF and CSA Data Hubs:
Data Hub: 7.2.14 - Flow Management Light Duty with Apache NiFi, Apache NiFi Registry
Data Hub: 7.2.14 - Streams Messaging Light Duty: Apache Kafka, Schema Registry, Streams Messaging Manager, Streams Replication Manager, Cruise Control
Data Hub: 7.2.14 - Streaming Analytics Light Duty with Apache Flink
1 - Data ingestion
Let's start by ingesting our data in NiFi. With the InvokeHTTP processor, we can collect data from the randomuser API.
A simple call to https://randomuser.me/api/?nat=br will return something like this:
{
"results": [
{
"gender": "female",
"name": {
"title": "Miss",
"first": "Shirlei",
"last": "Freitas"
},
"location": {
"street": {
"number": 6133,
"name": "Rua Santa Luzia "
},
"city": "Belford Roxo",
"state": "Amapá",
"country": "Brazil",
"postcode": 88042,
"coordinates": {
"latitude": "78.0376",
"longitude": "74.2175"
},
"timezone": {
"offset": "+11:00",
"description": "Magadan, Solomon Islands, New Caledonia"
}
},
"email": "shirlei.freitas@example.com",
"login": {
"uuid": "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"username": "organicfrog175",
"password": "1030",
"salt": "yhVkrYWm",
"md5": "2bf9beb695c663a0a83aa060f27629c0",
"sha1": "f4dfdef9f2d2a9d04a0622636d0851b5d000164a",
"sha256": "e0a96117182914b3fa7fef22829f6692607bd58eb012b8fee763e34b21acf043"
},
"dob": {
"date": "1991-09-06T08:31:08.082Z",
"age": 31
},
"registered": {
"date": "2009-06-26T00:02:49.893Z",
"age": 13
},
"phone": "(59) 5164-1997",
"cell": "(44) 4566-5655",
"id": {
"name": "",
"value": null
},
"picture": {
"large": "https://randomuser.me/api/portraits/women/82.jpg",
"medium": "https://randomuser.me/api/portraits/med/women/82.jpg",
"thumbnail": "https://randomuser.me/api/portraits/thumb/women/82.jpg"
},
"nat": "BR"
}
],
"info": {
"seed": "fad8d9259d3f2b0b",
"results": 1,
"page": 1,
"version": "1.3"
}
}
Using the JoltTransformJSON processor, we can reshape this JSON into our own structure. The following JOLT specification cleans and adjusts the data (note that the street number stands in for a simulated charge amount):
[
{
"operation": "shift",
"spec": {
"results": {
"*": {
"login": { "username": "customer_id", "uuid": "account_number" },
"name": { "first": "name", "last": "lastname" },
"email": "email",
"gender": "gender",
"location": {
"street": { "number": "charge_amount" },
"country": "country",
"state": "state",
"city": "city",
"coordinates": {
"latitude": "lat",
"longitude": "lon"
}
},
"picture": { "large": "image" }
}
}
}
},
{
"operation": "default",
"spec": {
"center_inferred_lat": -5.0000,
"center_inferred_lon": -5.0000,
"max_inferred_distance": 0.0,
"max_inferred_amount": 0.0
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"lat": "=toDouble",
"lon": "=toDouble"
}
}
]
The transformed output will be:
{
"customer_id" : "organicfrog175",
"account_number" : "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"name" : "Shirlei",
"lastname" : "Freitas",
"email" : "shirlei.freitas@example.com",
"gender" : "female",
"charge_amount" : 6133,
"country" : "Brazil",
"state" : "Amapá",
"city" : "Belford Roxo",
"lat" : 78.0376,
"lon" : 74.2175,
"image" : "https://randomuser.me/api/portraits/women/82.jpg",
"max_inferred_distance" : 0.0,
"center_inferred_lat" : -5.0,
"center_inferred_lon" : -5.0,
"max_inferred_amount" : 0.0
}
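To make the three JOLT operations easier to follow, here is a minimal Python sketch of the same reshaping. It is only an illustration, not part of the NiFi flow: the function name `transform` and the trimmed `sample` input are assumptions, and the sketch handles only the first entry of `results`.

```python
def transform(response: dict) -> dict:
    """Flatten one randomuser record the way the JOLT spec above does."""
    user = response["results"][0]
    loc = user["location"]
    record = {
        # "shift": rename and flatten the fields we care about
        "customer_id": user["login"]["username"],
        "account_number": user["login"]["uuid"],
        "name": user["name"]["first"],
        "lastname": user["name"]["last"],
        "email": user["email"],
        "gender": user["gender"],
        "charge_amount": loc["street"]["number"],
        "country": loc["country"],
        "state": loc["state"],
        "city": loc["city"],
        # "modify-overwrite-beta": coordinates arrive as strings
        "lat": float(loc["coordinates"]["latitude"]),
        "lon": float(loc["coordinates"]["longitude"]),
        "image": user["picture"]["large"],
    }
    # "default": only fill these keys when they are missing
    defaults = {
        "center_inferred_lat": -5.0,
        "center_inferred_lon": -5.0,
        "max_inferred_distance": 0.0,
        "max_inferred_amount": 0.0,
    }
    for key, value in defaults.items():
        record.setdefault(key, value)
    return record


# Trimmed copy of the API response shown earlier (hypothetical test input).
sample = {
    "results": [{
        "gender": "female",
        "name": {"title": "Miss", "first": "Shirlei", "last": "Freitas"},
        "location": {
            "street": {"number": 6133, "name": "Rua Santa Luzia "},
            "city": "Belford Roxo",
            "state": "Amapá",
            "country": "Brazil",
            "coordinates": {"latitude": "78.0376", "longitude": "74.2175"},
        },
        "email": "shirlei.freitas@example.com",
        "login": {
            "uuid": "d73f9a11-d61c-424d-8309-51d6d8e83a73",
            "username": "organicfrog175",
        },
        "picture": {"large": "https://randomuser.me/api/portraits/women/82.jpg"},
    }]
}

record = transform(sample)
print(record["customer_id"], record["charge_amount"], record["lat"], record["lon"])
```

Writing the mapping out this way also makes it obvious which output fields are real data and which are placeholders waiting to be filled in later in the flow.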
Next, we can use the UpdateRecord processor to fill some fields with random values, and then publish our JSON data to Kafka using the PublishKafka2RecordCDP processor.
UpdateRecord Processor
PublishKafka2RecordCDP Processor
(Make sure the Kafka brokers property is filled in according to your Kafka cluster endpoints.)
In the end, our NiFi flow will be something like this:
(You can download this flow definition attached to this article)
2 - Data Buffering
On the Kafka cluster, we can create a new Kafka topic just by clicking the "Add new" button in SMM (Streams Messaging Manager). I've created a topic named skilltransactions as an example.
Now that the NiFi flow and the Kafka topic are in place, it is time to start the flow and watch the data arrive in the topic. You can also click the data explorer icon to see all the data ingested so far.
3 - Streaming SQL Analytics
Apache Flink is an open-source, unified stream-processing and batch-processing framework developed by the Apache Software Foundation. Flink provides a high-throughput, low-latency streaming engine as well as support for event-time processing and state management.
Flink's Table API is a SQL-like expression language for relational stream and batch processing that can be embedded in Flink's Java and Scala DataSet and DataStream APIs. The Table API and SQL interface operate on a relational Table abstraction. Tables can be created from external data sources or existing DataStreams and DataSets.
Cloudera has developed an application called Cloudera SQL Stream Builder (SSB) that can map our Kafka topics and query their data as tables through Flink's Table API.
We can easily create our "virtual table" mapping with the Table Connector in SSB:
After creating this "virtual table", we can use SQL to calculate how far a transaction was made from the inferred center point (center_inferred_lat, center_inferred_lon). The query applies the haversine formula using the power, sin, cos, asin, and radians SQL functions; 3961 is the Earth's radius in miles:
select account_number, charge_amount,
-- haversine distance between the transaction and the inferred center
2 * 3961 * asin(sqrt(
power(sin(radians((lat - center_inferred_lat) / 2)), 2)
+ cos(radians(center_inferred_lat)) * cos(radians(lat))
* power(sin(radians((lon - center_inferred_lon) / 2)), 2)
)) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE
-- keep only transactions farther away than the allowed distance
2 * 3961 * asin(sqrt(
power(sin(radians((lat - center_inferred_lat) / 2)), 2)
+ cos(radians(center_inferred_lat)) * cos(radians(lat))
* power(sin(radians((lon - center_inferred_lon) / 2)), 2)
)) > max_inferred_distance
To see more details about this query, please visit this great article by @sunile_manjee on the Cloudera Community.
We can also create our own function and simply call it in our query.
For instance, let's create a DISTANCE_BETWEEN function and use it on our final query.
Final query
select account_number, charge_amount, DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) > max_inferred_distance
OR charge_amount > max_inferred_amount
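The rule expressed by the final query can be sketched in plain Python to check it against a few known values. This is only an illustration under assumptions: `distance_between` below uses the standard haversine formula with the same 3961-mile radius, and it is not the actual SSB function definition, which the article does not show.

```python
import math

EARTH_RADIUS_MILES = 3961  # same constant as in the SQL query


def distance_between(lat, lon, center_lat, center_lon):
    """Great-circle (haversine) distance in miles between two points."""
    dlat = math.radians(lat - center_lat)
    dlon = math.radians(lon - center_lon)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(center_lat)) * math.cos(math.radians(lat))
         * math.sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))


def is_suspicious(tx: dict) -> bool:
    """Mirror the WHERE clause: too far away OR too expensive."""
    distance = distance_between(
        tx["lat"], tx["lon"],
        tx["center_inferred_lat"], tx["center_inferred_lon"],
    )
    return (distance > tx["max_inferred_distance"]
            or tx["charge_amount"] > tx["max_inferred_amount"])


# Hypothetical transaction with non-zero thresholds for illustration.
tx = {
    "lat": -5.1, "lon": -5.0,
    "center_inferred_lat": -5.0, "center_inferred_lon": -5.0,
    "max_inferred_distance": 50.0, "max_inferred_amount": 10000.0,
    "charge_amount": 6133,
}
print(round(distance_between(tx["lat"], tx["lon"], -5.0, -5.0), 2), is_suspicious(tx))
```

Note that with the default thresholds of 0.0 from the JOLT spec, any transaction with a positive charge amount is flagged, so the thresholds need realistic values before the rule becomes useful.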
At this moment, our query should be able to detect suspicious transactions in real-time and you can call the police. 😜
But Wait! There's more!
It's time to see it in Production mode!
4 - From Development to Production
With this architecture, you may run into trouble on Black Friday or a similar big event. To handle that, you will need to ingest all the streaming data with high performance and scalability; in other words, NiFi on Kubernetes.
The Cloudera DataFlow service can deploy NiFi flows on Kubernetes, providing the scalability needed for a production environment.
CLOUDERA DATA FLOW SERVICE – PUBLIC CLOUD
Follow the deployment wizard to see your flow running in containers:
DEPLOYMENT WIZARD
KEY PERFORMANCE INDICATORS
DASHBOARD
DEPLOYMENT MANAGER
5 - Conclusion
This is the very first article on this streaming journey; here we used Cloudera DataFlow to ingest, buffer, and process events in real time. I hope that after this article you understand CDF and CSA, have seen Cloudera's streaming capabilities, and, after all, can also call the police.
See you in the next article, where we will use machine learning on Kubernetes (Cloudera Machine Learning) to improve the accuracy of our simple credit card fraud detection and go live in production.
01-03-2019
10:10 PM
Hi @Adam J,
Not sure if I got you right, but maybe you can enable Site-to-Site communication. To do that, we will use Ambari to update the required configuration. In Ambari, the property below can be found at http://<AMBARI_NODE>:8080/#/main/services/NIFI/configs .
Change: nifi.remote.input.socket.port=
To: nifi.remote.input.socket.port=10000
Restart NiFi via Ambari. Now you should be ready to create the flow. To do this, do the following:
1. Set up an Input Port. This is the port that NiFi will be sending data to. Drag the Input Port icon onto the canvas and call it "From NiFi URL1".
2. Now that the Input Port is configured, the data needs somewhere to go once it is received. To keep it very simple, use a processor to route your content depending on url1, url2, url3, url4.
3. Now that you have the input port and the processor to handle the data, add a Remote Process Group to the canvas. For the URL, copy and paste the URL of the NiFi UI from your browser.
4. Connect the content-routing processor to the Remote Process Group.
I hope this helps!
Cheers,
05-28-2018
04:21 PM
3 Kudos
How do you know if your customers (and potential customers) are talking about you on social media? The key to making the most of social media is listening to what your audience has to say about you, your competitors, and the market in general. Once you have the data you can undertake analysis, and finally, reach social business intelligence; using all these insights to know your customers better and improve your marketing strategy.
This is the third part of the series of articles on how to ingest social media data as a stream using the integration of HDP and HDF tools.
To implement this article, you first need to implement the previous two:
https://community.hortonworks.com/articles/177561/streaming-tweets-with-nifi-kafka-tranquility-druid.html
https://community.hortonworks.com/content/kbentry/182122/integrating-nifi-to-druid-with-a-custom-processor.html
Let's get started!
In this new article, we will address two main points:
1 - How to collect data from various social networks at the same time
2 - How to integrate storage between Hive and Druid, making the most of this integration.
I've called this project "The Social Media Stalker".
So, our new architecture diagram would look like this:
OK, it's time to get hands-on!
Let's divide this work into 3 parts:
Create NiFi ingestion for Druid
Set up Hive-Druid integration
Update our Superset dashboard
1. Create NiFi ingestion for Druid
You can access each social network through its own API, but that takes time and patience. Let's cut to the chase and go straight to a single API able to collect all the social data.
There are a lot of social media monitoring tools:
https://www.brandwatch.com/blog/top-10-free-social-media-monitoring-tools/
We are going to use this one:
https://www.social-searcher.com/
(Its main advantage is that the API response includes sentiment analysis.)
It's quite simple to get data from the social networks you want: just make a request to the API, passing parameters such as q = search term and network = desired social network.
Example: https://api.social-searcher.com/v2/search?q=Hortonworks&network=facebook&limit=20
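Since the same request is repeated once per social network, the URLs can be generated rather than typed by hand. The sketch below only builds the request URLs (it makes no network call); the helper name `build_search_url` and every network identifier other than `facebook` are assumptions, and any authentication parameter the service may require is left out.

```python
from urllib.parse import urlencode

API_URL = "https://api.social-searcher.com/v2/search"  # endpoint from the example above

# Only "facebook" appears in the article; the other identifiers are guesses.
NETWORKS = ["twitter", "facebook", "youtube", "instagram", "reddit", "googleplus", "vimeo"]


def build_search_url(term: str, network: str, limit: int = 20) -> str:
    """Return the search URL for one term on one social network."""
    params = {"q": term, "network": network, "limit": limit}
    return f"{API_URL}?{urlencode(params)}"


# One URL per network, each of which could feed its own InvokeHTTP processor.
urls = [build_search_url("Hortonworks", network) for network in NETWORKS]
for url in urls:
    print(url)
```

Using `urlencode` also takes care of escaping search terms that contain spaces or special characters.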
To make this request, let's use the InvokeHTTP processor in NiFi.
This request returns data with a schema like this:
{ "userId", "lang", "location", "name", "network", "posted", "sentiment", "text" }
I did the same for 7 social networks (Twitter, Facebook, YouTube, Instagram, Reddit, GooglePlus, Vimeo) to get a flow like this:
*To build this NiFi flow, follow the previous article. I've updated my ReplaceText processor with:
{"userId":"${user.userId}","lang":"${user.lang}","location":"${user.location}","name":"${user.name}","network":"${user.network}","posted":"${user.posted}","sentiment":"${user.sentiment}","text":"${user.text:replaceAll('[$&+,:;=?@#|\'<>.^*()%!-]',''):replace('"',''):replace('\n','')}","timestamp":${now()}}
Obviously, you should create your table in Druid, adding the sentiment and network fields as described in the previous articles. I called my Druid table "SocialStalker".
Once you have done that, just push play to see all the data arriving in Druid.
2. Set up Hive-Druid integration
Our main goal is to be able to index data from Hive into Druid, and to be able to query Druid datasources from Hive. Completing this work will bring benefits to the Druid and Hive systems alike:
– Efficient execution of OLAP queries in Hive. Druid is a system especially well tailored to the execution of OLAP queries on event data. Hive will be able to take advantage of its efficiency for this type of query.
– Introducing a SQL interface on top of Druid. Druid queries are expressed in JSON, and Druid is queried through a REST API over HTTP. Once a user has declared a Hive table that is stored in Druid, we will be able to transparently generate Druid JSON queries from the input Hive SQL queries.
– Being able to execute complex operations on Druid data. There are multiple operations that Druid does not support natively yet, e.g. joins. Putting Hive on top of Druid will enable the execution of more complex queries on Druid data sources.
– Indexing complex query results in Druid using Hive. Currently, indexing in Druid is usually done through MapReduce jobs. We will enable Hive to index the results of a given query directly into Druid, e.g., as a new table or a materialized view (HIVE-10459), and start querying and using that dataset immediately.
Even though there is some overlap between the two if you are using Hive LLAP, it's important to look at the strengths of each separately:
– The power of Druid comes from precise IO optimization, not brute compute force. End queries are performed to drill down on selected dimensions for a given timestamp predicate for better performance.
– Druid queries should use timestamp predicates, so that Druid knows how many segments to scan. This will yield better results.
– Any UDFs or SQL functions executed on Druid tables will be performed by Hive. The performance of these queries depends solely on Hive; at this point they do not function as Druid queries.
– If aggregations over aggregated data are needed, queries will run as Hive LLAP queries, not as Druid queries.
Hands on Hive-Druid!
To do this, you first need to be using Hive Interactive (with LLAP), which is required for the Druid integration.
--> Enable Hive Interactive Query
--> Download hive-druid-handler
If you do not have hive-druid-handler in your HDP version, just download it:
https://javalibs.com/artifact/org.apache.hive/hive-druid-handler
https://github.com/apache/hive/tree/master/druid-handler
…and copy it into the hive-server2/lib folder:
cp hive-druid-handler-3.0.0.3.0.0.3-2.jar /usr/hdp/current/hive-server2/lib
...then restart Hive.
We need to provide the Druid data source information to Hive. Let's register the Druid data sources in Hive (CREATE EXTERNAL TABLE) for all the data that is already stored in Druid:
-- ADD JAR /usr/hdp/current/hive-server2/lib/hive-druid-handler-3.0.0.3.0.0.3-2.jar;
CREATE EXTERNAL TABLE SocialStalker
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "SocialStalker");
Now you should be able to query all the Druid data in Hive, but for that you MUST use Beeline in interactive mode:
beeline
!connect jdbc:hive2://localhost:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-hive2
Finally, you can use your Beeline terminal to run any query against your Druid table.
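As an illustration of the timestamp-predicate advice, a query over the SocialStalker table could be generated as in the sketch below. Treat it as an assumption-laden example: `__time` is the column through which Hive's Druid storage handler exposes the Druid timestamp, the helper name and date range are hypothetical, and the generated SQL has not been run against the cluster described here.

```python
def sentiment_by_network_query(start: str, end: str, table: str = "SocialStalker") -> str:
    """Build a HiveQL query that counts posts per network and sentiment.

    The `__time` range predicate lets Druid limit how many segments it scans.
    The start/end values are interpolated as-is, so pass trusted input only.
    """
    return (
        "SELECT network, sentiment, COUNT(*) AS posts\n"
        f"FROM {table}\n"
        f"WHERE `__time` BETWEEN '{start}' AND '{end}'\n"
        "GROUP BY network, sentiment"
    )


query = sentiment_by_network_query("2018-05-21 00:00:00", "2018-05-28 00:00:00")
print(query)
```

The printed statement can be pasted into the Beeline session opened above.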
You can insert data into your table and make some changes to your Superset slices (as in the previous articles) to complete step 3 and see your Superset dashboard like this one:
Conclusion:
You can use HDP and HDF to build an end-to-end platform that helps you achieve success in your social media marketing campaigns, as well as the ultimate success of your business. If you don't pay attention to how your business is doing, you are really only doing half of the job. It is the difference between walking around in the dark and having an illuminated path that gives you an understanding and awareness of how your business is doing and how you can continually make improvements that will bring you more exposure and a rock-solid reputation.
Many companies are using social media monitoring to strengthen their businesses. Those business people are savvy enough to realize the importance of social media, how it positively influences their businesses, and how critical the monitoring piece of the strategy is to their ultimate success.
References:
https://pt.slideshare.net/Hadoop_Summit/interactive-analytics-at-scale-in-apache-hive-using-druid-80145456
https://cwiki.apache.org/confluence/display/Hive/Druid+Integration
https://br.hortonworks.com/blog/sub-second-analytics-hive-druid/
04-02-2018
02:24 PM
8 Kudos
In the previous article, we saw how to stream tweets using NiFi, Kafka, Tranquility, Druid and Superset:
https://community.hortonworks.com/articles/177561/streaming-tweets-with-nifi-kafka-tranquility-druid.html
To carry on here, you need to have implemented the Druid datasource and the NiFi flow from that previous article.
But life is already hard enough, so why not simplify it?
The idea here is to perform the same streaming, but now integrating NiFi directly with Druid.
So, our new diagram would look like this:
As we saw in that article, Tranquility acts as the integration layer between Kafka and Druid.
Some people asked me: why not use the Kafka Indexing Service instead of Tranquility?
My answer: because Tranquility, as a framework, can be used flexibly to integrate almost any component with Druid.
Thus, for this NiFi-Druid integration, we will build a custom NiFi processor that uses Tranquility to send data directly into Druid.
OK, it's time to get hands-on!
Let's divide this work into 3 parts:
Build the Druid processor
Deploy it
Set it up on NiFi
1. Build druid processor
1.1 – Create a specific folder for your Druid processor, such as nifi-druid-integration. Go to GitHub and download this source code into your new folder:
https://github.com/hortonworks/fieldeng-nifi-druid-integration
1.2 – You will need Maven and Java installed on your local machine to build it.
Here is how you can quickly check if you have them installed:
$ mvn -version
$ java -version
If they are not installed:
https://maven.apache.org/install.html
https://www.java.com/
1.3 – Create the nar file for your processor
$ cd <Home Dir>/nifi-druid-integration/fieldeng-nifi-druid-integration-master
$ mvn install
Once the Maven install is done, you will have the nar file in the target directory, named nifi-druid-bundle-nar-0.0.1-SNAPSHOT.nar:
$ cd <Home Dir>/nifi-druid-integration/fieldeng-nifi-druid-integration-master/nifi-druid-bundle-nar/target
$ ls
nifi-druid-bundle-nar-0.0.1-SNAPSHOT.nar
2. Deploy it - It is a cinch.
Copy your nifi-druid-bundle-nar-0.0.1-SNAPSHOT.nar file to the NiFi lib directory. You can use something like this:
sudo scp -i yourkeyfile.pem \
/Users/tsantiago/Desktop/fieldeng-nifi-druid-integration-master/nifi-druid-bundle-nar/target/nifi-druid.nar \
centos@thiago-6.field.hortonworks.com:/usr/hdf/current/nifi/lib/
Restart NiFi, and that's it!
3. Set it up on NiFi
After restarting NiFi, you will get a fresh processor:
Replace the last step of that flow (PutKafka) with PutDruidProcessor:
And then configure it:
You must fill in these properties on the Controller Service:
data_source: twitter_demo
zk_connect_string: thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181
dimensions_list: tweet_id,created_unixtime,created_time,lang,location,displayname,time_zone,msg
aggregators_descriptor:
[
{
"type":"count",
"name":"count"
},
{
"name":"value_sum",
"type":"doubleSum",
"fieldName":"value"
},
{
"fieldName":"value",
"name":"value_min",
"type":"doubleMin"
},
{
"type":"doubleMax",
"name":"value_max",
"fieldName":"value"
}
]
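To see what these four aggregators compute, here is a small Python sketch that applies the same descriptor to a handful of events. It is a simplified illustration, not how Druid is implemented: Druid rolls events up per time bucket and dimension combination, whereas this sketch aggregates a single group, and the sample events with a `value` field are made up.

```python
import json

AGGREGATORS = json.loads("""
[
  {"type": "count", "name": "count"},
  {"name": "value_sum", "type": "doubleSum", "fieldName": "value"},
  {"fieldName": "value", "name": "value_min", "type": "doubleMin"},
  {"type": "doubleMax", "name": "value_max", "fieldName": "value"}
]
""")

# Map each aggregator type used above to the Python function that mimics it.
REDUCERS = {"doubleSum": sum, "doubleMin": min, "doubleMax": max}


def rollup(events: list, aggregators: list) -> dict:
    """Aggregate one non-empty group of events into a single row of metrics."""
    row = {}
    for agg in aggregators:
        if agg["type"] == "count":
            row[agg["name"]] = len(events)
        elif agg["type"] in REDUCERS:
            values = [float(event[agg["fieldName"]]) for event in events]
            row[agg["name"]] = REDUCERS[agg["type"]](values)
        else:
            raise ValueError(f"unsupported aggregator type: {agg['type']}")
    return row


events = [{"value": 1.5}, {"value": 2.5}, {"value": 4.0}]  # hypothetical input
print(rollup(events, AGGREGATORS))
```

Each output key corresponds to the `name` of one aggregator, which is the metric name that later shows up in Superset.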
Finally, start this new flow and watch your Superset dashboard fill up.
Conclusion:
Now you not only know how to build a custom NiFi processor, but also how to integrate NiFi with Druid.
It's important to say that by sending data from NiFi straight to Druid, we can lose some scalability and application resilience, since with a high volume of tweets the integration between NiFi and Druid can become a bottleneck.
However, if your workload is not heavy, keeping it simple may be the best option.
References:
https://community.hortonworks.com/content/kbentry/177561/streaming-tweets-with-nifi-kafka-tranquility-druid.html
https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html
https://github.com/hortonworks/fieldeng-nifi-druid-integration
03-28-2018
12:46 PM
Hi Nishant, you will understand why I did not skip Tranquility in the next article! 😉
03-13-2018
10:16 PM
12 Kudos
The concept of time is at the core of all Big Data processing technologies, but it is particularly important in the world of data stream processing. Indeed, it is reasonable to say that the way in which different systems handle time-based processing is what separates the wheat from the chaff, at least in the world of real-time stream processing.
The demand for stream processing is increasing a lot these days. A common need across Hadoop projects is to build up-to-date indicators from streaming data.
Social media analysis is a great use case to show how we can build a dashboard with streaming analytics using NiFi, Kafka, Tranquility, Druid, and Superset.
This processing flow has these steps:
1) Tweet ingestion using Apache NiFi
2) Stream processing using Apache Kafka
3) Integrating data with Tranquility
4) OLAP database storage using Druid
5) Visualization using Apache Superset
Before getting our hands on the code, take a look at each component:
NiFi: https://br.hortonworks.com/apache/nifi/
Kafka: https://br.hortonworks.com/apache/kafka/
Tranquility: https://github.com/druid-io/tranquility
Druid: https://br.hortonworks.com/apache/druid/
Superset: https://superset.incubator.apache.org/
We can install all the components manually or simply use HDF from Hortonworks:
https://br.hortonworks.com/products/data-platforms/hdf/
Guidelines for planning our HDF deployment:
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.1/bk_planning-your-deployment/bk_planning-your-deployment.pdf
To build this HDF cluster, 4 machines were used, each with 16 cores and 32 GB of RAM. I made each machine responsible for one component:
After setting up this environment, we can start building our flow in NiFi:
Phase 1: NiFi
http://thiago-2:9090/nifi/
This flow has 3 steps:
Step 1 - Get selected tweets
Processor: GetTwitter
Step 2 - Clean and convert JSON tweets
Processor: EvaluateJsonPath - pull key attributes from tweets
Processor: RouteOnAttribute - keep only tweets that are not empty
Processor: ReplaceText - replace text in the tweet
Step 3 - Send JSON to a Kafka topic
Processor: PutKafka
This process should give us a streaming message like this:
{
"tweet_id":971824225936953344,
"created_unixtime":1520535925741,
"created_time":"Thu Mar 08 19:05:25 +0000 2018",
"lang":"en",
"displayname":"thiagos25",
"time_zone":"São Paulo - Brasil",
"msg":"Hello world!"
}
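Step 2 of the flow (extract key attributes, drop empty tweets, rewrite the text into a flat JSON document) can be sketched in Python as follows. This is an illustration only: the raw field names (`id`, `timestamp_ms`, `created_at`, `user.screen_name`, `user.time_zone`, `text`) are assumed from the Twitter streaming payload, and the newline cleanup is a stand-in for whatever the ReplaceText processor is configured to do.

```python
import json


def to_message(tweet: dict):
    """Flatten a raw tweet into the message format shown above.

    Returns None for tweets without text, mirroring the RouteOnAttribute filter.
    """
    text = (tweet.get("text") or "").strip()
    if not text:
        return None
    user = tweet.get("user", {})
    return {
        "tweet_id": tweet["id"],
        "created_unixtime": int(tweet["timestamp_ms"]),
        "created_time": tweet["created_at"],
        "lang": tweet.get("lang"),
        "displayname": user.get("screen_name"),
        "time_zone": user.get("time_zone"),
        # Assumed cleanup: keep the message on a single line
        "msg": text.replace("\n", " "),
    }


# Hypothetical raw tweet, reduced to the fields used here.
raw = {
    "id": 971824225936953344,
    "timestamp_ms": "1520535925741",
    "created_at": "Thu Mar 08 19:05:25 +0000 2018",
    "lang": "en",
    "user": {"screen_name": "thiagos25", "time_zone": "São Paulo - Brasil"},
    "text": "Hello world!",
}

message = to_message(raw)
print(json.dumps(message, ensure_ascii=False))
```

The resulting dictionary has the same keys as the sample message, which are also the names later listed as Druid dimensions.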
Phase 2: Kafka
Apache Kafka is a real-time stream processor that uses the publish-subscribe message pattern. We will use Kafka to receive incoming messages and publish them to a specific topic-based queue (twitter_demo) that Druid will subscribe to. Tranquility (the Druid indexer) will read these messages and insert them into the Druid database.
Use the commands below to create a Kafka topic called "twitter_demo":
==> on master-3
cd /usr/hdp/2.6.3.0-235/kafka
./kafka-topics.sh --create --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic twitter_demo
We can check the list of created topics with:
./kafka-topics.sh --list --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181
We can consume messages with:
./kafka-console-consumer.sh --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 --topic twitter_demo --from-beginning
…and check the topic's latest offsets with:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list thiago-2.field.hortonworks.com:2181 --topic twitter_demo --time -1
(Note that --broker-list expects a Kafka broker host:port rather than a ZooKeeper address, so adjust the port to your broker's listener.)
Phase 3: Tranquility
Now it's time to get some tranquility - sorry for the wordplay!
Tranquility is a friend of Druid and helps us send event streams to Druid in real time. It handles partitioning, replication, service discovery, and schema rollover for us, seamlessly and without downtime. Tranquility is written in Scala, and bundles idiomatic Java and Scala APIs that work nicely with Finagle, Samza, Spark, Storm, and Trident.
Tranquility Kafka is an application which simplifies the ingestion of data from Kafka. It is scalable and highly available through the use of Kafka partitions and consumer groups, and can be configured to push data from multiple Kafka topics into multiple Druid dataSources.
https://github.com/druid-io/tranquility/blob/master/docs/kafka.md
First things first: to read from a Kafka stream, we will define a configuration file that describes a data source name, a Kafka topic to read from, and some properties of the data that we read. Save the JSON configuration below as kafka.json.
This instructs Tranquility to read from the topic "twitter_demo" and push the messages that it receives into a Druid data source called "twitter_demo". In the messages it reads, Tranquility uses the created_unixtime column (or key) to represent the timestamp.
{
"dataSources" : {
"twitter_demo" : {
"spec" : {
"dataSchema" : {
"dataSource" : "twitter_demo",
"parser" : {
"type" : "string",
"parseSpec" : {
"timestampSpec" : {
"column" : "created_unixtime",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions" : [],
"dimensionExclusions" : [
"timestamp",
"value"
]
},
"format" : "json"
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "six_hour",
"queryGranularity" : "none"
},
"metricsSpec" : []
},
"ioConfig" : {
"type" : "realtime"
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT720000M"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1",
"topicPattern" : "twitter_demo"
}
}
},
"properties" : {
"zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
"druid.discovery.curator.path" : "/druid/discovery",
"druid.selectors.indexing.serviceName" : "druid/overlord",
"commit.periodMillis" : "15000",
"consumer.numThreads" : "2",
"kafka.zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
"kafka.group.id" : "tranquility-kafka"
}
}
…and place it alongside Druid's configuration, as /usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/kafka.json:
==> on master-4
cd /usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/
To manage the continuous process that indexes Kafka data, we'll download Druid's Tranquility extension, change into its directory, and run it. Use the following to get the latest version and decompress it:
sudo curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgz
sudo tar xzvf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0
#/usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/tranquility-distribution-0.8.0/
sudo bin/tranquility kafka -configFile ../kafka.json
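Two settings in kafka.json are worth a closer look: the timestampSpec column (created_unixtime, in epoch milliseconds) and windowPeriod, written as an ISO 8601 duration. The Python sketch below shows how such values can be interpreted. It rests on an assumption about Tranquility's behavior, namely that events whose timestamps fall further than windowPeriod away from the current time are dropped; the helper names are hypothetical and only the hour/minute/second duration form is parsed.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_period(period: str) -> timedelta:
    """Parse a simple ISO 8601 duration such as PT10M or PT720000M."""
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", period)
    if match is None:
        raise ValueError(f"unsupported period: {period}")
    hours, minutes, seconds = (int(part or 0) for part in match.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def within_window(event_ms: int, now: datetime, window: timedelta) -> bool:
    """Check whether an epoch-millisecond timestamp lies within the window of `now`."""
    event_time = datetime.fromtimestamp(event_ms / 1000, tz=timezone.utc)
    return abs(now - event_time) <= window


now = datetime(2018, 3, 9, tzinfo=timezone.utc)  # fixed clock for the example
created_unixtime = 1520535925741                 # from the sample tweet message

print(parse_period("PT720000M"))                                        # the configured windowPeriod
print(within_window(created_unixtime, now, parse_period("PT720000M")))  # wide window
print(within_window(created_unixtime, now, parse_period("PT10M")))      # narrow window
```

A windowPeriod of PT720000M is 500 days, which is a very permissive setting suited to a demo; a narrower window would reject the sample tweet a few hours after it was created.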
Phase 4: Druid
Druid is a rockin' exploratory analytical data store capable of offering interactive query of big data in real time.
In HDP/HDF, Druid can be used easily through Superset: we can build our Druid datasource and manage all the Druid columns to fit our JSON tweet schema.
Phase 5: Superset Dashboard
We can use Superset for exploratory analysis and to define the JSON queries that we will execute against the Druid API and use to build our dashboard.
Once your Druid datasource has been created, you can create your slices and put them all in your dashboard.
Some pictures are worth a thousand words:
1) Creating our slices
2) Querying a slice
3) Saving all slices in our dashboard
4) Presenting our dashboard
In the end, we can see a great real-time Twitter dashboard with information about location, maps, and languages, and with a little more effort we could even read each tweet individually to run sentiment analysis on what our customers are saying... but that is a matter for the next article.
References
http://druid.io/docs/latest/tutorials/tutorial-kafka.html
http://druid.io/blog/2013/08/30/loading-data.html
https://github.com/druid-io/tranquility
09-02-2017
06:59 PM
11 Kudos
This article aims to show how to plan a NiFi cluster following best practices.
1) Hardware Provisioning
2) Hardware Considerations for HDF
- General Hardware
A key design point of NiFi is to use typical enterprise-class application servers.
Hardware failure: nodes are typically configured with RAID 5/10 to handle hardware failures through replication and redundancy.
More nodes means less impact from failure.
More nodes provide increased throughput.
- Machine Class
A NiFi cluster consists of a single class of machine.
Balanced NiFi node:
8 CPU cores per node minimum.
6 hard disks per node minimum, spinning or SSD based on throughput requirements.
8 GB of RAM per node minimum.
Designed for availability.
Typical enterprise-class application server.
Resilience built into the server itself (RAID).
Cost reduced where possible to strike a proper price/performance ratio owing to volume.
- Networking
Network decisions play a role due to the clustered nature of data processing.
In-rack backplane/top-of-rack switch: keeps traffic local and reduces load on expensive aggregate switches.
Dual NIC recommended: depends on network requirements.
10G recommended: immediate cost vs. future-proofing. Investment in 10G upfront will survive the next 2-3 server hardware upgrades. In-rack/top-of-rack switches allow Cat6 copper or Twinax to reduce 10G costs.
- NiFi: Hardware Driving Factors
NiFi is designed to take advantage of:
all the cores on a machine
all the network capacity
all the disk speed
many GB of RAM (though usually not all) on a system
Most important hardware factors:
Top-end disk throughput as configured, which is a combination of seek time and raw performance
Network speed
CPU is only a concern when there is a lot of compression, encryption, or media analytics
You need to ensure the flow can take advantage of the contiguous block allocation approach NiFi uses, or it will result in lots of random seeks, increasing seek times and decreasing effective throughput.
3) HDF Disk Partition Baseline
4) Disk Partitioning – NiFi Nodes (Repositories)
5) NiFi: Default Cluster Recommendation
When not provided with information to gauge the rate and complexity of the data flow, start with a default cluster of three nodes. Three nodes are needed for HA by the ZooKeeper quorum process.
The SKU is priced per core, but it can be split up. So, a 16 core SKU can be split into 3 machines of 4 cores each. More cores per node will improve throughput (up to an extent).
So, a starting cluster for, say, 50MB/s sustained throughput for an average flow is:
3 nodes, each with:
CPU: 8+ cores (16 is preferred)
Memory: 8+ GB
Disk: 6 disks of 1TB each (could be spinning or SSD)
6) NiFi Clusters Scale Linearly
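The three-node default recommended in section 5 follows from how a ZooKeeper quorum works: the ensemble stays available only while a strict majority of its members is up. A small Python sketch of that arithmetic, with hypothetical helper names, shows why two nodes add no fault tolerance over one and why three is the smallest useful size.

```python
def quorum_size(nodes: int) -> int:
    """Smallest number of members that forms a strict majority."""
    return nodes // 2 + 1


def tolerated_failures(nodes: int) -> int:
    """How many members can fail while a majority is still available."""
    return nodes - quorum_size(nodes)


for nodes in range(1, 6):
    print(f"{nodes} node(s): quorum={quorum_size(nodes)}, "
          f"tolerated failures={tolerated_failures(nodes)}")
```

An even-sized ensemble tolerates no more failures than the odd size just below it, which is why ensembles are usually sized at 3 or 5 members.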
02-22-2017
01:16 PM
Hi aishwarya,
Why not use a native processor, such as:
QueryDatabaseTable
ExecuteSQL
PutSQL
Anyway, if you need a custom processor, take a look at these videos:
https://www.youtube.com/watch?v=3ldmNFlelhw
https://www.youtube.com/watch?v=QRzVr82V_Is
https://www.dropbox.com/s/dq6c98dvsklgeux/nifi-custom-component.mp4?dl=0
Hope this helps,
Cheers
07-28-2016
01:58 AM
Hi @hoda, how are you? Have you found a solution?
I'm facing the same issue.
06-12-2016
07:35 PM
The easiest way to do it: just log in to Ambari using these credentials:
User: admin
Pass: 4o12t0n
Cheers