import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils

/**
 * Spark Streaming job that reads the Twitter stream, keeps the statuses whose
 * language code is "en", and saves every batch as text files.
 *
 * Positional arguments:
 *   0. OAuth consumer key
 *   1. OAuth consumer secret
 *   2. OAuth access token
 *   3. OAuth access token secret
 *   4. ignored (kept only so that the output prefix stays at index 5)
 *   5. output path prefix handed to `saveAsTextFiles`
 */
object TwitterData {

  /** Number of positional arguments `main` needs; the highest index read is 5. */
  private val RequiredArgCount = 6

  /** Index of the output path prefix within the argument array. */
  private val OutputPrefixIndex = 5

  /**
   * Entry point: validates the arguments, builds the authenticated Twitter
   * stream and runs the streaming context until it terminates.
   *
   * @param args positional arguments as described on [[TwitterData]]
   */
  def main(args: Array[String]): Unit = {
    // The output prefix is read from args(5), so fewer than six arguments must
    // be rejected here instead of failing later with an
    // ArrayIndexOutOfBoundsException.
    if (args.length < RequiredArgCount) {
      System.err.println(
        "Usage: TwitterData CONSUMER_KEY CONSUMER_SECRET ACCESS_TOKEN " +
          "ACCESS_TOKEN_SECRET IGNORED OUTPUT_PREFIX")
      sys.exit(1)
    }

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val outputPrefix = args(OutputPrefixIndex)

    // NOTE(review): the master is hard-coded to local[3]; confirm whether this
    // job is ever meant to be submitted to a cluster.
    val conf = new SparkConf()
      .setAppName("TwitterData")
      .setMaster("local[3]")
    // Incoming tweets are grouped into 5-second batches.
    val ssc = new StreamingContext(conf, Seconds(5))

    val twitterConfig = new ConfigurationBuilder()
      .setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .build
    val auth = new OAuthAuthorization(twitterConfig)

    // No keyword filters are passed to the stream. The previous code computed
    // a `filters` array from the trailing arguments but never used it; wiring
    // it in now would turn the output prefix into a filter keyword, so the
    // unused value has been dropped instead.
    val tweets = TwitterUtils.createStream(ssc, Some(auth))
    val englishTweets = tweets.filter(status => status.getLang() == "en")

    // NOTE(review): the "json" suffix only names the output files; the
    // elements written are Status objects, presumably via their string
    // representation rather than real JSON - confirm against consumers.
    englishTweets.saveAsTextFiles(outputPrefix, "json")

    ssc.start()
    ssc.awaitTermination()
  }
}