
Streaming twitter data from flume to spark for analysis issues


Hello,

 

I am using the official Flume + Spark configuration as described in the documentation, but after registering on the host and port, Flume is never able to send events successfully. On the Spark side, the receiver task (TID) never receives anything; the events appear to be lost.

 

Below is my configuration:

 

TwitterAgent1.sources = PublicStream2
TwitterAgent1.channels = fileCh2
TwitterAgent1.sinks = avrosink2


TwitterAgent1.sources.PublicStream2.type = com.cloudsigma.flume.twitter.TwitterSource
TwitterAgent1.sources.PublicStream2.channels = fileCh2
TwitterAgent1.sources.PublicStream2.consumerKey =
TwitterAgent1.sources.PublicStream2.consumerSecret =
TwitterAgent1.sources.PublicStream2.accessToken =
TwitterAgent1.sources.PublicStream2.accessTokenSecret =
TwitterAgent1.sources.PublicStream2.keywords = some keywrds

#TwitterAgent1.sources.PublicStream2.locations = -,-
TwitterAgent1.sources.PublicStream2.language = en
TwitterAgent1.sources.PublicStream2.follow =,

TwitterAgent1.sinks.avrosink2.type = avro
TwitterAgent1.sinks.avrosink2.batch-size = 1
TwitterAgent1.sinks.avrosink2.hostname = 1x5.3x.3.1x2   # IP of the host, as I am on a cluster
TwitterAgent1.sinks.avrosink2.port = 9988
TwitterAgent1.sinks.avrosink2.channel = fileCh2


TwitterAgent1.channels.fileCh2.type = file
TwitterAgent1.channels.fileCh2.capacity = 10000
TwitterAgent1.channels.fileCh2.transactionCapacity = 10000
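One common Flume pitfall with configurations like the one above is a typo in the agent-name prefix (e.g. TwitterAgent instead of TwitterAgent1): Flume silently ignores any property whose prefix does not match the agent name passed on the command line. A quick sanity-check script along these lines can catch that (this is a hypothetical helper, not part of Flume; it assumes the config text is available as a string):

```python
def check_agent_prefix(conf_text, agent_name):
    """Return config keys whose agent-name prefix does not match agent_name."""
    bad = []
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key = line.split("=", 1)[0].strip()
        prefix = key.split(".", 1)[0]
        if prefix != agent_name:
            bad.append(key)
    return bad

sample = """
TwitterAgent1.sources = PublicStream2
TwitterAgent.sources.PublicStream2.accessToken = xxx
"""
# Flags the accessToken line, whose prefix is 'TwitterAgent' rather than 'TwitterAgent1'
print(check_agent_prefix(sample, "TwitterAgent1"))
```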

 

PySpark code:

 

import warnings
import pyspark as ps
from pyspark import SparkConf
from pyspark.sql import SQLContext

try:
    # create a SparkContext on all available CPUs: in my case I have 4 CPUs on my laptop
    conf = SparkConf().setAppName("tweeterAnalysis")
    sc = ps.SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

 

from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

ssc = StreamingContext(sc, 10)
flumeStream = FlumeUtils.createStream(ssc, "pan0142.panoulu.net", 41414)

 

lines = flumeStream.map(lambda x: x[1])

ssc.start()
ssc.awaitTermination()
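For context on how this is wired: FlumeUtils.createStream is the push model, where the Spark receiver itself binds the given host and port as an Avro server, and the Flume avro sink must point at exactly that host:port on the machine where the receiver runs. A "Failed to bind" error on the Spark side usually means the receiver's host is not an address of the executor's machine, or the port is already taken. A small check of whether a host:port can be bound locally (a hypothetical diagnostic, not part of Spark or Flume):

```python
import socket

def can_bind(host, port):
    """Try to bind host:port on this machine; True if the bind succeeds."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return True
    except OSError:
        # e.g. address not assigned to this machine, or port already in use
        return False
    finally:
        s.close()

# The receiver can only bind addresses that belong to the machine it runs on;
# binding a foreign IP fails the same way the ChannelException below reports.
print(can_bind("127.0.0.1", 41414))
```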

 

 

Flume error:

 

Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: pan0143.panoulu.net, port: 41414 }: Failed to send batch
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:314)
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373)
	... 3 more

Spark:

 

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 17093.0 (TID 32941, xxx.lu.net, executor 24): org.jboss.netty.channel.ChannelException: Failed to bind to: pan0143.panoulu.net/185.38.3.143:41414
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)

 

 

Anyone, please help!