Problems with Flafka in Quickstart Cloudera VM CDH 5.7

Hi all,

 

I'm using "Flafka" (Flume with Kafka) on the Cloudera QuickStart VM (CDH 5.7) to build a pipeline that processes tweets, with Kafka as the Flume channel and Spark as the sink. This is my Flume configuration:

 

TwitterAgent.sources = Twitter
TwitterAgent.sinks = spark
TwitterAgent.channels = KafkaChannel

TwitterAgent.sources.Twitter.type=com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.consumerKey =****
TwitterAgent.sources.Twitter.consumerSecret =****
TwitterAgent.sources.Twitter.accessToken =****
TwitterAgent.sources.Twitter.accessTokenSecret =****
TwitterAgent.sources.Twitter.language=en
TwitterAgent.sources.Twitter.channels = KafkaChannel

 

# Describe the channel: a Kafka channel that stores
# events in the "tweets" topic.
TwitterAgent.channels.KafkaChannel.type=org.apache.flume.channel.kafka.KafkaChannel
TwitterAgent.channels.KafkaChannel.capacity=10000
TwitterAgent.channels.KafkaChannel.transactionCapacity=1000
TwitterAgent.channels.KafkaChannel.brokerList=quickstart.cloudera:9092
TwitterAgent.channels.KafkaChannel.topic=tweets
TwitterAgent.channels.KafkaChannel.zookeeperConnect=quickstart.cloudera:2181
TwitterAgent.channels.KafkaChannel.parseAsFlumeEvent=true
TwitterAgent.channels.KafkaChannel.kafka.consumer.timeout.ms=100000


# Describe the sink
TwitterAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
TwitterAgent.sinks.spark.hostname=quickstart.cloudera
TwitterAgent.sinks.spark.port=8677
TwitterAgent.sinks.spark.channel=KafkaChannel

 

The Spark Streaming job that consumes the tweets is configured as follows:


spark.app.name = TwitterSentimentAnalysis
spark.master = local[2]
spark.batch.duration = 60
spark.hostname=quickstart.cloudera
spark.port=8677

 

I read the Flume events in the Spark Streaming job with this call:

 

FlumeUtils.createPollingStream(javaStreamingContext, context.getString(SPARK_HOSTNAME), Integer.parseInt(context.getString(SPARK_PORT))); 
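For context, the hostname and port passed to createPollingStream have to match the hostname and port of the Spark sink in the Flume configuration, since the polling receiver connects to that sink. The post does not show how the context object reads them, so the following is only a minimal sketch that assumes the settings are kept in java.util.Properties format. The class name PollingStreamSettings and its parse method are hypothetical and are not part of Spark or Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not part of Spark or Flume): loads and validates the
// settings that are later handed to FlumeUtils.createPollingStream.
final class PollingStreamSettings {
    public final String hostname;
    public final int port;
    public final long batchSeconds;

    private PollingStreamSettings(String hostname, int port, long batchSeconds) {
        this.hostname = hostname;
        this.port = port;
        this.batchSeconds = batchSeconds;
    }

    // Parses text in java.util.Properties format, e.g. "spark.port=8677".
    public static PollingStreamSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        String hostname = require(props, "spark.hostname");
        int port = Integer.parseInt(require(props, "spark.port"));
        long batchSeconds = Long.parseLong(require(props, "spark.batch.duration"));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        if (batchSeconds <= 0) {
            throw new IllegalArgumentException("Batch duration must be positive");
        }
        return new PollingStreamSettings(hostname, port, batchSeconds);
    }

    // Returns the trimmed value for key, or fails if it is missing or blank.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing setting: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Same keys and values as the Spark settings listed in the post.
        String text = "spark.hostname=quickstart.cloudera\n"
                + "spark.port=8677\n"
                + "spark.batch.duration = 60\n";
        PollingStreamSettings s = parse(text);
        System.out.println(s.hostname + ":" + s.port + " every " + s.batchSeconds + "s");
    }
}
```

Failing fast on a missing or malformed value makes it easier to rule out a hostname or port mismatch between the sink and the receiver before looking at the channel itself.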

 

 

When I start the Flume service and the Spark Streaming job, everything works during the first batch interval: Spark receives the tweets and processes them without any problem. In the following intervals, however, Spark receives no Flume events at all. I've checked the Flume logs and found no exceptions or errors.

 

Could anyone help me find the cause of this strange behaviour?

 

Thank you in advance.

 

 
