Created on 09-08-2016 12:58 PM - edited 08-19-2019 01:09 AM
Hi all!!
I'm writing a simple Spark Streaming job that takes events from AWS Kinesis and stores them into HBase.
The job runs perfectly for the first batches but then it rapidly gets stuck processing.
The code is the following:
// HBase output configuration
val hbaseConf = HBaseConfiguration.create()
val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
jobConfig.set("hbase.zookeeper.quorum", zooQuorum)
jobConfig.set("hbase.zookeeper.property.clientPort", zooPort)
jobConfig.set("hbase.master", hbaseMaster)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
jobConfig.setStrings("zookeeper.znode.parent", "/hbase-unsecure")

// AWS credentials and Kinesis stream metadata
val awsCredential = new DefaultAWSCredentialsProviderChain().getCredentials()
require(
  awsCredential != null,
  "No AWS credentials found. Please specify credentials using one of the methods specified " +
    "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html"
)
val kinesisClient = new AmazonKinesisClient(awsCredential)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val receivedStreamName = kinesisClient.describeStream(streamName).getStreamDescription.getStreamName
val receivedStreamStatus = kinesisClient.describeStream(streamName).getStreamDescription.getStreamStatus
val numStreams = numShards
val batchInterval = Milliseconds(batchIntervalTime)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

// Streaming context with one Kinesis receiver per shard
val sparkConfig = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(sparkConfig, batchInterval)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val unionDStreams = ssc.union(kinesisStreams)
val eventDStreams = EventExtension.transformToEventDStream(unionDStreams)

// Write each batch to HBase
eventDStreams.foreachRDD { rdd =>
  rdd.map(_.convertToPut).saveAsHadoopDataset(jobConfig)
}

// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
I've looked into the executors' logs and found some problems related to ZooKeeper connections:
16/09/08 11:29:26 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-55.ec2.internal/10.0.10.55:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/09/08 11:29:26 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=ip-10-0-10-55.ec2.internal:2181,ip-10-0-10-21.ec2.internal:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/meta-region-server
16/09/08 11:29:27 INFO MemoryStore: 3 blocks selected for dropping
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723548 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723548 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block broadcast_31 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block broadcast_31 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723555 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723555 to disk
16/09/08 11:29:27 INFO MemoryStore: Block input-0-1473333723413 stored as values in memory (estimated size 5.9 MB, free 140.3 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-1-1473333723683 stored as values in memory (estimated size 1455.2 KB, free 141.7 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-2-1473333723407 stored as values in memory (estimated size 431.4 KB, free 142.1 MB)
16/09/08 11:29:28 INFO ClientCnxn: Opening socket connection to server ip-10-0-10-21.ec2.internal/10.0.10.21:2181. Will not attempt to authenticate using SASL (unknown error)
16/09/08 11:29:28 INFO ClientCnxn: Socket connection established to ip-10-0-10-21.ec2.internal/10.0.10.21:2181, initiating session
16/09/08 11:29:28 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-21.ec2.internal/10.0.10.21:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
I'm using HDP 2.4.2 with Java 8, HBase 1.1.2, ZooKeeper 3.4.6 and Spark 1.6.1.
Thanks
Created 09-08-2016 03:09 PM
The ZooKeeper exceptions and log messages seem to imply that either your client is having difficulty maintaining its heartbeat with the ZK server or the ZK server is having trouble responding. The "Connection reset by peer" is likely the ZK server just closing the connection from the client (probably to save resources), and not directly indicative of a problem.
The ConnectionLoss warning might be a sign of a larger problem. Disconnected is a state in the ZK client's lifecycle in which the client is no longer in sync with the ZK server but still has the ability to reconnect to the server and sync its state. Commonly, this happens when a client application experiences garbage collection pauses which prevent the ZK heartbeat threads from running as intended.
I'd recommend that you try to determine what your Spark job is doing at the time that "it rapidly gets stuck processing". You could obtain a thread dump from the application to see what it is doing at that moment, and also collect GC logs for the application by setting the appropriate JVM options.
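For example, a thread dump can be taken with jstack <executor pid> on the node running the executor, and executor GC logging can be switched on roughly like this (an untested sketch; the flags are the standard Java 8 GC-logging options and the app name is a placeholder):

import org.apache.spark.SparkConf

// Sketch only: standard Java 8 GC-logging flags. Without -Xloggc the output
// lands in each executor's stdout log, which YARN collects.
val gcOpts = "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps"

val sparkConfig = new SparkConf()
  .setAppName("kinesis-to-hbase") // placeholder app name
  .set("spark.executor.extraJavaOptions", gcOpts)

// Driver-side GC options are best passed at submit time instead, e.g.
// spark-submit --conf "spark.driver.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails"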
Created on 09-11-2016 07:25 AM - edited 08-19-2019 01:09 AM
Hi Josh, and thanks for your answer.
I've looked into the GC logs and didn't find anything suspicious.
It always gets stuck when the job tries to save to HBase using saveAsHadoopDataset.
I'm new to Spark and HBase, so it's difficult for me to pinpoint the issue.
Thanks
Created 09-11-2016 08:03 AM
If I replace "saveAsHadoopDataset" with "saveAsTextFile", the job doesn't get stuck at all.
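Roughly, the substitution looks like this (the output path is just a placeholder I'm using for debugging):

// Debugging variant: dump each batch as text instead of writing to HBase.
// The path is a placeholder; each batch gets its own output directory.
eventDStreams.foreachRDD { rdd =>
  rdd.saveAsTextFile(s"/tmp/events-debug/batch-${System.currentTimeMillis()}")
}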
Created on 09-11-2016 12:42 PM - edited 08-19-2019 01:09 AM
Eventually, increasing ZooKeeper's maximum number of client connections (maxClientCnxns) solved the problem.
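For anyone who hits the same thing: maxClientCnxns is ZooKeeper's per-client-IP connection limit (60 by default). As a sketch, the change in zoo.cfg looks like the line below; on HDP it's normally set through Ambari, and the exact value here is only an example, not a recommendation:

# zoo.cfg: maximum concurrent connections from a single client IP
maxClientCnxns=300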
Created 09-11-2016 07:08 PM
Odd that saving it as a text file doesn't cause an error, but glad you got to the bottom of it.
If you want/need any more context, I tried to capture some information on maxClientCnxns recently in https://community.hortonworks.com/articles/51191/understanding-apache-zookeeper-connection-rate-lim....