Spark streaming Job to HBase stuck

Explorer

Hi all!!

I'm writing a simple Spark Streaming job that takes events from AWS Kinesis and stores them into HBase.

The job runs fine for the first few batches, but then it quickly gets stuck while processing.

(Screenshot attached: 7501-screen-shot-2016-09-07-at-130840.png)

The code is the following:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// HBase output configuration (mapred API, consumed by saveAsHadoopDataset)
val hbaseConf = HBaseConfiguration.create()
val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)

jobConfig.set("hbase.zookeeper.quorum", zooQuorum)
jobConfig.set("hbase.zookeeper.property.clientPort", zooPort)
jobConfig.set("hbase.master", hbaseMaster)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
jobConfig.setStrings("zookeeper.znode.parent", "/hbase-unsecure")
val awsCredential = new DefaultAWSCredentialsProviderChain().getCredentials()
require(
  awsCredential != null,
  "No AWS credentials found. Please specify credentials using one of the methods specified " +
    "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html"
)
val kinesisClient = new AmazonKinesisClient(awsCredential)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

val receivedStreamName = kinesisClient.describeStream(streamName).getStreamDescription.getStreamName
val receivedStreamStatus = kinesisClient.describeStream(streamName).getStreamDescription.getStreamStatus
val numStreams = numShards
val batchInterval = Milliseconds(batchIntervalTime)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val sparkConfig = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(sparkConfig, batchInterval)

// One Kinesis receiver (and hence one input DStream) per shard
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}

val unionDStreams = ssc.union(kinesisStreams)
val eventDStreams = EventExtension.transformToEventDStream(unionDStreams)

// Write each micro-batch to HBase via saveAsHadoopDataset
eventDStreams.foreachRDD { rdd =>
  rdd.map(_.convertToPut).saveAsHadoopDataset(jobConfig)
}

// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()

I've looked into the executors' logs and found some problems related to ZooKeeper connections:

16/09/08 11:29:26 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-55.ec2.internal/10.0.10.55:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/09/08 11:29:26 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=ip-10-0-10-55.ec2.internal:2181,ip-10-0-10-21.ec2.internal:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/meta-region-server
16/09/08 11:29:27 INFO MemoryStore: 3 blocks selected for dropping
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723548 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723548 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block broadcast_31 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block broadcast_31 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723555 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723555 to disk
16/09/08 11:29:27 INFO MemoryStore: Block input-0-1473333723413 stored as values in memory (estimated size 5.9 MB, free 140.3 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-1-1473333723683 stored as values in memory (estimated size 1455.2 KB, free 141.7 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-2-1473333723407 stored as values in memory (estimated size 431.4 KB, free 142.1 MB)
16/09/08 11:29:28 INFO ClientCnxn: Opening socket connection to server ip-10-0-10-21.ec2.internal/10.0.10.21:2181. Will not attempt to authenticate using SASL (unknown error)
16/09/08 11:29:28 INFO ClientCnxn: Socket connection established to ip-10-0-10-21.ec2.internal/10.0.10.21:2181, initiating session
16/09/08 11:29:28 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-21.ec2.internal/10.0.10.21:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

I'm using HDP 2.4.2 with Java 8, HBase 1.1.2, ZooKeeper 3.4.6, and Spark 1.6.1.

Thanks

1 ACCEPTED SOLUTION

Explorer

Eventually, increasing ZooKeeper's maximum number of client connections solved the problem.

(Screenshot attached: 7531-screen-shot-2016-09-11-at-134017.png)

View solution in original post

5 REPLIES

Super Guru

The ZooKeeper exceptions and log messages seem to imply that either your client is having difficulty maintaining its heartbeat with the ZK server or the ZK server is having trouble responding. The "Connection reset by peer" is likely just the ZK server closing the connection from the client (probably to save resources) and is not directly indicative of a problem.

The ConnectionLoss warning might be a sign of a larger problem. Disconnected is a state in the ZK client's lifecycle in which the client is no longer in sync with the ZK server but still has the ability to reconnect and sync its state. Commonly, this happens when a client application experiences garbage-collection pauses that prevent the ZK heartbeat threads from running as intended.

I'd recommend that you try to determine what your Spark job is doing at the time it "rapidly gets stuck processing". You could obtain a thread dump from the application to see what it is presently doing, and perhaps collect GC logs for the application by setting a few JVM options for the driver and executors.
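For instance, a minimal sketch of enabling executor GC logging through Spark properties; the flags are standard HotSpot (Java 8) options, but treat this particular combination as an example rather than a prescription:

import org.apache.spark.SparkConf

// Hypothetical sketch: GC-logging flags for the executors. Executor JVMs are
// launched after the conf is read, so setting this in SparkConf is enough;
// for the driver, the same flags would be passed at submit time instead
// (e.g. via --driver-java-options).
val gcOpts = "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps"

val sparkConfig = new SparkConf()
  .setAppName(appName) // appName as in your code above
  .set("spark.executor.extraJavaOptions", gcOpts)

The GC output then shows up in each executor's stdout/stderr, where you can look for long pauses around the time the job stalls.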

Explorer

Hi Josh, and thanks for your answer.

I've looked into the GC logs and didn't find anything suspicious.

It always gets stuck when the job tries to save to HBase using saveAsHadoopDataset.

(Screenshot attached: 7530-screen-shot-2016-09-11-at-101929.png)

I'm new to Spark and HBase, so it is difficult for me to pinpoint the issue any further.

Thanks

Explorer

If I replace saveAsHadoopDataset with saveAsTextFile, the job doesn't get stuck at all. Roughly what I tried is sketched below.
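A sketch of the diagnostic swap (the output path and the toString formatting are just placeholders I picked for the test):

// Diagnostic only: write each batch to text files instead of HBase,
// taking the HBase/ZooKeeper write path out of the picture.
// Each batch gets its own output directory under a placeholder path.
eventDStreams.foreachRDD { (rdd, time) =>
  rdd.map(_.toString).saveAsTextFile(s"/tmp/events-debug/batch-${time.milliseconds}")
}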

Explorer

Eventually, increasing ZooKeeper's maximum number of client connections solved the problem.

(Screenshot attached: 7531-screen-shot-2016-09-11-at-134017.png)
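For anyone hitting the same issue, the setting in question is ZooKeeper's per-host connection limit. A minimal sketch of the change, assuming it is made in zoo.cfg (on HDP it is usually edited through Ambari's ZooKeeper config screen, which is what the screenshot showed); the value here is just an example, not a recommendation:

# zoo.cfg: maximum number of concurrent connections a single client host
# may hold open to one ZooKeeper server (default is 60; 0 means unlimited).
maxClientCnxns=200

The change is applied on the ZooKeeper servers and takes effect after restarting the ZooKeeper service.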

Super Guru

Odd that saving it as a text file doesn't cause an error, but glad you got to the bottom of it.

If you want/need any more context, I tried to capture some information on maxClientCnxns recently in https://community.hortonworks.com/articles/51191/understanding-apache-zookeeper-connection-rate-lim....