Posted 06-26-2018 09:29 AM
We're running a Spark application that reads data from Avro files, creates HFiles, and bulk-loads them into an HBase table. Both the HBase and Spark HDInsight clusters are on the same VNet. We are using Scala 2.11, Spark 2.1, and HBase 1.1.2.2.6.3.2-14. Here's the error we're hitting:

Exception in thread "main" java.io.IOException: Retry attempted 30 times without completing, bailing out
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.performBulkLoad(LoadIncrementalHFiles.java:527)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:495)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:403)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:359)
at com.yotabites.HBaseV2BulkLoad$.createAndLoadTable(HBaseV2BulkLoad.scala:390)
at com.yotabites.HBaseV2BulkLoad$.main(HBaseV2BulkLoad.scala:196)
at com.yotabites.HBaseV2BulkLoad.main(HBaseV2BulkLoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
18/06/26 07:50:04 INFO SparkContext: Invoking stop() from shutdown hook

Following the community help and other questions, I tried adding more HBase config properties in the Scala code: increasing the max file size, improving the cluster configuration, increasing the number of HFiles processed at a time, and running the Spark app with different numbers of regions; a rough sketch of the settings I tried is below. Nothing has worked so far. Can someone please suggest what's going wrong here?
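Roughly, the kinds of overrides I experimented with look like this (the property values here are illustrative examples, not the exact ones from our cluster):

import org.apache.hadoop.hbase.HBaseConfiguration

// Illustrative sketch of the HBase overrides I tried; values are examples only.
val tunedConf = HBaseConfiguration.create()
// Let regions grow larger before a split is triggered (10 GB here).
tunedConf.set("hbase.hregion.max.filesize", "10737418240")
// Raise the cap on HFiles loaded per region per column family (default 32).
tunedConf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "128")
// Number of bulk-load retry attempts before LoadIncrementalHFiles bails out.
tunedConf.set("hbase.bulkload.retries.number", "30")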
println("*************** createAndLoadTable ********************")
val conf = getHBaseConf(zk)
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
println("TEST: COMPRESSION ------" + conf.get("hfile.compression") + "...")
val conn = ConnectionFactory.createConnection(conf)
println("******************** connection created ***************************")
val hAdmin = conn.getAdmin
println("***********************************")
println(hAdmin.toString())
println("*************000000000000000000**********************" + tableName)
if (!hAdmin.tableExists(TableName.valueOf(tableName))) {
println("*************111111111111111111**********************")
val hTable = new HTableDescriptor(TableName.valueOf(tableName))
println("**************22222222222222222222222*********************")
hTable.addFamily(new HColumnDescriptor(columnFamily).setCompressionType(Compression.Algorithm.SNAPPY))
println("*************333333333333333**********************")
hAdmin.createTable(hTable, splitKeys)
println("***********44444444444444444************************")
val lih = new LoadIncrementalHFiles(conf)
println("************5555555555555555555***********************")
val regionLocator = new HRegionLocator(TableName.valueOf(tableName), conn.asInstanceOf[ClusterConnection])
if (regionLocator == null) {
println("************regionLocator null ***********************")
} else {
println(regionLocator.getAllRegionLocations.toString())
println(regionLocator.getName.toString())
println(regionLocator.getConfiguration.toString())
}
if (hAdmin == null) {
println("************hAdmin null ***********************")
} else {
println(hAdmin.toString())
}
val TableN = conn.getTable(TableName.valueOf(tableName)).asInstanceOf[org.apache.hadoop.hbase.client.HTable]
println("************666666666666666666666***********************")
//lih.doBulkLoad(new Path(outPath), hAdmin, conn.getTable(TableName.valueOf(tableName)), regionLocator)
lih.doBulkLoad(new Path(outPath), TableN);
println("************777777777777777777777777***********************")
} else {
println("Table already exists.")
sys.exit(1)
}<br>
Labels:
- Apache Hive
- Apache Spark