Created 06-26-2018 09:29 AM
We're running a Spark application that reads data from Avro files, creates HFiles, and bulk loads them into an HBase table. Here's the error we're running into. Both the HBase and Spark HDInsight clusters are on the same VNet. We are using Scala 2.11, Spark 2.1, and HBase 1.1.2.2.6.3.2-14.
Exception in thread "main" java.io.IOException: Retry attempted 30 times without completing, bailing out
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.performBulkLoad(LoadIncrementalHFiles.java:527)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:495)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:403)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:359)
    at com.yotabites.HBaseV2BulkLoad$.createAndLoadTable(HBaseV2BulkLoad.scala:390)
    at com.yotabites.HBaseV2BulkLoad$.main(HBaseV2BulkLoad.scala:196)
    at com.yotabites.HBaseV2BulkLoad.main(HBaseV2BulkLoad.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
18/06/26 07:50:04 INFO SparkContext: Invoking stop() from shutdown hook
Based on community help and other questions, I tried adding more HBase config properties in the Scala code, such as increasing the max file size and the number of HFiles processed at a time. I also tried improving the cluster configuration and running the Spark app with different numbers of regions. Nothing has worked so far.
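These are the kinds of properties I mean (the values below are illustrative sketches of what I tried, not my exact settings):

// Illustrative tuning only; these are standard HBase keys, but the values are examples.
conf.set("hbase.hregion.max.filesize", "10737418240")  // 10 GB max region size before splits
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "1024")  // default is 32
// The "Retry attempted 30 times" in the stack trace matches this setting,
// which I believe defaults to 10:
conf.set("hbase.bulkload.retries.number", "30")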
Can someone please suggest what's going wrong here?
Here's what my bulk load snippet looks like.
def createAndLoadTable(tableName: String, columnFamily: String, zk: String,
                       splitKeys: Array[Array[Byte]], outPath: String): Unit = {
  println("*************** createAndLoadTable ********************")
  val conf = getHBaseConf(zk)
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  println("TEST: COMPRESSION ------" + conf.get("hfile.compression") + "...")

  val conn = ConnectionFactory.createConnection(conf)
  println("******************** connection created ***************************")
  val hAdmin = conn.getAdmin
  println(hAdmin.toString())

  if (!hAdmin.tableExists(TableName.valueOf(tableName))) {
    // Create the table with SNAPPY compression and the pre-computed split keys.
    val hTable = new HTableDescriptor(TableName.valueOf(tableName))
    hTable.addFamily(new HColumnDescriptor(columnFamily)
      .setCompressionType(Compression.Algorithm.SNAPPY))
    hAdmin.createTable(hTable, splitKeys)

    val lih = new LoadIncrementalHFiles(conf)
    val regionLocator = new HRegionLocator(TableName.valueOf(tableName),
      conn.asInstanceOf[ClusterConnection])
    if (regionLocator == null) {
      println("************ regionLocator null ***********************")
    } else {
      println(regionLocator.getAllRegionLocations.toString())
      println(regionLocator.getName.toString())
      println(regionLocator.getConfiguration.toString())
    }
    if (hAdmin == null) {
      println("************ hAdmin null ***********************")
    } else {
      println(hAdmin.toString())
    }

    val hTableRef = conn.getTable(TableName.valueOf(tableName))
      .asInstanceOf[org.apache.hadoop.hbase.client.HTable]
    // lih.doBulkLoad(new Path(outPath), hAdmin, conn.getTable(TableName.valueOf(tableName)), regionLocator)
    lih.doBulkLoad(new Path(outPath), hTableRef)  // this is where the retries blow up
  } else {
    println("Table already exists.")
    sys.exit(1)
  }
}
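From what I've read, LoadIncrementalHFiles retries when an HFile straddles region boundaries and has to be split, so the HFiles need to be partitioned against the table's regions when they are generated. Here's a minimal sketch of that wiring (the job setup below is illustrative, not my exact generation code):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance(conf)
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(TableName.valueOf(tableName))
val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

// Configures the partitioner/reducer so the sorted output lines up with the
// table's current region boundaries before the HFiles are written.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
// The sorted (ImmutableBytesWritable, KeyValue) pairs are then written with
// saveAsNewAPIHadoopFile(outPath, ..., classOf[HFileOutputFormat2], job.getConfiguration).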
Created on 04-26-2021 12:04 AM - edited 04-26-2021 12:26 AM
Hi mjadhav570,
Did you find a solution to that problem?
I'm using the HBase connector in my Spark application, and I'm struggling with the same error.
Created on 06-21-2021 10:38 AM - edited 06-21-2021 11:04 AM
Hi
I ran into this exact same issue running a bulk load process on an HDP 3.1.4 environment. In my case there was an HDFS encryption zone set up for HBase, but the HFiles were not being created inside it. I then found in the Cloudera docs that the HFiles must be placed/generated inside the HBase encryption zone so the bulk load can pick them up. Once I moved my HFiles into the HBase encryption zone, the bulk load worked fine.
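In case it helps, this is roughly what the move looked like (the paths are hypothetical; as far as I know a plain HDFS rename across an encryption zone boundary is rejected, so the files have to be copied):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)
val src = new Path("/tmp/bulkload/hfiles")        // hypothetical staging dir outside the EZ
val dst = new Path("/apps/hbase/staging/hfiles")  // hypothetical dir inside the HBase EZ
// Copy rather than rename: renames across encryption zone boundaries fail.
// deleteSource = false keeps the originals; distcp also works for large volumes.
FileUtil.copy(fs, src, fs, dst, false, conf)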
I'm not sure whether that matches your use case, but if it does, you have your solution now.
Another option is to remove HBase from the encryption zones; I tested that too, and the bulk load works as well.