
The concept of time is at the core of all Big Data processing technologies, but it is particularly important in data stream processing. Indeed, it is reasonable to say that the way different systems handle time-based processing is what separates the wheat from the chaff in real-time stream processing.

Demand for stream processing keeps growing. A common need across Hadoop projects is to build up-to-date indicators from streaming data.

Social media analysis is a great use case for showing how we can build a streaming analytics dashboard with NiFi, Kafka, Tranquility, Druid, and Superset.

This processing flow has these steps:

1) Tweets ingestion using Apache NiFi

2) Stream processing using Apache Kafka

3) Integrating data with Tranquility

4) OLAP database storage using Druid

5) Visualization using Apache Superset

62890-imagem1.png

Before getting hands-on, take a look at each component:

NiFi: https://br.hortonworks.com/apache/nifi/

Kafka: https://br.hortonworks.com/apache/kafka/

Tranquility: https://github.com/druid-io/tranquility

Druid: https://br.hortonworks.com/apache/druid/

Superset: https://superset.incubator.apache.org/

We can install all components manually or simply use HDF from Hortonworks: https://br.hortonworks.com/products/data-platforms/hdf/

Guidelines for Planning our HDF Deployment

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.1/bk_planning-your-deployment/bk_planning-you...

To build this HDF cluster, four machines were used, each with 16 cores and 32 GB of RAM. Each machine is responsible for one component:

62892-imagem2.png

After setting up this environment, we can start building our flow in NiFi:

Phase 1: NiFi

http://thiago-2:9090/nifi/

62894-imagem3.png

This flow has 3 steps:

Step 1 - Get selected tweets

  • Processor: GetTwitter

62895-imagem4.png

Step 2 - Clean and convert JSON tweets

  • Processor: EvaluateJsonPath
  • Pull key attributes from tweets

62896-imagem5.png

  • Processor: RouteOnAttribute
  • Keep only non-empty tweets

62897-imagem6.png

  • Processor: ReplaceText
  • Replace text in the tweet

62898-imagem7.png

Step 3 - Send JSON to a Kafka topic

  • Processor: PutKafka

62899-imagem8.png

This process should give us a streaming message like this:

{  
   "tweet_id":971824225936953344,
   "created_unixtime":1520535925741,
   "created_time":"Thu Mar 08 19:05:25 +0000 2018",
   "lang":"en",
   "displayname":"thiagos25",
   "time_zone":"São Paulo - Brasil",
   "msg":"Hello world!"
}
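
The extraction and filtering steps above can be sketched outside NiFi in plain Python, which may help when debugging the flow. This is only an illustration: the input field names (id, timestamp_ms, created_at, lang, user.screen_name, user.time_zone, text) are assumed to follow the Twitter v1.1 streaming payload, and the mapping to the output keys is a guess based on the sample message above, not the exact JsonPath expressions configured in the processors. The ReplaceText step is not reproduced, since its configuration appears only in the screenshot.

```python
import json

def flatten_tweet(raw):
    """Pull key attributes from a raw tweet dict (the EvaluateJsonPath step).

    The source field names are an assumption based on the Twitter v1.1
    payload; adjust them to match your own processor configuration.
    """
    user = raw.get("user") or {}
    millis = raw.get("timestamp_ms")
    return {
        "tweet_id": raw.get("id"),
        "created_unixtime": int(millis) if millis else None,
        "created_time": raw.get("created_at"),
        "lang": raw.get("lang"),
        "displayname": user.get("screen_name"),
        "time_zone": user.get("time_zone"),
        "msg": raw.get("text"),
    }

def is_not_empty(flat):
    """Keep only tweets that carry a message (the RouteOnAttribute step)."""
    return bool(flat.get("msg"))

def to_kafka_payload(raw):
    """Return the JSON string to publish, or None if the tweet is dropped."""
    flat = flatten_tweet(raw)
    if not is_not_empty(flat):
        return None
    return json.dumps(flat, ensure_ascii=False)

# Hypothetical raw tweet, reduced to the fields used above.
raw_tweet = {
    "id": 971824225936953344,
    "timestamp_ms": "1520535925741",
    "created_at": "Thu Mar 08 19:05:25 +0000 2018",
    "lang": "en",
    "user": {"screen_name": "thiagos25", "time_zone": "São Paulo - Brasil"},
    "text": "Hello world!",
}
print(to_kafka_payload(raw_tweet))
```

A tweet with an empty text field yields None here, which corresponds to the flow file being routed away before it reaches Kafka.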

Phase 2: Kafka

Apache Kafka is a distributed streaming platform built on the publish-subscribe messaging pattern. We will use Kafka to receive incoming messages and publish them to a specific topic (twitter_demo) that the Druid side of the pipeline subscribes to. Tranquility (the Druid indexer) will read these messages and insert them into the Druid database.

Use the commands below to create a Kafka topic called “twitter_demo”:

==> on master-3

cd /usr/hdp/2.6.3.0-235/kafka/bin

./kafka-topics.sh --create \
    --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic twitter_demo

We can list the created topics with:

./kafka-topics.sh --list --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181

We can consume messages with:

./kafka-console-consumer.sh \
    --zookeeper thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181 \
    --topic twitter_demo \
    --from-beginning

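…and check the latest offset of each partition with the command below. Note that --broker-list expects a Kafka broker address rather than ZooKeeper; port 6667 is the HDP default for brokers and is an assumption here, so adjust the host and port to your cluster:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list thiago-2.field.hortonworks.com:6667 --topic twitter_demo --time -1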
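
To read the result of the GetOffsetShell command above, a small helper like the following may be handy. It is a sketch that assumes the tool prints one topic:partition:offset line per partition; the sample output is made up for illustration and does not come from the cluster described here.

```python
def parse_offsets(output):
    """Map partition number -> latest offset from GetOffsetShell output."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Each line is expected to look like "topic:partition:offset".
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets

sample_output = "twitter_demo:0:45\n"  # made-up sample, not real cluster output
latest = parse_offsets(sample_output)
print(latest)                # {0: 45}
print(sum(latest.values()))  # 45
```

With --time -1 the offsets are the latest ones, so their sum approximates the number of messages written to the topic, provided nothing has been removed by retention yet.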

Phase 3: Tranquility

Now it’s time to get some tranquility - sorry for the wordplay!

Tranquility is a friend of Druid and helps us send event streams to Druid in real-time. It handles partitioning, replication, service discovery, and schema rollover for us, seamlessly and without downtime. Tranquility is written in Scala, and bundles idiomatic Java and Scala APIs that work nicely with Finagle, Samza, Spark, Storm, and Trident.

Tranquility Kafka is an application which simplifies the ingestion of data from Kafka. It is scalable and highly available through the use of Kafka partitions and consumer groups, and can be configured to push data from multiple Kafka topics into multiple Druid dataSources.

https://github.com/druid-io/tranquility/blob/master/docs/kafka.md

First things first: to read from a Kafka stream, we define a configuration file that describes a data source name, a Kafka topic to read from, and some properties of the data we read. Save the JSON configuration below as kafka.json.

This instructs Tranquility to read from the topic “twitter_demo” and push the messages it receives into a Druid data source called “twitter_demo”. In each message, Tranquility uses the created_unixtime column (or key) as the timestamp.

{
  "dataSources" : {
    "twitter_demo" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "twitter_demo",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "created_unixtime",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [
                  "timestamp",
                  "value"
                ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "six_hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : []
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT720000M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "twitter_demo"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "thiago-2.field.hortonworks.com:2181,thiago-3.field.hortonworks.com:2181,thiago-4.field.hortonworks.com:2181",
    "kafka.group.id" : "tranquility-kafka"
  }
}

…and place it in the Tranquility configuration directory under Druid, then change into that directory:

==> on master-4

cd /usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/
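
Before starting Tranquility, two settings in kafka.json are worth checking by hand: the timestamp column and windowPeriod. The sketch below uses only values from the sample tweet and the configuration above. It confirms that created_unixtime is epoch milliseconds matching created_time, and it converts PT720000M into days. The within_window helper is a simplified, hypothetical model of the window check (events too far from the current time are dropped), not Tranquility's actual implementation, and the clock value is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone

created_unixtime = 1520535925741                 # epoch milliseconds
created_time = "Thu Mar 08 19:05:25 +0000 2018"

# Parse the Twitter-style timestamp and compare it with the millisecond value.
parsed = datetime.strptime(created_time, "%a %b %d %H:%M:%S %z %Y")
from_millis = datetime.fromtimestamp(created_unixtime // 1000, tz=timezone.utc)
print(parsed == from_millis)                     # True

# windowPeriod "PT720000M" means 720000 minutes.
window = timedelta(minutes=720000)
print(window.days)                               # 500

def within_window(event_ms, now, window):
    """Simplified model: accept events no further than `window` from `now`."""
    event = datetime.fromtimestamp(event_ms // 1000, tz=timezone.utc)
    return abs(now - event) <= window

now = datetime(2018, 3, 9, tzinfo=timezone.utc)  # arbitrary example clock
print(within_window(created_unixtime, now, window))  # True
```

A window of roughly 500 days is very permissive, which suits a demo that replays older tweets; a production setup would usually keep it much shorter.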

To manage the continuous process that indexes Kafka data, we’ll download Tranquility, change into its directory, and run it. Use the following to get the latest version and decompress it:

sudo curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgz
sudo tar xzvf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0

# Run from /usr/hdp/2.6.3.0-235/druid/conf-quickstart/tranquility/tranquility-distribution-0.8.0/
sudo bin/tranquility kafka -configFile ../kafka.json

Phase 4: Druid

Druid is a rockin' exploratory analytical data store that offers interactive queries over big data in real time.

In HDP/HDF, Druid can be used easily through Superset: we can build our Druid data source and manage all Druid columns to fit our JSON tweet schema.

62900-imagem9.png

62901-imagem10.png

Phase 5: Superset Dashboard

We can use Superset for exploratory analysis and to define the JSON queries that we will execute against the Druid API and use to build our dashboard.
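
As a rough illustration of such a JSON query, the sketch below builds a Druid native topN query that counts ingested rows per language. The query shape follows Druid's native query format; the interval, the threshold, and the broker address mentioned in the comment are assumptions for this example and are not taken from the dashboard in the screenshots.

```python
import json

def top_languages_query(data_source, interval, limit=10):
    """Build a Druid native topN query: row count per `lang` value."""
    return {
        "queryType": "topN",
        "dataSource": data_source,
        "dimension": "lang",
        "metric": "count",
        "threshold": limit,
        "granularity": "all",
        "aggregations": [{"type": "count", "name": "count"}],
        "intervals": [interval],
    }

query = top_languages_query("twitter_demo", "2018-03-08/2018-03-09")
print(json.dumps(query, indent=2))
# The JSON can then be POSTed to the Druid broker's /druid/v2/ endpoint,
# e.g. http://<broker-host>:8082/druid/v2/ (host and port are assumptions).
```

Superset generates queries of this kind on our behalf, so writing them by hand is mainly useful for checking what a slice actually asks Druid for.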

Once your Druid data source has been created, you can create your slices and put them all in your dashboard.

Some pictures are worth a thousand words:

1) Creating our slices

62902-imagem11.png

2) Querying a slice

62903-imagem12.png

3) Saving all slices in our dashboard

62904-imagem13.png

4) Presenting our dashboard

62905-imagem14.png

In the end, we get a great real-time Twitter dashboard with information about location, maps, and languages. With a little more effort, we could even read each tweet individually and run sentiment analysis on what our customers are saying... but that is a matter for the next article.

References

http://druid.io/docs/latest/tutorials/tutorial-kafka.html

http://druid.io/blog/2013/08/30/loading-data.html

https://github.com/druid-io/tranquility

Comments

Great article, showcasing various technologies playing together.

You could possibly also simplify the above flow a bit by skipping Tranquility:

Twitter -> NiFi -> Kafka -> Druid

Druid supports ingesting data directly from Kafka (without Tranquility).

See - http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html

Hi Nishant,

You will understand why I did not skip Tranquility in the next article! 😉

druid-tq.png druid-tq1.png druid-tranquility.png

Hi Thiago, thanks for the article. Just wondering: do we need to wait six hours to have the first segment? Thanks very much.

"granularitySpec":{
"type":"uniform",
"segmentGranularity":"six_hour",

Hi, when I launch Tranquility it stops on its own:

    ...
    
    2018-06-19 10:30:11,753 [main] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], end rebalancing consumer tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a try #0
    2018-06-19 10:30:11,755 [main] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], Creating topic event watcher for topics (couldwork)
    2018-06-19 10:30:11,764 [main] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], Topics to consume = List(couldwork)
    2018-06-19 10:30:11,768 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.utils.VerifiableProperties - Verifying properties
    2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.utils.VerifiableProperties - Property client.id is overridden to tranquility-kafka
    2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to fr-001slli124.groupinfra.com:6667
    2018-06-19 10:30:11,769 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
    2018-06-19 10:30:11,787 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:1001,host:fr-001slli124.groupinfra.com,port:6667 with correlation id 0 for 1 topic(s) Set(couldwork)
    2018-06-19 10:30:11,790 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.producer.SyncProducer - Connected to fr-001slli124.groupinfra.com:6667 for producing
    2018-06-19 10:30:11,808 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  kafka.producer.SyncProducer - Disconnecting from fr-001slli124.groupinfra.com:6667
    2018-06-19 10:30:11,855 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Starting
    2018-06-19 10:30:11,858 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Added fetcher for partitions ArrayBuffer([[couldwork,0], initOffset 45 to broker id:1001,host:fr-001slli124.groupinfra.com,port:6667] )
    2018-06-19 10:30:13,278 [Thread-4] INFO  c.metamx.tranquility.kafka.KafkaMain - Initiating shutdown...
    2018-06-19 10:30:13,278 [Thread-4] INFO  c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
    2018-06-19 10:30:13,281 [Thread-4] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], ZKConsumerConnector shutting down
    2018-06-19 10:30:13,288 [Thread-4] INFO  k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
    2018-06-19 10:30:13,288 [Thread-4] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Stopping leader finder thread
    2018-06-19 10:30:13,288 [Thread-4] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Shutting down
    2018-06-19 10:30:13,289 [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Stopped
    2018-06-19 10:30:13,289 [Thread-4] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-leader-finder-thread], Shutdown completed
    2018-06-19 10:30:13,289 [Thread-4] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] Stopping all fetchers
    2018-06-19 10:30:13,290 [Thread-4] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Shutting down
    2018-06-19 10:30:13,291 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
    2018-06-19 10:30:13,291 [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Stopped
    2018-06-19 10:30:13,291 [Thread-4] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a-0-1001], Shutdown completed
    2018-06-19 10:30:13,292 [Thread-4] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1529404211455] All connections stopped
    2018-06-19 10:30:13,294 [ZkClient-EventThread-14-10.80.145.201:2181,10.80.145.200:2181,10.80.145.199:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
    2018-06-19 10:30:13,298 [Thread-4] INFO  org.apache.zookeeper.ZooKeeper - Session: 0x1641734cd6b0000 closed
    2018-06-19 10:30:13,298 [main-EventThread] INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
    2018-06-19 10:30:13,298 [Thread-4] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka_FR-001SLLI129-1529404211282-a8c8a12a], ZKConsumerConnector shutdown completed in 17 ms
    2018-06-19 10:30:13,298 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Commit thread interrupted.
    2018-06-19 10:30:13,299 [Thread-4] INFO  c.m.tranquility.kafka.KafkaConsumer - Finished clean shutdown.