Created on 03-13-2018 10:16 PM - edited 09-16-2022 01:42 AM
The concept of time is at the core of all Big Data processing technologies, but it is particularly important in the world of data stream processing. Indeed, it is reasonable to say that the way different systems handle time-based processing is what separates the wheat from the chaff, at least in the world of real-time stream processing.
Demand for stream processing keeps growing. A common need across Hadoop projects is to build up-to-date indicators from streaming data.
Social media analysis is a great use case to show how we can build a streaming analytics dashboard with NiFi, Kafka, Tranquility, Druid, and Superset.
This processing flow has these steps:
1) Tweets ingestion using Apache NiFi
2) Stream processing using Apache Kafka
3) Integrating data with Tranquility
4) OLAP database storage using Druid
5) Visualization using Apache Superset
Before we get our hands on the code, take a look at each component:
NiFi: https://br.hortonworks.com/apache/nifi/
Kafka: https://br.hortonworks.com/apache/kafka/
Tranquility: https://github.com/druid-io/tranquility
Druid: https://br.hortonworks.com/apache/druid/
Superset: https://superset.incubator.apache.org/
We can install all components manually or simply use HDF from Hortonworks: https://br.hortonworks.com/products/data-platforms/hdf/
Guidelines for Planning our HDF Deployment
To build this HDF cluster, 4 machines were used, each with 16 cores and 32 GB of RAM. I made each machine responsible for one component.
After setting up this environment, we can start building our flow in NiFi:
Phase 1: NIFI
This flow has 3 steps:
Step 1 - Get selected tweets
Step 2 - Clean and convert json tweets
Step 3 - Send json to a Kafka Topic
This process should give us a streaming message like this:
{ "tweet_id":971824225936953344, "created_unixtime":1520535925741, "created_time":"Thu Mar 08 19:05:25 +0000 2018", "lang":"en", "displayname":"thiagos25", "time_zone":"São Paulo - Brasil", "msg":"Hello world!" }
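The article performs Step 2 inside NiFi. As a language-neutral illustration only, the cleaning step can be sketched in Python as below. The raw field names (`id`, `timestamp_ms`, `created_at`, `user.screen_name`, `user.time_zone`, `text`) assume the Twitter v1.1 streaming payload, and `clean_tweet` is a hypothetical helper, not part of the original flow.

```python
import json


def clean_tweet(raw: dict) -> dict:
    """Flatten a raw tweet into the message layout shown above."""
    user = raw.get("user") or {}
    return {
        "tweet_id": raw["id"],
        # The streaming payload carries epoch milliseconds as a string.
        "created_unixtime": int(raw["timestamp_ms"]),
        "created_time": raw["created_at"],
        "lang": raw.get("lang"),
        "displayname": user.get("screen_name"),
        "time_zone": user.get("time_zone"),
        # Collapse newlines and tabs so each Kafka message stays on one line.
        "msg": " ".join(raw.get("text", "").split()),
    }


# Hypothetical raw tweet, trimmed to the fields used here.
raw_tweet = {
    "id": 971824225936953344,
    "timestamp_ms": "1520535925741",
    "created_at": "Thu Mar 08 19:05:25 +0000 2018",
    "lang": "en",
    "user": {"screen_name": "thiagos25", "time_zone": "São Paulo - Brasil"},
    "text": "Hello\nworld!",
}

message = clean_tweet(raw_tweet)
print(json.dumps(message, ensure_ascii=False))
```

Keeping `created_unixtime` as an integer matters later, because it is the column Tranquility uses as the event timestamp.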
Apache Kafka is a real-time streaming platform that uses the publish-subscribe messaging pattern. We will use Kafka to receive incoming messages and publish them to a specific topic (twitter_demo) that Druid will subscribe to. Tranquility (the Druid indexer) will read these messages and insert them into the Druid database.
Use the commands below to create a Kafka topic called “twitter_demo”:
==> on master-3
cd /usr/hdp/2.6.3.0-235/kafka
./kafka-topics.sh --create --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic twitter_demo
We can list the created topics with:
./kafka-topics.sh --list --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181
We can consume messages with:
./kafka-console-consumer.sh --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 --topic twitter_demo --from-beginning
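…and check the latest offset of the topic with the command below. Note that --broker-list expects a Kafka broker address (port 6667 by default on HDP), not a ZooKeeper address, so adjust the host to wherever your broker runs:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list thiago-2.field.hortonworks.com:6667 --topic twitter_demo --time -1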
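Before wiring the topic to Druid, it can help to sanity-check what the console consumer prints. The following is a minimal sketch, assuming each consumed line is one JSON message with the fields from the sample tweet above; `check_line` and `REQUIRED_FIELDS` are hypothetical names introduced for illustration.

```python
import json

# Fields taken from the sample message produced by the NiFi flow.
REQUIRED_FIELDS = {
    "tweet_id", "created_unixtime", "created_time",
    "lang", "displayname", "time_zone", "msg",
}


def check_line(line: str) -> list:
    """Return the problems found in one consumed message (empty if it is fine)."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(record, dict):
        return ["not a JSON object"]
    problems = [f"missing field: {name}"
                for name in sorted(REQUIRED_FIELDS - record.keys())]
    # The timestamp column must be epoch milliseconds, not a quoted string.
    if "created_unixtime" in record and not isinstance(record["created_unixtime"], int):
        problems.append("created_unixtime is not an integer")
    return problems


good = ('{"tweet_id":971824225936953344,"created_unixtime":1520535925741,'
        '"created_time":"Thu Mar 08 19:05:25 +0000 2018","lang":"en",'
        '"displayname":"thiagos25","time_zone":"São Paulo - Brasil","msg":"Hello world!"}')
bad = '{"tweet_id": 1, "created_unixtime": "1520535925741"}'

print(check_line(good))
print(check_line(bad))
```

A message that fails this kind of check is worth fixing in NiFi first, since a malformed timestamp is a common reason for events not showing up downstream.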
Now it’s time to get some tranquility - sorry for the wordplay!
Tranquility is a friend of Druid and helps us send event streams to Druid in real-time. It handles partitioning, replication, service discovery, and schema rollover for us, seamlessly and without downtime. Tranquility is written in Scala, and bundles idiomatic Java and Scala APIs that work nicely with Finagle, Samza, Spark, Storm, and Trident.
Tranquility Kafka is an application which simplifies the ingestion of data from Kafka. It is scalable and highly available through the use of Kafka partitions and consumer groups, and can be configured to push data from multiple Kafka topics into multiple Druid dataSources.
https://github.com/druid-io/tranquility/blob/master/docs/kafka.md
First things first: to read from a Kafka stream we will define a configuration file that describes a data source name, a Kafka topic to read from, and some properties of the data that we read. Save the JSON configuration below as kafka.json.
This instructs Tranquility to read from the topic “twitter_demo” and push the messages that it receives into a Druid data source called “twitter_demo”. In the messages it reads, Tranquility uses the created_unixtime column (or key) as the timestamp.
{
  "dataSources" : {
    "twitter_demo" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "twitter_demo",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "created_unixtime",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [ "timestamp", "value" ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "six_hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : []
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT720000M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "twitter_demo"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
    "kafka.group.id" : "tranquility-kafka"
  }
}
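To make the time-related settings in this configuration more concrete, here is a small Python sketch of how the values can be read. It assumes that created_unixtime is epoch milliseconds, that six-hour buckets align to 00, 06, 12 and 18h UTC, and that Tranquility drops events whose timestamp lies further than windowPeriod from the current time. All function names are hypothetical, and the period parser only handles the `PT<n>M` form used above.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time(created_unixtime: int) -> datetime:
    """Convert epoch milliseconds (the created_unixtime field) to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=created_unixtime)


def parse_minutes_period(period: str) -> timedelta:
    """Parse the restricted ISO 8601 form 'PT<n>M' (minutes only)."""
    match = re.fullmatch(r"PT(\d+)M", period)
    if match is None:
        raise ValueError(f"unsupported period: {period}")
    return timedelta(minutes=int(match.group(1)))


def six_hour_bucket(ts: datetime) -> datetime:
    """Start of the six-hour UTC bucket that contains ts."""
    return ts.replace(hour=ts.hour - ts.hour % 6, minute=0, second=0, microsecond=0)


def within_window(ts: datetime, now: datetime, window: timedelta) -> bool:
    """True if the event is no further than `window` from `now`, in either direction."""
    return abs(now - ts) <= window


ts = event_time(1520535925741)               # sample tweet from the NiFi step
window = parse_minutes_period("PT720000M")   # windowPeriod from kafka.json
now = datetime(2018, 3, 13, tzinfo=timezone.utc)  # hypothetical "current time"

print(ts.isoformat())                   # 2018-03-08T19:05:25.741000+00:00
print(window.days)                      # 500
print(six_hour_bucket(ts).isoformat())  # 2018-03-08T18:00:00+00:00
print(within_window(ts, now, window))   # True
```

In other words, a windowPeriod of PT720000M is 500 days, which is very permissive and lets older tweets be replayed into Druid during a demo.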
…and place it in Druid's Tranquility configuration directory, so that the file ends up at:
==> on master-4
/usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/kafka.json
To manage the continuous process that indexes Kafka data, we’ll download Druid’s Tranquility extension, change into its directory, and run it. Use the following to get the latest version and decompress it:
sudo curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgz
sudo tar xzvf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0
#/usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/tranquility-distribution-0.8.0/
sudo bin/tranquility kafka -configFile ../kafka.json
Druid is a rockin' exploratory analytical data store capable of offering interactive query of big data in realtime.
In HDP/HDF, Druid can be used easily through Superset. There we can build our Druid datasource and manage all Druid columns to fit our JSON tweet schema.
We can use Superset for exploratory analysis and to define the JSON queries that we will execute against the Druid API and use to build our dashboard.
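As an illustration of the kind of JSON query mentioned above, the sketch below builds a Druid native topN query that ranks tweet languages by row count. The query fields follow Druid's native query format. The interval, the `rows` metric name and the helper function are assumptions made for this example, and the query is only printed, not sent to a cluster.

```python
import json


def top_languages_query(data_source: str, interval: str, limit: int = 10) -> dict:
    """Build a Druid native topN query ranking the 'lang' dimension by row count."""
    return {
        "queryType": "topN",
        "dataSource": data_source,
        "intervals": [interval],
        "granularity": "all",
        "dimension": "lang",
        # Rank by the count aggregator declared below.
        "metric": "rows",
        "threshold": limit,
        "aggregations": [{"type": "count", "name": "rows"}],
    }


# Hypothetical one-day interval around the sample tweet.
query = top_languages_query(
    "twitter_demo",
    "2018-03-08T00:00:00Z/2018-03-09T00:00:00Z",
)
print(json.dumps(query, indent=2))
```

A query like this is normally sent with an HTTP POST to the Druid Broker's `/druid/v2` endpoint. The Broker host and port depend on your cluster and are not given in this article.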
Once your Druid datasource has been created, you can create your slices and put them all in your dashboard.
Some pictures are worth a thousand words:
1) Creating our slices
2) Querying a slice
3) Saving all slices in our dashboard
4) Presenting our dashboard
In the end, we get a great real-time Twitter dashboard with information about location, maps, and languages. With a little more effort, we could even read each tweet individually to run sentiment analysis on what our customers are saying... but that is a matter for the next article.
References
http://druid.io/docs/latest/tutorials/tutorial-kafka.html
Created on 03-14-2018 02:06 PM
Great Article, Showcasing various technologies playing together.
You could possibly also simplify above flow a bit by skipping tranquility ->
Twitter -> Nifi -> Kafka -> Druid
Druid supports directly ingesting data from kafka (without tranquility too)
See - http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html
Created on 03-28-2018 12:46 PM
Hi Nishant,
You will understand why I did not skip Tranquility in the next article! 😉
Created on 06-05-2018 02:54 AM
Hi Thiago, thanks for the article. Just wondering: do we need to wait six hours to have the first segment? Thanks very much.
Created on 06-19-2018 01:25 PM
Hi, when I launch Tranquility it stops on its own:
...
2018-06-19 10:30:11,753 [main] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], end rebalancing consumer tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a try #0
2018-06-19 10:30:11,755 [main] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], Creating topic event watcher for topics (couldwork)
2018-06-19 10:30:11,764 [main] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], Topics to consume = List(couldwork)
2018-06-19 10:30:11,768 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.utils.VerifiableProperties - Verifying properties
2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to tranquility-kafka
2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to fr-001slli124.groupinfra.com:6667
2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
2018-06-19 10:30:11,787 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:1001,host:fr-001slli124.groupinfra.com,port:6667 with correlation id 0 for 1 topic(s) Set(couldwork)
2018-06-19 10:30:11,790 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.producer.SyncProducer - Connected to fr-001slli124.groupinfra.com:6667 for producing
2018-06-19 10:30:11,808 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO kafka.producer.SyncProducer - Disconnecting from fr-001slli124.groupinfra.com:6667
2018-06-19 10:30:11,855 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Starting
2018-06-19 10:30:11,858 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Added fetcher for partitions ArrayBuffer([[couldwork,0], initOffset 45 to broker id:1001,host:fr-001slli124.groupinfra.com,port:6667] )
2018-06-19 10:30:13,278 [Thread-4] INFO c.metamx.tranquility.kafka.KafkaMain - Initiating shutdown...
2018-06-19 10:30:13,278 [Thread-4] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2018-06-19 10:30:13,281 [Thread-4] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], ZKConsumerConnector shutting down
2018-06-19 10:30:13,288 [Thread-4] INFO k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
2018-06-19 10:30:13,288 [Thread-4] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Stopping leader finder thread
2018-06-19 10:30:13,288 [Thread-4] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Shutting down
2018-06-19 10:30:13,289 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Stopped
2018-06-19 10:30:13,289 [Thread-4] INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Shutdown completed
2018-06-19 10:30:13,289 [Thread-4] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Stopping all fetchers
2018-06-19 10:30:13,290 [Thread-4] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Shutting down
2018-06-19 10:30:13,291 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
2018-06-19 10:30:13,291 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Stopped
2018-06-19 10:30:13,291 [Thread-4] INFO kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Shutdown completed
2018-06-19 10:30:13,292 [Thread-4] INFO k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] All connections stopped
2018-06-19 10:30:13,294 [ZkClient-EventThread-14-10.80.145.201:2181,10.80.145.200:2181,10.80.145.199:2181] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
2018-06-19 10:30:13,298 [Thread-4] INFO org.apache.zookeeper.ZooKeeper - Session: 0x1641734cd6b0000 closed
2018-06-19 10:30:13,298 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
2018-06-19 10:30:13,298 [Thread-4] INFO k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], ZKConsumerConnector shutdown completed in 17 ms
2018-06-19 10:30:13,298 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Commit thread interrupted.
2018-06-19 10:30:13,299 [Thread-4] INFO c.m.tranquility.kafka.KafkaConsumer - Finished clean shutdown.