How to fetch hashtags and text from tweets stored in a Kafka topic using the Spark API in Scala

I am working on Hortonworks. I have stored tweets from Twitter in a Kafka topic using Flume. I am performing sentiment analysis on the tweets in Scala in the spark-shell, with Kafka as the producer and Spark as the consumer. However, I want to fetch only specific content from each tweet: the text, the hashtag, the sentiment analysis result (whether the tweet is positive or negative), and the words in the tweet that I have labelled as positive or negative. My training data is Data.txt.

Data.txt contains data like this:

like positive
doom negative
doomed negative
doubt positive

I added these dependencies: org.apache.spark:spark-streaming-kafka_2.10:1.6.2, org.apache.spark:spark-streaming_2.10:1.6.2

Here is my code:

<code>

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka._


val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(5))
val zkQuorum="sandbox.hortonworks.com:2181"
val group="test-consumer-group"
val topics="test"
val numThreads=5
val args=Array(zkQuorum, group, topics, numThreads)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val hashTags = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))
val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/Data.txt"
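// Load the word -> sentiment lookup (one "word<TAB>label" pair per line) and cache it.
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
  val Array(word, happiness) = line.split("\t")
  (word, happiness)
}.cache()
// Count hashtags over a 60-second window and join the counts with the lookup.
// happiness is a String, so tuple._1 * tuple._2 repeats the label count times.
val happiest60 = (hashTags
  .map(hashTag => (hashTag.tail, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .transform { topicCount => wordSentiments.join(topicCount) }
  .map { case (topic, tuple) => (topic, tuple._1 * tuple._2) }
  .map { case (topic, happinessValue) => (happinessValue, topic) }
  .transform(_.sortByKey(false)))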
happiest60.print()
ssc.start()

Using the above code, I get output like this:

(negative, fear) (positive, fitness)

But I want output like this:

(#sports,Text from the Tweets,fitness,positive)

I have no idea how to fetch the hashtag and text from each tweet to get output like the above.
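
One possible direction, sketched under assumptions rather than offered as a definitive answer: the code above splits every tweet into words before anything else, so the original text is already gone by the time the join runs. Keeping the whole tweet and deriving the hashtags and matched words from it in a single step preserves all four fields. The helper below is hypothetical (the names `TweetRows`, `extractRows`, and `lexicon` are not from the original code). It assumes each Kafka message value is the plain tweet text rather than JSON, and that the word list from Data.txt is small enough to hold in memory as a `Map`.

```scala
// Hypothetical helper, not part of the original job: builds one output row per
// (hashtag, matched sentiment word) pair found in a single tweet.
object TweetRows {
  // Row layout mirrors the desired output: (hashtag, tweet text, word, sentiment).
  type Row = (String, String, String, String)

  def extractRows(tweet: String, lexicon: Map[String, String]): Seq[Row] = {
    // Split on whitespace; assumes the message value is the plain tweet text.
    val tokens = tweet.split("\\s+").filter(_.nonEmpty).toSeq

    // Tokens that start with '#' and have at least one more character are hashtags.
    val hashTags = tokens.filter(t => t.startsWith("#") && t.length > 1)

    // Normalise every token (hashtags included): lower-case it, strip anything
    // that is not a letter or digit, then keep the ones present in the lexicon.
    val matches = tokens
      .map(_.toLowerCase.replaceAll("[^a-z0-9]", ""))
      .filter(_.nonEmpty)
      .distinct
      .flatMap(w => lexicon.get(w).map(label => (w, label)))

    // Emit one row per hashtag and matched word; a tweet with no hashtag
    // or no lexicon word produces no rows.
    for {
      tag <- hashTags
      (word, label) <- matches
    } yield (tag, tweet, word, label)
  }
}
```

In the streaming job this might be wired in with something like `lines.flatMap(t => TweetRows.extractRows(t, lexicon.value)).print()`, where `lexicon` is assumed to be `ssc.sparkContext.broadcast(wordSentiments.collectAsMap().toMap)`. If the messages written by Flume are JSON, the text field would have to be parsed out first.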