Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Hi Team, I am doing twitter streaming by the following code:-

Highlighted

Hi Team, I am doing twitter streaming by the following code:-

New Contributor

Hi Team, I am doing twitter streaming by the following code and its not working properly.Please suggest the solution

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.internal.Logging

import org.apache.spark.streaming.twitter._

object TwitterStreaming {

def main(args:Array[String]) {

var conf = new SparkConf().setMaster("local[*]").setAppName("socketStreaming").set("spark.network.timeout", "6s").set("spark.executor.heartbeatInterval", "1000s").set("spark.executor.memory", "2g").set("spark.driver.memory", "8g");

var sc = new SparkContext(conf); sc.addJar("/root/heena/simple_project/spark-streaming-twitter_2.10-1.2.0.jar"); sc.addJar("/root/heena/simple_project/twitter4j-stream-4.0.2.jar");

sc.addJar("/root/heena/simple_project/twitter4j-core-4.0.2.jar");

sc.addJar("/root/heena/simple_project/spark-core_2.11-1.5.2.logging.jar");

//sc.stop(); //Write this code here as writing at first causes the error

import org.apache.spark.streaming._

import org.apache.spark.streaming.twitter._

import twitter4j.auth._

import twitter4j.conf._

import org.apache.spark.internal.Logging

val ssc = new StreamingContext(sc,Seconds(1));

val socket = ssc.socketTextStream("localhost",8585);

val cb = new ConfigurationBuilder;

cb.setDebugEnabled(true).setOAuthConsumerKey("IqmtNPPtRAkdzDvTxdE5HrXYp").setOAuthConsumerSecret("Ul8ttM4Jmafa8z9T4rKmgU6TMx09BL0C7HduhWUzhJw8GLRSdJ").setOAuthAccessToken("2858023206-ku6vr7wjCJlhJZ6AUWYyWXLrBCaI2PZWxtPjvvy").setOAuthAccessTokenSecret("zaAZSuoWtnhWEjH7GtrrtHiLyP2LcZAGlBWUroGJLpbqg")

val auth = new OAuthAuthorization(cb.build);

val tweets = TwitterUtils.createStream(ssc,Some(auth));

val englishTweets = tweets.filter(_.getLang()=="en");

val status = englishTweets.map(status=>status.getText)

ssc.checkpoint("hdfs://localhost:9000/user/heena")

ssc.start

ssc.awaitTermination

}}

My build.sbt is

name := "twitter4j-tutorial" version := "0.1.0 " scalaVersion := "2.10.0" libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"

Don't have an account?
Coming from Hortonworks? Activate your account here