
Issues streaming Twitter data from Flume to Spark for analysis

Hello,

 

I am using the official Flume + Spark configuration as described in the documentation, but after registering on the host and port, Flume is never able to send events successfully. On the Spark side, the task (TID) never receives anything; the events appear to be lost.

 

Below is my configuration: 

 

TwitterAgent1.sources = PublicStream2
TwitterAgent1.channels = fileCh2
TwitterAgent1.sinks = avrosink2


TwitterAgent1.sources.PublicStream2.type = com.cloudsigma.flume.twitter.TwitterSource
TwitterAgent1.sources.PublicStream2.channels = fileCh2
TwitterAgent1.sources.PublicStream2.consumerKey =
TwitterAgent1.sources.PublicStream2.consumerSecret =
TwitterAgent1.sources.PublicStream2.accessToken =
TwitterAgent1.sources.PublicStream2.accessTokenSecret =
TwitterAgent1.sources.PublicStream2.keywords = some keywords

#TwitterAgent1.sources.PublicStream2.locations = -,-
TwitterAgent1.sources.PublicStream2.language = en
TwitterAgent1.sources.PublicStream2.follow =,

TwitterAgent1.sinks.avrosink2.type = avro
TwitterAgent1.sinks.avrosink2.batch-size = 1
# hostname is the IP of the host, as I am running on a cluster
TwitterAgent1.sinks.avrosink2.hostname = 1x5.3x.3.1x2
TwitterAgent1.sinks.avrosink2.port = 9988
TwitterAgent1.sinks.avrosink2.channel = fileCh2


TwitterAgent1.channels.fileCh2.type = file
TwitterAgent1.channels.fileCh2.capacity = 10000
TwitterAgent1.channels.fileCh2.transactionCapacity = 10000
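For the push-style integration used here, the Avro sink must point at the exact host and port that `FlumeUtils.createStream` listens on, and that host must be a Spark worker node where the receiver actually runs. A minimal pairing might look like this (hostname is a placeholder, not my real setup):

```properties
# flume.conf (sink side): point at the Spark worker hosting the receiver,
# on the same port passed to FlumeUtils.createStream
TwitterAgent1.sinks.avrosink2.type = avro
TwitterAgent1.sinks.avrosink2.hostname = <spark-worker-host>
TwitterAgent1.sinks.avrosink2.port = 41414
```

The PySpark side would then call `FlumeUtils.createStream(ssc, "<spark-worker-host>", 41414)` with the same pair. In my config above, the sink's port (9988) and the port in my PySpark code (41414) do not match, which may be part of the problem.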

 

PySpark code:

 

import warnings

import pyspark as ps
from pyspark import SparkConf
from pyspark.sql import SQLContext

try:
    # Create a SparkContext using all available CPUs (4 on my laptop)
    conf = SparkConf().setAppName("tweeterAnalysis")
    sc = ps.SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

 

from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

ssc = StreamingContext(sc, 10)
flumeStream = FlumeUtils.createStream(ssc, "pan0142.panoulu.net", 41414)

 

# keep only the event body from each (headers, body) pair
lines = flumeStream.map(lambda x: x[1])

ssc.start()
ssc.awaitTermination()
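Since `createStream` delivers each event as a (headers, body) pair, the `map` above keeps just the body. If the Twitter source writes raw tweet JSON into the body (an assumption; it depends on the source implementation), the text field can be pulled out with a small helper. `event_to_text` is my own name, not a Spark or Flume API:

```python
import json

def event_to_text(event):
    # Flume events arrive as (headers, body) pairs; assume the body
    # holds one raw tweet as JSON and pull out its "text" field.
    _, body = event
    try:
        return json.loads(body).get("text")
    except (ValueError, TypeError):
        # Body was not valid JSON (or not a string/bytes at all)
        return None

# Usage inside the streaming job:
# texts = flumeStream.map(event_to_text).filter(lambda t: t is not None)
# texts.pprint()
```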

 

 

Flume error:

 

Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: pan0143.panoulu.net, port: 41414 }: Failed to send batch
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:314)
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373)
	... 3 more

Spark error:

 

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 17093.0 (TID 32941, xxx.lu.net, executor 24): org.jboss.netty.channel.ChannelException: Failed to bind to: pan0143.panoulu.net/185.38.3.143:41414
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
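The `ChannelException` above means the Avro receiver on the executor could not bind pan0143.panoulu.net:41414; in the push model it is the Spark receiver, not Flume, that opens that port. As a quick diagnostic independent of Spark, one can check whether a given host/port is bindable on a machine (`can_bind` is a hypothetical helper of my own, to be run on the executor node that should host the receiver):

```python
import socket

def can_bind(host, port):
    # Try to bind a TCP socket on host:port; the Avro receiver
    # created by FlumeUtils.createStream needs to do the same.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return True
    except OSError:
        return False
    finally:
        s.close()

# e.g. run on the worker node:
# can_bind("pan0143.panoulu.net", 41414)
```

If this returns False on the node where the receiver is scheduled, the hostname does not resolve to a local interface there, or the port is already taken.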

 

 

Anyone, please help.