
Spark Streaming Application not communicating on Flume agent port

My goal is to use the pull method to read lines from a log file in a local directory and display them with Spark Streaming. The Flume agent itself seems to be working fine: when I run netstat after starting Flume, I can see the agent listening on the port.
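To double-check what netstat shows, I also probe the port directly. This is just a throwaway helper, not part of the job; the port number 9992 matches my Flume config below:

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Probe the SparkSink port from the Flume config
print(port_open("localhost", 9992))
```

This succeeds while the agent is up, so something is definitely accepting TCP connections on 9992 from the local machine.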

 

However, when I run the Spark script, it fails with the same error every time the receiver tries to connect to the port.

 

18/02/15 15:34:03 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException: Error connecting to localhost/127.0.0.1:9992
	at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:292)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:206)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:141)
	at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
	at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
	at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
	at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:9992
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	... 3 more

 

Here is the Flume agent configuration:

agent1.sources = webserver-log-source
agent1.channels = memory-channel
agent1.sinks = sparkSink

# Describe the source
agent1.sources.webserver-log-source.type = spooldir
agent1.sources.webserver-log-source.spoolDir = <path>
agent1.sources.webserver-log-source.channels = memory-channel

# Describe the sink
agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sparkSink.channel = memory-channel
agent1.sinks.sparkSink.hostname = localhost
agent1.sinks.sparkSink.port = 9992
agent1.sinks.sparkSink.batch-size = 1

# Use a channel which buffers events in memory
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 100000
agent1.channels.memory-channel.transactionCapacity = 1000
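For context, I start the agent roughly like this (the conf directory and config file name are placeholders from my setup; `--name` must match the `agent1` prefix used in the file):

```shell
# Start the agent defined by the configuration above
flume-ng agent \
    --name agent1 \
    --conf /etc/flume-ng/conf \
    --conf-file spark-sink.conf \
    -Dflume.root.logger=INFO,console
```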

And here is the Spark code:

 

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils


if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.stderr.write("Usage: StreamingLogs.py <hostname> <port>\n")
        sys.exit(-1)
    
    # get hostname and port of data source from application arguments
    hostname = sys.argv[1]
    port = int(sys.argv[2])
     
    # Create a new SparkContext
    sc = SparkContext()

    # Set log level to ERROR to avoid distracting extra output
    sc.setLogLevel("ERROR")

    # Create and configure a new Streaming Context 
    # with a 1 second batch duration
    ssc = StreamingContext(sc,1)

    # Create the input DStream with Flume; createPollingStream
    # expects a list of (host, port) pairs
    addresses = [(hostname, port)]
    flumeStream = FlumeUtils.createPollingStream(ssc, addresses)

    flumeStream.pprint()

    # Start the streaming context and then wait for application to terminate
    ssc.start()
    ssc.awaitTermination()

 

I've tried other ports and also the fully qualified hostname for the machine itself, and I always get the same error.

I've also noticed that the Flume agent never moves anything out of the channel into the sink (channel.event.take.attempt stays at 0 in the shutdown metrics below, while 1297 events were put into the channel). Is that expected with the pull method, as opposed to the push method?

 

18/02/15 15:34:49 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider stopping
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory-channel stopped
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.start.time == 1518726765931
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.stop.time == 1518726889801
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.capacity == 100000
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.current.size == 1297
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.event.put.attempt == 1297
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.event.put.success == 1297
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.event.take.attempt == 0
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: memory-channel. channel.event.take.success == 0
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: webserver-log-source stopped
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. source.start.time == 1518726765956
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. source.stop.time == 1518726889802
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.append-batch.accepted == 15
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.append-batch.received == 15
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.append.accepted == 0
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.append.received == 0
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.events.accepted == 1297
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.events.received == 1297
18/02/15 15:34:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SOURCE, name: webserver-log-source. src.open-connection.count == 0
18/02/15 15:34:49 INFO source.SpoolDirectorySource: SpoolDir source webserver-log-source stopped. Metrics: SOURCE:webserver-log-source{src.events.accepted=1297, src.open-connection.count=0, src.append.received=0, src.append-batch.received=15, src.append-batch.accepted=15, src.append.accepted=0, src.events.received=1297}
18/02/15 15:34:49 INFO sink.SparkSink: Stopping Spark Sink: sparkSink
18/02/15 15:34:49 INFO sink.SparkAvroCallbackHandler: Shutting down Spark Avro Callback Handler
18/02/15 15:34:49 INFO sink.SparkSink: Stopping Avro Server for sink: sparkSink
Is there a specific port I should use instead?
 
The required plugin jars already seem to be installed in the cloudera/parcels/CDH-5.12....../jars folder. Since we are using Python, I'm also passing the Spark Streaming Flume jar to spark2-submit along with the Spark code.
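Concretely, the submit command looks something like this (the assembly jar name and version here are from my install and may differ elsewhere):

```shell
# Pass the Flume integration jar explicitly, since the Python API
# depends on the Scala-side classes being on the classpath
spark2-submit \
    --jars spark-streaming-flume-assembly_2.11-2.1.0.jar \
    StreamingLogs.py localhost 9992
```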