Created 12-13-2016 05:21 PM
I am trying to use the Spark HBase connector (SHC, 1.0.0-1.6-s_2.10) to load data into HBase. The DataFrame is created from a Hive table. After the DataFrame is created, saving it into the HBase table throws an error. The error indicates an unsupported type but doesn't pinpoint the exact location of the problem. I have verified everything I can but have not been able to find the root cause. Details are below.
DF Schema:
root
 |-- col0: string (nullable = false)
 |-- col1: long (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: integer (nullable = true)
HBaseCatalog:
{ "table": { "namespace": "default", "name": "%1s" }, "rowkey": "key", "columns": { "col0": { "cf": "rowkey", "col": "key", "type": "string" }, "col1": { "cf": "p", "col": "acc", "type": "bigint" }, "col2": { "cf": "p", "col": "dvc", "type": "string" }, "col3": { "cf": "p", "col": "net1", "type": "string" }, "col4": { "cf": "p", "col": "net2", "type": "string" }, "col5": { "cf": "p", "col": "clk", "type": "int" }}
Code:
Map<String, String> options = new HashMap<>(); options.put(HBaseTableCatalog.tableCatalog(), getHbaseCatalog());
df.write().options(options).format("org.apache.spark.sql.execution.datasources.hbase").save();
Error:
Caused by: java.lang.Exception: unsupported data type StringType
at org.apache.spark.sql.execution.datasources.hbase.Utils$.toBytes(Utils.scala:88)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$2.apply(HBaseRelation.scala:155)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$2.apply(HBaseRelation.scala:154)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1(HBaseRelation.scala:154)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$insert$1.apply(HBaseRelation.scala:161)
Created 12-15-2016 02:54 AM
We do not recommend writing data into HBase through SHC using SHC's default internal custom format. That format is not well defined, is mainly used for testing, and can change in incompatible ways. For storing data into HBase via SHC, please use a standard, robust format such as Avro. Currently SHC supports Avro, and we plan to support others such as Phoenix types.
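For reference, a minimal sketch of what an Avro-backed column mapping could look like, loosely following the SHC Avro examples; the table name, column names, and the record schema here are illustrative assumptions, not taken from this thread:

// Hypothetical sketch based on the SHC Avro examples: one DF column is
// serialized as an Avro record into a single HBase column via the "avro"
// coder, and the "avroSchema" option carries the writer schema. The DF
// column mapped with "avro" must match this schema.
String avroCatalog = "{"
    + "\"table\":{\"namespace\":\"default\", \"name\":\"mytable\"},"
    + "\"rowkey\":\"key\","
    + "\"columns\":{"
    +   "\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},"
    +   "\"col1\":{\"cf\":\"p\", \"col\":\"data\", \"avro\":\"avroSchema\"}"
    + "}}";
String schemaString = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
    + "{\"name\":\"f1\",\"type\":\"string\"}]}";
Map<String, String> avroOptions = new HashMap<>();
avroOptions.put(HBaseTableCatalog.tableCatalog(), avroCatalog);
avroOptions.put("avroSchema", schemaString);
df.write().options(avroOptions)
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save();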
Created 12-13-2016 11:04 PM
This error occurs in 'org.apache.spark.sql.execution.datasources.hbase.Utils$.toBytes()' when the value of a String field in the DataFrame is null. The value doesn't match any branch of the if-else chain and eventually throws this error. I think this could be handled by checking for null and writing an empty String to HBase instead.
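As a caller-side workaround (a sketch, assuming an empty string and zero are acceptable placeholders in your data), the nulls can be filled before the write:

// Sketch: fill nulls so Utils.toBytes() never receives a null.
// 'df' and 'options' are the objects from the code in the question.
// fill("") replaces nulls in string columns; fill(0) in numeric columns.
DataFrame cleaned = df.na().fill("").na().fill(0);
cleaned.write().options(options)
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save();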
Created 12-20-2016 12:28 AM
I was trying to reproduce this issue but could not. The code provided in the description has two known issues:
1. SHC does not support the table-name format "%1s".
2. The code hits the known issue "multiple columns in a column family work on read but not on write/update".
We need more information to dig deeper. Could you share more of your code? Thanks.
Created 12-20-2016 08:51 PM
'%1s' is a Java format specifier that I use to dynamically insert the HBase table name into the catalog for testing purposes.
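For illustration, the substitution looks roughly like this; getHbaseCatalogTemplate() is a hypothetical helper, assumed to return the catalog JSON above with "%1s" as the table-name placeholder:

// Hypothetical sketch of the table-name substitution:
String catalogTemplate = getHbaseCatalogTemplate(); // contains "%1s"
String catalog = String.format(catalogTemplate, "my_test_table");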
The other issue (multiple columns) you mentioned is not applicable in my case, since I don't create a new table using the option 'HBaseTableCatalog.newTable -> "5"'. I use an existing pre-split table.
This is the (decompiled) code from 'Utils$.toBytes()'. If one of the fields in the DataFrame is null, toBytes() receives null as its first argument 'input', and null is not an instance of anything, so control eventually falls through to 'label323' and throws this error; a short self-contained illustration follows the decompiled code below. So the only workaround at this stage is to remove the null fields from the DataFrame or to populate them with something, which is not always feasible.
public byte[] toBytes(Object input, Field field)
{
  // Decompiled output; the dangling 'if' below is a decompiler artifact.
  if (field.schema().isDefined());
  Object record;
  Object localObject1 = input;
  Object localObject2;
  if (localObject1 instanceof Boolean) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToBoolean(localObject1));
  } else if (localObject1 instanceof Byte) {
    localObject2 = new byte[] { BoxesRunTime.unboxToByte(localObject1) };
  } else if (localObject1 instanceof byte[]) {
    localObject2 = (byte[])localObject1;
  } else if (localObject1 instanceof Double) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToDouble(localObject1));
  } else if (localObject1 instanceof Float) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToFloat(localObject1));
  } else if (localObject1 instanceof Integer) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToInt(localObject1));
  } else if (localObject1 instanceof Long) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToLong(localObject1));
  } else if (localObject1 instanceof Short) {
    localObject2 = Bytes.toBytes(BoxesRunTime.unboxToShort(localObject1));
  } else if (localObject1 instanceof UTF8String) {
    localObject2 = ((UTF8String)localObject1).getBytes();
  } else {
    // A null 'input' matches none of the instanceof checks above,
    // so it falls through to label323 and the exception below.
    if (!(localObject1 instanceof String)) break label323;
    localObject2 = Bytes.toBytes((String)localObject1);
  }
  return ((record = field.catalystToAvro().apply(input)) ?
      AvroSedes..MODULE$.serialize(record, (Schema)field.schema().get()) :
      (field.sedes().isDefined()) ?
          ((Sedes)field.sedes().get()).serialize(input) :
          localObject2);
  label323:
  throw new Exception(new StringContext(Predef..MODULE$.wrapRefArray(
      (Object[])new String[] { "unsupported data type ", "" }))
      .s(Predef..MODULE$.genericWrapArray(new Object[] { field.dt() })));
}
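To see the fall-through in isolation, here is a minimal demonstration; this is plain Java semantics, nothing SHC-specific:

// Self-contained demo: in Java, null is never an instance of anything,
// so every instanceof branch in toBytes() is skipped for a null field
// and control reaches the throw at label323.
public class NullInstanceofDemo {
    public static void main(String[] args) {
        Object input = null;
        System.out.println(input instanceof String);  // false
        System.out.println(input instanceof byte[]);  // false
    }
}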
Created 12-16-2016 05:04 PM
Thanks! I totally agree, and we already use Avro in other cases. But sometimes Avro doesn't seem like the better option when you only need to read a few columns: storing all the fields as one Avro binary in a single HBase column effectively forces you to read every field even when you need only a few.
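To make the tradeoff concrete, a sketch of a narrow read using the per-column catalog from the question (the column choices here are illustrative); since SHC supports pruned scans, selecting a subset of DF columns should only touch the corresponding HBase qualifiers:

// Sketch: read through SHC with the same catalog options as the write,
// then select only two columns; with one DF column per HBase qualifier,
// the scan can be pruned to p:dvc and p:clk.
DataFrame readDf = sqlContext.read()
    .options(options)
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load();
readDf.select("col2", "col5").show();

With a single Avro blob per row, the whole record would have to be fetched and deserialized before those two fields could be extracted.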
Created 12-19-2016 01:58 AM
I am not sure whether the HBase filters that SHC provides would help here, or whether this points to more feature work needed in SHC. Could you please elaborate with some code samples?