Member since
05-16-2019
6
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3334 | 05-16-2019 02:54 PM |
01-07-2020
11:54 AM
Trying to migrate code from Spark 1.6, Scala 2.10 to Spark 2.4, Scala 2.11, and having trouble compiling code that connects to HBase. Showing dependency versions, minimal example and compilation error below.
I understand that CDH 5 does not work with Spark 2. I can migrate to CDH 6.2.0 - but in that case, how can I connect to HBase? (Code samples, please.)
https://docs.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_spark_on_hbase
// Dependencies
, "org.apache.spark" %% "spark-core" % "2.4.0"
, "org.apache.spark" %% "spark-sql" % "2.4.0"
, "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.14.4"
, "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.14.4"
, "org.apache.hbase" % "hbase-spark" % "1.2.0-cdh5.14.4"
// Minimal example
package spark2.hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object ConnectToHBase {
def main(args: Array[String]): Unit = {
implicit val spark: SparkSession = SparkSession.builder.appName("Connect to HBase from Spark 2")
.config("spark.master", "local")
.getOrCreate()
implicit val sc: SparkContext = spark.sparkContext
val hbaseConf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, hbaseConf)
}
}
// Compilation error
[error] missing or invalid dependency detected while loading class file 'HBaseContext.class'.
[error] Could not access type Logging in package org.apache.spark,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'HBaseContext.class' was compiled against an incompatible version of org.apache.spark.
... View more
Labels:
- Labels:
-
Apache HBase
-
Apache Spark
05-16-2019
02:54 PM
Mystery solved: the columns are not null. I am new to all this. What happened was that I ran queries in the Impala CLI like so: SELECT * FROM table_name WHERE pk_col = some_value; This returns the values of the PK and a lot of empty spaces, so I thought all those were NULLs. Even this query behaves the same way: SELECT col1, col2 FROM table_name WHERE pk_col = some_value; The col1 value gets printed, but not col2. But this works as expected: SELECT col2 FROM table_name WHERE pk_col = some_value; What I am learning here is that Kudu is a columnar data store and (for some reason) you cannot query more than one column at a time. But the data is there, and if you query it one column at a time, you can see it. Not sure why Impala has to behave this way when querying a Kudu table (e.g. throw no errors / give no hint of what is going on), but I am new to all of this.
... View more
05-16-2019
02:08 PM
Attachments don't seem to work, so here is the code import com.myco.util.config.AppConfig import com.myco.util.jdbc.{ExtractStructure, Table} import org.apache.kudu.spark.kudu._ import org.apache.spark.sql.{Dataset, SparkSession} object TableNameToKudu { def main(args: Array[String]): Unit = { val appConfig: AppConfig = AppConfig() val dataExport = appConfig.dataExport val dataModel: Map[String, Table] = ExtractStructure.dataModel(appConfig.config) val tableNameTable: Table = dataModel("table_name") val colNamesLower: Seq[String] = tableNameTable.columnNames.map(_.toLowerCase) val customSchema: String = tableNameTable.toSparkSchema.mkString(", ") val spark = SparkSession.builder.appName("Save TableName in Kudu format") .config("spark.master", "local") .config("spark.sql.warehouse.dir", "hdfs://server.name.myco.com:1234/user/hive/warehouse") .getOrCreate() val kuduContext = new KuduContext(appConfig.kuduMaster, spark.sparkContext) val minId = 1L val maxId = 400000000000L val step = (maxId - minId) / dataExport.getInt("numParts") val partitions = Range.Long(minId, maxId, step).map { start => val end = start + step s"$start <= equity_uid AND equity_uid < $end" }.toArray val props = appConfig.jdbcProps props.put("customSchema", customSchema) val startTime = System.currentTimeMillis() // Need this import to get an implicit encoder to make this compile: ".as[case_class_for_table]" import spark.implicits._ val df: Dataset[case_class_for_table] = spark.read .option("fetchsize", appConfig.fetchsize.toString) .option("driver", appConfig.jdbcDriver) .jdbc(appConfig.dbURL, "schema_name.table_name", partitions, props) .as[case_class_for_table] kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema_name.table_name") val endTime = System.currentTimeMillis() println("TOTAL TIME = " + (endTime - startTime)) } }
... View more
05-16-2019
02:05 PM
I am trying to copy a table from an Oracle DB to an Impala table having the same structure, in Spark, through Kudu. This is intended to be a 1-to-1 copy of data from Oracle to Impala. We found that all the rows have been copied, but every column except for the PK (partition key) is null everywhere. Why would that happen?
I have extracted the Oracle schema of the source table and created a target Impala table with the same structure (same column names, converted to lower case, and a reasonable mapping of data types). The PK of the Oracle table is the PK and "PARTITION BY HASH" of the Impala table. The Impala table is "STORED AS KUDU".
We used Spark to read the data from Oracle. Then we used a kuduContext to insert the data into Impala. No errors were raised, the row counts match, and we can find the same values in the table PK in Oracle and in Impala. But in Impala, every column except the PK is null everywhere. The PK which gets populated correctly) is a NUMBER in Oracle and an int64 in Impala. Other columns of the same type end up being null. How can we troubleshoot this? Attaching the (anonymized) code we used. You can see also details at StackOverflow.
... View more
Labels:
- Labels:
-
Apache Impala
-
Apache Kudu
-
Apache Spark