Member since
09-08-2016
4
Posts
2
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2883 | 09-11-2016 12:42 PM |
09-11-2016
12:42 PM
1 Kudo
Eventually increasing the zookeeper max connections solved the problem
... View more
09-11-2016
08:03 AM
If I replace "saveAsHadoopDataset" by "saveAsTextFile" the job doesn't get stuck at all
... View more
09-11-2016
07:25 AM
Hi Josh and thank for your answer. I've looked into the GC logs and didn't find something suspicious. It always gets stuck when the Job tries to save to HBase using saveAsHadoopDataset I'm new to Spark and HBase so it is difficult for me to better spot the issue Thanks
... View more
09-08-2016
12:58 PM
1 Kudo
Hi all!! I'm writing a simple Spark Streaming job that takes events from AWS Kinesis and stores them into HBase. The job runs perfectly for the firsts batches but it rapidly gets stuck processing. The code is the following: val hbaseConf = HBaseConfiguration.create()
val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
jobConfig.set("hbase.zookeeper.quorum", zooQuorum)
jobConfig.set("hbase.zookeeper.property.clientPort", zooPort)
jobConfig.set("hbase.master", hbaseMaster)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
jobConfig.setStrings("zookeeper.znode.parent", "/hbase-unsecure")
val awsCredential = new DefaultAWSCredentialsProviderChain().getCredentials()
require(
awsCredential != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html"
)
val kinesisClient = new AmazonKinesisClient(awsCredential)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val receivedStreamName = kinesisClient.describeStream(streamName).getStreamDescription.getStreamName
val receivedStreamStatus = kinesisClient.describeStream(streamName).getStreamDescription.getStreamStatus
val numStreams = numShards
val batchInterval = Milliseconds(batchIntervalTime)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val sparkConfig = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(sparkConfig, batchInterval)
val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val unionDStreams = ssc.union(kinesisStreams)
val eventDStreams = EventExtension.transformToEventDStream(unionDStreams)
eventDStreams.foreachRDD { rdd =>
rdd.map(_.convertToPut).saveAsHadoopDataset(jobConfig)
}
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination() I've looked into the executor's logs and I found some problem related to zookeeper connections 16/09/08 11:29:26 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-55.ec2.internal/10.0.10.55:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/09/08 11:29:26 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=ip-10-0-10-55.ec2.internal:2181,ip-10-0-10-21.ec2.internal:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/meta-region-server
16/09/08 11:29:27 INFO MemoryStore: 3 blocks selected for dropping
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723548 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723548 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block broadcast_31 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block broadcast_31 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723555 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723555 to disk
16/09/08 11:29:27 INFO MemoryStore: Block input-0-1473333723413 stored as values in memory (estimated size 5.9 MB, free 140.3 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-1-1473333723683 stored as values in memory (estimated size 1455.2 KB, free 141.7 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-2-1473333723407 stored as values in memory (estimated size 431.4 KB, free 142.1 MB)
16/09/08 11:29:28 INFO ClientCnxn: Opening socket connection to server ip-10-0-10-21.ec2.internal/10.0.10.21:2181. Will not attempt to authenticate using SASL (unknown error)
16/09/08 11:29:28 INFO ClientCnxn: Socket connection established to ip-10-0-10-21.ec2.internal/10.0.10.21:2181, initiating session
16/09/08 11:29:28 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-21.ec2.internal/10.0.10.21:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
I'm using HDP 2.4.2 with Java8, HBase 1.1.2, Zookeeper 3.4.6 and Spark 1.6.1 Thanks
... View more
Labels:
- Labels:
-
Apache HBase
-
Apache Spark