Created 04-22-2016 10:26 PM
I'm using the latest HDP (2.4) Sandbox. I have a class that starts a HiveThriftServer2 instance and registers a table called cumulativeVitals. When I look for the table in Beeline, I can't find it.
Code:
package stlhug.streaming;

import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import stlhug.domain.VitalRecord;
import stlhug.streaming.function.ConvertStreamToDataFrame;
import stlhug.streaming.function.MapMessageToVital;

public class PatientVitalStream2 {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        String zkQuorum = args[1];
        String kafkaConsumerGroupId = args[2];

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("PatientVitals");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        HiveContext sqlContext = new HiveContext(jssc.sparkContext().sc());
        sqlContext.setConf("hive.server2.thrift.port", "10001");

        // Seed the table with a single placeholder row and register it.
        initializeTable(jssc, sqlContext, "cumulativeVitals");
        sqlContext.sql("select * from cumulativeVitals").show();

        // One Kafka receiver thread on the patientVitals topic.
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("patientVitals", 1);
        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(jssc, zkQuorum, kafkaConsumerGroupId, map);

        JavaDStream<VitalRecord> vitals = kafkaStream.map(new MapMessageToVital());
        vitals.foreach(new ConvertStreamToDataFrame("cumulativeVitals"));

        HiveThriftServer2.startWithContext(sqlContext);

        jssc.start();
        jssc.awaitTermination();
    }

    // TODO: http://stackoverflow.com/questions/32362206/spark-dataframe-saveastable-with-partitionby-creates-no-...
    @SuppressWarnings("deprecation")
    private static void initializeTable(JavaStreamingContext jssc, HiveContext sqlContext, String tableName) {
        List<VitalRecord> records = new ArrayList<VitalRecord>();
        VitalRecord record = new VitalRecord(0, new Date(new java.util.Date().getTime()), 0, 0, 0, 0);
        records.add(record);
        JavaRDD<VitalRecord> recordRDD = jssc.sparkContext().parallelize(records);
        DataFrame recordDF = sqlContext.createDataFrame(recordRDD, VitalRecord.class);
        recordDF.registerTempTable(tableName);

        // String fileName = "/home/" + new Random().nextInt(20000);
        // recordDF.saveAsParquetFile(fileName);
        // DataFrame parquetFile = sqlContext.parquetFile(fileName);
        // sqlContext.registerDataFrameAsTable(parquetFile, tableName);
    }
}
Start Beeline. Connect to Database:
[root@sandbox spark-client]# pwd
/usr/hdp/current/spark-client
[root@sandbox spark-client]# ./bin/beeline
Beeline version 1.6.0.2.4.0.0-169 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10001
Connecting to jdbc:hive2://localhost:10001
Enter username for jdbc:hive2://localhost:10001: root
Enter password for jdbc:hive2://localhost:10001: **********
16/04/22 22:19:10 INFO Utils: Supplied authorities: localhost:10001
16/04/22 22:19:10 INFO Utils: Resolved authority: localhost:10001
16/04/22 22:19:10 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10001
Connected to: Spark SQL (version 1.6.0)
Driver: Spark Project Core (version 1.6.0.2.4.0.0-169)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10001> show tables;
+------------+--------------+--+
| tableName  | isTemporary  |
+------------+--------------+--+
| sample_07  | false        |
| sample_08  | false        |
+------------+--------------+--+
2 rows selected (0.305 seconds)
0: jdbc:hive2://localhost:10001>
Created 04-23-2016 05:57 PM
In Spark 1.6, the Thrift server runs in multi-session mode by default, which means each JDBC/ODBC connection gets its own copy of the SQL configuration and temporary function registry (cached tables are still shared). You are registering a temp table, so in order to see it you need to run the Thrift server in single-session mode: in spark-defaults.conf, set spark.sql.hive.thriftServer.singleSession to true. When your code starts an instance of the Thrift server, it will then come up in single-session mode, and the temp table you initialize and register will show up when you connect and issue the show tables command. Alternatively, you can create a permanent table, in which case it will show up in multi-session mode and from Hive (you have the code to do that, but it's commented out).
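For reference, a minimal sketch of the setting (spark-defaults.conf lives under the Spark conf directory, /usr/hdp/current/spark-client/conf on the sandbox):

# $SPARK_HOME/conf/spark-defaults.conf
spark.sql.hive.thriftServer.singleSession true

The same property can also be passed per job with spark-submit --conf spark.sql.hive.thriftServer.singleSession=true. If you go the permanent-table route instead, a sketch against the Spark 1.6 Java API, reusing the recordDF and tableName from your initializeTable method, would be:

// Persist the DataFrame as a managed Hive table instead of a temp table,
// so it is visible from any session and from Hive itself.
recordDF.write().saveAsTable(tableName);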
Created 04-23-2016 12:13 AM
If you create a table via Spark/Beeline and you can see that table, but not the tables that exist within Hive, that typically means Spark isn't configured to use the Hive metastore.
Can you verify that there is a hive-site.xml under spark/conf and that it points to the right host and port for the Hive metastore?
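As a quick sanity check (a sketch, assuming the sandbox default metastore port 9083), you can also confirm from a shell that the metastore service named in hive-site.xml is actually listening:

# Run on the metastore host; 9083 is the default Hive metastore port.
netstat -tlnp | grep 9083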
Created 04-25-2016 03:07 PM
Under /usr/hdp/current/spark-client/conf on the sandbox:
[root@sandbox conf]# cat hive-site.xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://sandbox.hortonworks.com:9083</value>
  </property>
</configuration>
Created 04-25-2016 03:14 PM
The singleSession setting wasn't enabled for me by default. It also wasn't mentioned in any of the documentation that I've reviewed.
Thanks for the help!
Created 04-25-2016 06:35 AM
You need to have a hive-site.xml with the HMS (Hive Metastore Service) properties under your SPARK_CONF_DIR.
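For anyone not on the sandbox, a minimal sketch of wiring this up by hand (the paths are illustrative, not prescriptive):

# Point Spark at a conf directory containing hive-site.xml
export SPARK_CONF_DIR=/usr/hdp/current/spark-client/conf
cp /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR/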
Created 04-25-2016 03:08 PM
This is set up by default on the HDP sandbox. See reply above.