Member since 09-08-2016

4 Posts
2 Kudos Received
1 Solution

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3604 | 09-11-2016 12:42 PM |

09-11-2016 12:42 PM
1 Kudo

Eventually, increasing the ZooKeeper max connections solved the problem.
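
To illustrate why a connection cap can matter here, the following is a minimal sketch, not a diagnosis of this cluster. ZooKeeper 3.4.x limits concurrent connections per client IP to each server through its `maxClientCnxns` setting (default 60, with 0 meaning no limit). The sketch assumes that every concurrently running write task holds one ZooKeeper connection, which is an assumption and not something confirmed above. `ZkConnectionEstimate` and its methods are hypothetical names.

```scala
// Hypothetical helper for illustration; not part of Spark, HBase, or ZooKeeper.
object ZkConnectionEstimate {
  // Default per-client-IP connection cap in ZooKeeper 3.4.x (maxClientCnxns).
  val DefaultMaxClientCnxns: Int = 60

  /** Worst-case ZooKeeper connections opened from one worker host,
    * ASSUMING each concurrently running task holds exactly one connection. */
  def connectionsPerHost(executorsPerHost: Int, coresPerExecutor: Int): Int =
    executorsPerHost * coresPerExecutor

  /** True when the estimate exceeds the per-IP limit.
    * A limit of 0 means "unlimited" in ZooKeeper, so it is never exceeded. */
  def exceedsLimit(executorsPerHost: Int,
                   coresPerExecutor: Int,
                   maxClientCnxns: Int = DefaultMaxClientCnxns): Boolean =
    maxClientCnxns > 0 &&
      connectionsPerHost(executorsPerHost, coresPerExecutor) > maxClientCnxns
}
```

If connections are not released between batches, the real count could grow past this estimate, so it is best read as a lower bound for sizing the limit.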
						
					
09-11-2016 08:03 AM

If I replace "saveAsHadoopDataset" with "saveAsTextFile", the job doesn't get stuck at all.
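
Since saveAsTextFile does not go through TableOutputFormat, this points at the HBase write path. One commonly used alternative is to open a single connection per partition and always close it. The sketch below shows only that pattern, in plain Scala so it can run without a cluster; `Sink` and `PartitionWriter` are hypothetical names. With the HBase 1.x client, a sink could wrap `ConnectionFactory.createConnection(conf)`, `connection.getTable(TableName.valueOf(name))`, and `table.put(...)`, and be called from `rdd.foreachPartition`. Whether this would have avoided the hang in this job is not established.

```scala
// Hypothetical abstraction for illustration; not an HBase or Spark type.
trait Sink[A] {
  def write(batch: Seq[A]): Unit
  def close(): Unit
}

object PartitionWriter {
  /** Opens ONE sink for the whole partition, writes the records in batches,
    * and always closes the sink, even if a write throws.
    * Returns the number of records written. */
  def writePartition[A](records: Iterator[A], batchSize: Int)(open: () => Sink[A]): Int = {
    require(batchSize > 0, "batchSize must be positive")
    val sink = open()
    try {
      var written = 0
      records.grouped(batchSize).foreach { batch =>
        sink.write(batch)
        written += batch.size
      }
      written
    } finally {
      sink.close() // release the connection so it does not accumulate across batches
    }
  }
}
```

The sink is created inside the partition function rather than on the driver, so nothing that holds a network connection needs to be serialized.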
						
					
09-11-2016 07:25 AM

Hi Josh, and thanks for your answer. I've looked into the GC logs and didn't find anything suspicious. It always gets stuck when the job tries to save to HBase using saveAsHadoopDataset. I'm new to Spark and HBase, so it is difficult for me to pinpoint the issue. Thanks
						
					
09-08-2016 12:58 PM
1 Kudo

Hi all!

I'm writing a simple Spark Streaming job that takes events from AWS Kinesis and stores them in HBase. The job runs perfectly for the first few batches, but it rapidly gets stuck processing.

The code is the following:

val hbaseConf = HBaseConfiguration.create()
val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
jobConfig.set("hbase.zookeeper.quorum", zooQuorum)
jobConfig.set("hbase.zookeeper.property.clientPort", zooPort)
jobConfig.set("hbase.master", hbaseMaster)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
jobConfig.setStrings("zookeeper.znode.parent", "/hbase-unsecure")
val awsCredential = new DefaultAWSCredentialsProviderChain().getCredentials()
require(
  awsCredential != null,
  "No AWS credentials found. Please specify credentials using one of the methods specified " +
    "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html"
)
val kinesisClient = new AmazonKinesisClient(awsCredential)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val receivedStreamName = kinesisClient.describeStream(streamName).getStreamDescription.getStreamName
val receivedStreamStatus = kinesisClient.describeStream(streamName).getStreamDescription.getStreamStatus
val numStreams = numShards
val batchInterval = Milliseconds(batchIntervalTime)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val sparkConfig = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(sparkConfig, batchInterval)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val unionDStreams = ssc.union(kinesisStreams)
val eventDStreams = EventExtension.transformToEventDStream(unionDStreams)
eventDStreams.foreachRDD { rdd =>
  rdd.map(_.convertToPut).saveAsHadoopDataset(jobConfig)
}
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()

I've looked into the executor logs and found some problems related to ZooKeeper connections:

16/09/08 11:29:26 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-55.ec2.internal/10.0.10.55:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/09/08 11:29:26 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=ip-10-0-10-55.ec2.internal:2181,ip-10-0-10-21.ec2.internal:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/meta-region-server
16/09/08 11:29:27 INFO MemoryStore: 3 blocks selected for dropping
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723548 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723548 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block broadcast_31 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block broadcast_31 to disk
16/09/08 11:29:27 INFO BlockManager: Dropping block input-1-1473333723555 from memory
16/09/08 11:29:27 INFO BlockManager: Writing block input-1-1473333723555 to disk
16/09/08 11:29:27 INFO MemoryStore: Block input-0-1473333723413 stored as values in memory (estimated size 5.9 MB, free 140.3 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-1-1473333723683 stored as values in memory (estimated size 1455.2 KB, free 141.7 MB)
16/09/08 11:29:28 INFO MemoryStore: Block input-2-1473333723407 stored as values in memory (estimated size 431.4 KB, free 142.1 MB)
16/09/08 11:29:28 INFO ClientCnxn: Opening socket connection to server ip-10-0-10-21.ec2.internal/10.0.10.21:2181. Will not attempt to authenticate using SASL (unknown error)
16/09/08 11:29:28 INFO ClientCnxn: Socket connection established to ip-10-0-10-21.ec2.internal/10.0.10.21:2181, initiating session
16/09/08 11:29:28 WARN ClientCnxn: Session 0x0 for server ip-10-0-10-21.ec2.internal/10.0.10.21:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

I'm using HDP 2.4.2 with Java 8, HBase 1.1.2, ZooKeeper 3.4.6 and Spark 1.6.1.

Thanks
						
					
Labels:
- Apache HBase
- Apache Spark