Created on 02-18-2021 07:56 AM - edited 02-18-2021 07:58 AM
Hi
I am trying to get the Hive Warehouse Connector (HWC) working, but there is no documentation about which libraries to include or how to set it up. My project works perfectly until I try to use the Hive Warehouse Connector.
The error I get is:
Exception in thread "main" java.lang.NoSuchMethodError: 'shaded.hwc.org.apache.thrift.transport.TTransport org.apache.hadoop.hive.common.auth.HiveAuthUtils.getSocketTransport(java.lang.String, int, int)'
at org.apache.hive.jdbc.HiveConnection.createUnderlyingTransport(HiveConnection.java:659)
at org.apache.hive.jdbc.HiveConnection.createBinaryTransport(HiveConnection.java:679)
What I'm doing is (very simple):
SparkSession spark = SparkSession.builder()
        .appName("App Name")
        .enableHiveSupport()
        .config("spark.sql.hive.hiveserver2.jdbc.url", "jdbc:hive2://maksed.this.url:10000/default")
        .config("spark.datasource.hive.warehouse.load.staging.dir", "/tmp")
        .config("spark.hadoop.hive.llap.daemon.service.hosts", "@llap0")
        .config("spark.sql.hive.hiveserver2.jdbc.url.principal", "user@realm.CO.ZA")
        .master("local")
        .getOrCreate();
HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();
hive.showTables();
I have mounted the Hadoop site files that I downloaded from Cloudera Manager (core-site.xml, hdfs-site.xml, hive-site.xml, yarn-site.xml), and that is how Spark finds its Hadoop configuration.
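Roughly, those files get picked up either from the application's runtime classpath (for example src/main/resources) or by registering them explicitly on the session built above; a minimal sketch, with a placeholder path:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Rough sketch, not the exact setup: register the site files downloaded from
// Cloudera Manager in case they are not already on the classpath.
// "/path/to/cm-conf" is only a placeholder for wherever the files live.
Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
hadoopConf.addResource(new Path("file:///path/to/cm-conf/core-site.xml"));
hadoopConf.addResource(new Path("file:///path/to/cm-conf/hdfs-site.xml"));
hadoopConf.addResource(new Path("file:///path/to/cm-conf/hive-site.xml"));
hadoopConf.addResource(new Path("file:///path/to/cm-conf/yarn-site.xml"));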
If I run the same thing with pyspark in the shell on a cluster node, I can connect fine and everything works; it's only when I port it to a Maven project that it breaks.
My instinct tells me that perhaps I am missing a Maven dependency?
My pom.xml looks like this:
<properties>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <spark.version>2.4.0.7.1.4.0-203</spark.version>
    <hwc.version>1.0.0.7.1.4.0-203</hwc.version>
</properties>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>com.hortonworks.hive</groupId>
    <artifactId>hive-warehouse-connector_2.11</artifactId>
    <version>${hwc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Am I missing something?
Created 02-19-2021 03:19 AM
Hi Keith,
Please find the sample spark-shell code:
sudo -u hive spark-shell \
--jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-*.jar \
--conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://localhost:10000' \
--conf spark.sql.hive.hwc.execution.mode=spark \
--conf spark.datasource.hive.warehouse.metastoreUri='thrift://localhost:9083' \
--conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
--conf spark.datasource.hive.warehouse.user.name=hive \
--conf spark.datasource.hive.warehouse.password=hive \
--conf spark.datasource.hive.warehouse.smartExecution=false \
--conf spark.datasource.hive.warehouse.read.via.llap=false \
--conf spark.datasource.hive.warehouse.read.jdbc.mode=cluster \
--conf spark.datasource.hive.warehouse.read.mode=DIRECT_READER_V2 \
--conf spark.security.credentials.hiveserver2.enabled=false \
--conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
hive.createDatabase("hwc_db", true)
hive.setDatabase("hwc_db")
hive.dropTable("employee", true, true)
hive.createTable("employee").ifNotExists().column("id", "bigint").column("name", "string").column("age", "smallint").column("salary", "double").column("manager_id", "bigint").create()
hive.showTables().show(truncate=false)
val empData = Seq((1, "Ranga", 32, 245000.30, 1), (2, "Nishanth", 2, 345000.10, 1), (3, "Raja", 32, 245000.86, 2), (4, "Mani", 14, 45000.00, 3))
val employeeDF = empData.toDF("id", "name", "age", "salary", "manager_id")
employeeDF.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table", "hwc_db.employee").save()
hive.executeQuery("select * from hwc_db.employee").show(truncate=false)
Created on 02-19-2021 03:48 AM - edited 02-19-2021 06:33 AM
Thank you for your response
If you read my question, I mention that it works fine with spark-shell. I can use HWC with no issue from pyspark or spark-shell. When I separate it into a Maven project I can't get it to work; I imagine I don't have the correct dependencies in the pom. Similarly, this is an issue for an sbt project.
You have given me some useful config to try, so thanks for that
There are no "hello world" examples that use the Cloudera jars together with an example pom for HWC.
I'm hoping somebody who has done this with Maven, Cloudera, and HWC can help.
Created 02-20-2021 08:59 PM
Hello Keith,
Please find below the Spark HWC integration Java code using Maven.
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cloudera.spark.hwc</groupId>
    <artifactId>SparkCDPHWCExample</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>SparkCDPHWCExample</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.0.7.1.4.0-203</spark.version>
        <hwc.version>1.0.0.7.1.4.0-203</hwc.version>
        <spark.scope>provided</spark.scope>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven-shade-plugin.version>3.2.3</maven-shade-plugin.version>
        <scala-maven-plugin.version>4.3.1</scala-maven-plugin.version>
        <scala.version>2.11</scala.version>
        <scala-binary.version>2.12</scala-binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.hortonworks.hive</groupId>
            <artifactId>hive-warehouse-connector_2.11</artifactId>
            <version>${hwc.version}</version>
            <scope>${spark.scope}</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin.version}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- add Main-Class to manifest file -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.cloudera.spark.hwc.SparkHWCJavaExample</mainClass>
                                </transformer>
                            </transformers>
                            <!-- Remove signed keys to prevent security exceptions on uber jar -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Java Code:
Employee.java
package com.cloudera.spark.hwc;
public class Employee {

    private long id;
    private String name;
    private int age;
    private double salary;

    public Employee() {
    }

    public Employee(long id, String name, int age, double salary) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }
}
SparkHWCJavaExample.java
package com.cloudera.spark.hwc;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import com.hortonworks.hwc.HiveWarehouseSession;
import java.util.ArrayList;
import java.util.List;
public class SparkHWCJavaExample {

    private final static String DATABASE_NAME = "hwc_db";
    private final static String TABLE_NAME = "employee";
    private final static String DATABASE_TABLE_NAME = DATABASE_NAME + "." + TABLE_NAME;

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf()
                .setAppName("Spark CDP HWC Example")
                .setIfMissing("spark.master", "local");

        SparkSession spark = SparkSession.builder().config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();

        HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();

        // Create a database
        hive.createDatabase(DATABASE_NAME, true);

        // Display all databases
        hive.showDatabases().show(false);

        // Use the database
        hive.setDatabase(DATABASE_NAME);

        // Display all tables
        hive.showTables().show(false);

        // Drop the table if it exists
        hive.dropTable(DATABASE_TABLE_NAME, true, true);

        // Create the table
        hive.createTable(TABLE_NAME).ifNotExists()
                .column("id", "bigint")
                .column("name", "string")
                .column("age", "smallint")
                .column("salary", "double")
                .create();

        // Insert the data
        Encoder<Employee> encoder = Encoders.bean(Employee.class);
        List<Employee> employeeList = new ArrayList<>();
        employeeList.add(new Employee(1, "Ranga", 32, 245000.30));
        employeeList.add(new Employee(2, "Nishanth", 2, 345000.10));
        employeeList.add(new Employee(3, "Raja", 32, 245000.86));
        employeeList.add(new Employee(4, "Mani", 14, 45000));
        Dataset<Employee> employeeDF = spark.createDataset(employeeList, encoder);
        employeeDF.write().format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
                .mode("append").option("table", DATABASE_TABLE_NAME).save();

        // Select the data
        Dataset<Row> empDF = hive.executeQuery("SELECT * FROM " + DATABASE_TABLE_NAME);
        empDF.printSchema();
        empDF.show();

        // Close the SparkSession
        spark.close();
    }
}
Run Script:
sudo -u hive spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1g \
--num-executors 2 \
--driver-memory 1g \
--jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-*.jar \
--conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://localhost:10000' \
--conf spark.sql.hive.hwc.execution.mode=spark \
--conf spark.datasource.hive.warehouse.metastoreUri='thrift://localhost:9083' \
--conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
--conf spark.datasource.hive.warehouse.user.name=hive \
--conf spark.datasource.hive.warehouse.password=hive \
--conf spark.datasource.hive.warehouse.smartExecution=false \
--conf spark.datasource.hive.warehouse.read.via.llap=false \
--conf spark.datasource.hive.warehouse.read.jdbc.mode=cluster \
--conf spark.datasource.hive.warehouse.read.mode=DIRECT_READER_V2 \
--conf spark.security.credentials.hiveserver2.enabled=false \
--conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions \
--class com.cloudera.spark.hwc.SparkHWCJavaExample \
/tmp/SparkCDPHWCExample-1.0.0-SNAPSHOT.jar
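For running or debugging the same example locally from an IDE rather than through spark-submit, an untested sketch is below: it simply moves the --conf values from the run script above onto the SparkSession builder. The localhost URLs, user, and password are the same placeholder values as in the run script and need to be adjusted to your cluster, and the sketch assumes the HWC classes are on the local runtime classpath (for example by not using the provided scope for the hive-warehouse-connector dependency in local runs).
import org.apache.spark.sql.SparkSession;
import com.hortonworks.hwc.HiveWarehouseSession;

// Untested local-run sketch: same settings as the spark-submit command above,
// supplied programmatically. Adjust the URLs and credentials to your cluster.
SparkSession spark = SparkSession.builder()
        .appName("Spark CDP HWC Example")
        .master("local")
        .enableHiveSupport()
        .config("spark.sql.hive.hiveserver2.jdbc.url", "jdbc:hive2://localhost:10000")
        .config("spark.sql.hive.hwc.execution.mode", "spark")
        .config("spark.datasource.hive.warehouse.metastoreUri", "thrift://localhost:9083")
        .config("spark.datasource.hive.warehouse.load.staging.dir", "/tmp")
        .config("spark.datasource.hive.warehouse.user.name", "hive")
        .config("spark.datasource.hive.warehouse.password", "hive")
        .config("spark.datasource.hive.warehouse.smartExecution", "false")
        .config("spark.datasource.hive.warehouse.read.via.llap", "false")
        .config("spark.datasource.hive.warehouse.read.jdbc.mode", "cluster")
        .config("spark.datasource.hive.warehouse.read.mode", "DIRECT_READER_V2")
        .config("spark.security.credentials.hiveserver2.enabled", "false")
        .config("spark.sql.extensions", "com.hortonworks.spark.sql.rule.Extensions")
        .getOrCreate();

HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();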
Created 02-22-2021 03:33 AM
Thanks for the response
If I use your code exactly as is and just change the URLs and config values to match my cluster, I get the exact same error I previously reported. That is when I debug the Java program locally. When I build the jar and submit it, I get a permission error, which I expect, so I would say it works in that case.
Is this perhaps an issue because I am trying to debug my code as I write it, running it in a local Java debug session? How do I get this to work without having to package and submit it just to test?
Created on 02-22-2021 06:28 AM - edited 02-22-2021 06:30 AM
If I add the hive-jdbc dependency at compile time:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.3000.7.1.4.9-1</version>
    <scope>provided</scope>
</dependency>
I can get around the NoSuchMethodError, but then I get the following error:
21/02/22 16:08:21 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.lang.NullPointerException
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartition.createPartitionReader(JdbcInputPartition.java:34)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at scala.collection.immutable.StringLike$class.stripPrefix(StringLike.scala:156)
at scala.collection.immutable.StringOps.stripPrefix(StringOps.scala:29)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.getConnector(HS2JDBCWrapper.scala:423)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.getConnector(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.util.QueryExecutionUtil.getConnection(QueryExecutionUtil.java:68)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartitionReader.getConnection(JdbcInputPartitionReader.java:60)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartitionReader.<init>(JdbcInputPartitionReader.java:39)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartition.createPartitionReader(JdbcInputPartition.java:32)
... 20 more
21/02/22 16:08:21 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.lang.NullPointerException
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartition.createPartitionReader(JdbcInputPartition.java:34)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at scala.collection.immutable.StringLike$class.stripPrefix(StringLike.scala:156)
at scala.collection.immutable.StringOps.stripPrefix(StringOps.scala:29)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.getConnector(HS2JDBCWrapper.scala:423)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.getConnector(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.util.QueryExecutionUtil.getConnection(QueryExecutionUtil.java:68)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartitionReader.getConnection(JdbcInputPartitionReader.java:60)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartitionReader.<init>(JdbcInputPartitionReader.java:39)
at com.hortonworks.spark.sql.hive.llap.JdbcInputPartition.createPartitionReader(JdbcInputPartition.java:32)
... 20 more
I can now list the databases, which is a win:
hive.showDatabases().show(false);
But I cannot read from an existing Hive table.
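For reference, the two read paths that appear earlier in the thread are sketched below against the example table (hwc_db.employee is just the sample table from the previous posts; the spark and hive sessions and the org.apache.spark.sql.* imports are assumed from the example above). The NullPointerException above is thrown inside HS2JDBCWrapper.getConnector on the executor side (JdbcInputPartitionReader), i.e. while the JDBC connection for the read is being built, which suggests one of the connection settings (JDBC URL or user name) resolved to null there; it may be worth confirming that spark.sql.hive.hiveserver2.jdbc.url and the spark.datasource.hive.warehouse.* settings are all set before the SparkSession is created when running locally. That is an assumption based on the stack trace, not something confirmed in this thread.
// Read through the HWC session, as in the earlier examples:
Dataset<Row> viaSession = hive.executeQuery("SELECT * FROM hwc_db.employee");
viaSession.show(false);

// Read through the Spark DataSource API, using the same connector class as the write path:
Dataset<Row> viaDataSource = spark.read()
        .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
        .option("table", "hwc_db.employee")
        .load();
viaDataSource.show(false);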
Created 02-22-2021 08:28 PM
Hi Keith,
To check the issue further, could you please create a Cloudera support case? I will investigate it further from there.