Member since: 11-17-2015
Posts: 21
Kudos Received: 17
Solutions: 2
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2644 | 03-29-2016 02:06 PM |
| | 2242 | 02-18-2016 08:41 PM |
04-25-2016
03:14 PM
This wasn't set up for me by default. It also wasn't mentioned in any of the documentation that I've reviewed. Thanks for the help!
04-25-2016
03:08 PM
This is set up by default on the HDP sandbox. See reply above.
04-25-2016
03:07 PM
Under /usr/hdp/current/spark-client/conf on the sandbox:

[root@sandbox conf]# cat hive-site.xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://sandbox.hortonworks.com:9083</value>
  </property>
</configuration>
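For anyone debugging the same thing, here is a minimal standalone sketch (not production code) that prints which metastore URI a HiveContext resolved and lists the tables it can see; it assumes the sandbox layout described above, where hive-site.xml sits in the Spark conf directory:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public class MetastoreCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MetastoreCheck");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext sqlContext = new HiveContext(jsc.sc());

        // Which metastore did this context pick up from hive-site.xml (if any)?
        System.out.println("hive.metastore.uris = "
                + sqlContext.getConf("hive.metastore.uris", "<not set>"));

        // If the URI above points at the sandbox metastore, the Hive sample
        // tables (e.g. sample_07) should show up here.
        sqlContext.sql("show tables").show();
    }
}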
04-22-2016
10:26 PM
1 Kudo
I'm using the latest HDP (2.4) Sandbox. I have a class that attempts to start a HiveThriftServer2 and write a table called cumulativeVitals. When I look for the table in Beeline, I can't find it. Code:

package stlhug.streaming;

import java.util.ArrayList;
import java.sql.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import stlhug.domain.VitalRecord;
import stlhug.streaming.function.ConvertStreamToDataFrame;
import stlhug.streaming.function.MapMessageToVital;

public class PatientVitalStream2 {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        String zkQuorum = args[1];
        String kafkaConsumerGroupId = args[2];

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("PatientVitals");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        HiveContext sqlContext = new HiveContext(jssc.sparkContext().sc());
        sqlContext.setConf("hive.server2.thrift.port", "10001");

        initializeTable(jssc, sqlContext, "cumulativeVitals");
        sqlContext.sql("select * from cumulativeVitals").show();

        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("patientVitals", 1);
        JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(jssc, zkQuorum,
                kafkaConsumerGroupId, map);

        JavaDStream<VitalRecord> vitals = kafkaStream.map(new MapMessageToVital());
        vitals.foreach(new ConvertStreamToDataFrame("cumulativeVitals"));

        HiveThriftServer2.startWithContext(sqlContext);

        jssc.start();
        jssc.awaitTermination();
    }

    // TODO: http://stackoverflow.com/questions/32362206/spark-dataframe-saveastable-with-partitionby-creates-no-orc-file-in-hdfs
    @SuppressWarnings("deprecation")
    private static void initializeTable(JavaStreamingContext jssc, HiveContext sqlContext, String tableName) {
        List<VitalRecord> records = new ArrayList<VitalRecord>();
        VitalRecord record = new VitalRecord(0, new Date(new java.util.Date().getTime()), 0, 0, 0, 0);
        records.add(record);
        JavaRDD<VitalRecord> recordRDD = jssc.sparkContext().parallelize(records);
        DataFrame recordDF = sqlContext.createDataFrame(recordRDD, VitalRecord.class);
        recordDF.registerTempTable(tableName);
        // String fileName = "/home/"+new Random().nextInt(20000);
        // recordDF.saveAsParquetFile(fileName);
        // DataFrame parquetFile = sqlContext.parquetFile(fileName);
        // sqlContext.registerDataFrameAsTable(parquetFile, tableName);
    }
}
Start Beeline and connect to the database:

[root@sandbox spark-client]# pwd
/usr/hdp/current/spark-client
[root@sandbox spark-client]# ./bin/beeline
Beeline version 1.6.0.2.4.0.0-169 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10001
Connecting to jdbc:hive2://localhost:10001
Enter username for jdbc:hive2://localhost:10001: root
Enter password for jdbc:hive2://localhost:10001: **********
16/04/22 22:19:10 INFO Utils: Supplied authorities: localhost:10001
16/04/22 22:19:10 INFO Utils: Resolved authority: localhost:10001
16/04/22 22:19:10 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10001
Connected to: Spark SQL (version 1.6.0)
Driver: Spark Project Core (version 1.6.0.2.4.0.0-169)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10001> show tables;
+------------+--------------+--+
| tableName | isTemporary |
+------------+--------------+--+
| sample_07 | false |
| sample_08 | false |
+------------+--------------+--+
2 rows selected (0.305 seconds)
0: jdbc:hive2://localhost:10001>
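One variant I have not tried: persisting the seed DataFrame as a real metastore table (saveAsTable) instead of only registering a temp table, so it would be visible to anything that shares the metastore, not just the Thrift server started from this context. A rough, unverified sketch reusing the pieces from initializeTable above:

import java.sql.Date;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import stlhug.domain.VitalRecord;

public class InitializeTableSketch {
    // Unverified variant of initializeTable(): write the seed records through the
    // metastore so the table shows up for any client connected to it (e.g. Beeline
    // against HiveServer2), not only for this context's Thrift server.
    static void initializeTable(JavaStreamingContext jssc, HiveContext sqlContext, String tableName) {
        List<VitalRecord> records = new ArrayList<VitalRecord>();
        records.add(new VitalRecord(0, new Date(new java.util.Date().getTime()), 0, 0, 0, 0));
        JavaRDD<VitalRecord> recordRDD = jssc.sparkContext().parallelize(records);
        DataFrame recordDF = sqlContext.createDataFrame(recordRDD, VitalRecord.class);
        // SaveMode.Overwrite replaces any prior copy of the table.
        recordDF.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
    }
}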
Labels:
- Apache Spark
03-29-2016
02:26 PM
2 Kudos
I have a stream of records that I want to write into HBase. The stream contains different types of records that I need to write to different HBase tables (e.g. a patient record goes to the patient table, an order record goes to the order table). Since the number of tables is finite, I could brute force the application by creating a filter for each table and then handling the resulting streams of records individually. However, I'd like to do something a little more elegant where I create something akin to a Map<ColumnValue, List<Record>>.
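To make the shape of what I'm after concrete, here is a rough, untested sketch. The Record class, getTableName(), the row key, and the column family are placeholders rather than my real classes, and it assumes the HBase 1.x client API:

import java.util.Iterator;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class RouteToTables {

    // Hypothetical record type: it knows which HBase table it belongs to.
    public static class Record implements java.io.Serializable {
        String tableName;
        String rowKey;
        String payload;
        String getTableName() { return tableName; }
    }

    // Key each record by its destination table, group, and write one table at a
    // time from each partition. This avoids a separate filter() per table.
    public static void writeByTable(JavaDStream<Record> records) {
        JavaPairDStream<String, Iterable<Record>> byTable = records
                .mapToPair(new PairFunction<Record, String, Record>() {
                    public Tuple2<String, Record> call(Record r) {
                        return new Tuple2<String, Record>(r.getTableName(), r);
                    }
                })
                .groupByKey();

        byTable.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<Record>>>() {
            public void call(JavaPairRDD<String, Iterable<Record>> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<Record>>>>() {
                    public void call(Iterator<Tuple2<String, Iterable<Record>>> it) throws Exception {
                        // One connection per partition; one Table handle per destination table.
                        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                        try {
                            while (it.hasNext()) {
                                Tuple2<String, Iterable<Record>> entry = it.next();
                                Table table = conn.getTable(TableName.valueOf(entry._1()));
                                for (Record r : entry._2()) {
                                    Put put = new Put(Bytes.toBytes(r.rowKey));
                                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"),
                                            Bytes.toBytes(r.payload));
                                    table.put(put);
                                }
                                table.close();
                            }
                        } finally {
                            conn.close();
                        }
                    }
                });
            }
        });
    }
}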
Labels:
- Apache Spark
03-29-2016
02:06 PM
2 Kudos
I finally got it to work. Here is the final working code:

final JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(STREAM_DURATION_IN_SECS));
jsc.sparkContext().hadoopConfiguration().set("textinputformat.record.delimiter", "\u0003");
JavaDStream<String> inputStream = jsc.textFileStream(cli.getOptionValue(INPUT_DIRECTORY));
03-27-2016
10:04 PM
I added the following line:

jsc.sparkContext().hadoopConfiguration().set("textinputformat.record.delimiter", "\\u0003");

Now I'm getting a new set of errors:

16/03/27 17:04:01 INFO SparkContext: Created broadcast 0 from fileStream at CacheStreamProcessorTest.java:82
16/03/27 17:04:01 ERROR JobScheduler: Error generating jobs for time 1459116240000 ms
java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:88)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:264)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:254)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:254)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:148)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.InstantiationException
        ... (identical stack trace to the one above)
16/03/27 17:04:01 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
03-27-2016
03:27 AM
1 Kudo
I have a set of files whose records span multiple lines, so we are using the Unicode character \\u0003 to denote the end of a record. However, my Spark code is ignoring the instruction to use the alternate delimiter, so each line comes back as a separate entry. Here are the relevant snippets of code:

SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("MyApp");

Configuration config = new Configuration();
config.set("textinputformat.record.delimiter", "\\u0003");
conf.set("textinputformat.record.delimiter", "\\u0003");

final int batchSize = Integer.parseInt(cli.getOptionValue(BATCH_SIZE));
final JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(STREAM_DURATION_IN_SECS));

JavaDStream<String> inputStream = jsc.textFileStream(cli.getOptionValue(INPUT_DIRECTORY));
JavaDStream<Record> records = inputStream.flatMap(new CacheFileStreamFlatMap());
// In the above function I have a log message where I can see the records coming out line by line.

boolean mapSideCombine = true;
JavaPairDStream<String, Record> iniRecords = records.flatMapToPair(new RecordToIniRecord());
JavaPairDStream<String, List<Record>> combined = iniRecords.combineByKey(
        new CreateListOfRecords(),   // create combiners
        new AddRecordToList(),       // merge values
        new MergeListsOfRecords(),
        new HashPartitioner(batchSize),
        mapSideCombine);
combined.foreach(new WriteRecordsToIniTable());

jsc.start();            // Start the computation
jsc.awaitTermination(); // Wait for the computation to terminate
return 0;

Every entry I can find on Stack Overflow points to this as the right way to accomplish the task, but I'm not seeing it work. Thanks!
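For reference, here is a minimal sketch of the variation that the accepted 03-29-2016 post above uses: setting the delimiter on the SparkContext's Hadoop configuration rather than on the SparkConf. Note that "\u0003" in Java source is the actual control character, while "\\u0003" is the six-character literal backslash sequence.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DelimiterSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MyApp");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

        // textFileStream() builds its RDDs from the SparkContext's Hadoop configuration,
        // so the record delimiter is set there (this mirrors the accepted code above).
        jsc.sparkContext().hadoopConfiguration().set("textinputformat.record.delimiter", "\u0003");

        // args[0] is assumed to be the input directory to watch.
        JavaDStream<String> inputStream = jsc.textFileStream(args[0]);
        inputStream.print();

        jsc.start();
        jsc.awaitTermination();
    }
}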
Labels:
- Apache Spark
03-16-2016
09:20 PM
3 Kudos
I am trying to get a DataFrame to write to Phoenix. I've created what looks like a Hello World program:

package mercy.dm;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.query.QueryServices;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import com.google.common.collect.ImmutableMap;
public class ReadWriteToPhoenix {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("ReadWriteToPhoenix");
sparkConf.setMaster("local");
sparkConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
sparkConf.set(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
sparkConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
sparkConf.set(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
sparkConf.set(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
sparkConf.set(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");
JavaSparkContext conf = new JavaSparkContext(sparkConf);
conf.setLocalProperty(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
conf.setLocalProperty(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
conf.setLocalProperty(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
conf.setLocalProperty(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
conf.setLocalProperty(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
conf.setLocalProperty(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");
try {
UserGroupInformation.setConfiguration(new Configuration());
UserGroupInformation.loginUserFromKeytab("THEUSER@PROD",
"/path/to/keytab/THEUSER.user.keytab");
} catch (Exception e) {
System.out.println(e.toString());
}
String quorum = conf.getLocalProperty("hbase.zookeeper.quorum");
String clientPort = conf.getLocalProperty("hbase.zookeeper.property.clientPort");
String znodeParent = conf.getLocalProperty("zookeeper.znode.parent");
System.out.println("Quorum = " + quorum);
System.out.println("clientPort = " + clientPort);
System.out.println("znodeParent = " + znodeParent);
HBaseConfiguration hbaseConf = new HBaseConfiguration();
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
hbaseConf.set(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
hbaseConf.set(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
hbaseConf.set(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");
final SQLContext sqlContext = new SQLContext(conf);
// Map<String, String> options = new HashMap<String, String>();
// options.put("zkUrl", "lnxhdp01.smrcy.com:2181:/hbase-secure");
// options.put("table", "TABLE1");
// sqlContext.load("org.apache.phoenix.spark", options);
List<Table1> dataSet = new ArrayList<Table1>();
dataSet.add(new Table1(1, "1"));
dataSet.add(new Table1(2, "2"));
dataSet.add(new Table1(3, "3"));
// TODO: Fix error below:
// Exception in thread "main" java.lang.RuntimeException: [1.1] failure:
// ``with'' expected but identifier CREATE found
// CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
// sqlContext.executeSql("CREATE TABLE TABLE1 (ID BIGINT NOT NULL
// PRIMARY KEY, COL1 VARCHAR);");
JavaRDD<Table1> rdd = conf.parallelize(dataSet);
DataFrame df = sqlContext.createDataFrame(rdd, Table1.class);
df.write().format("org.apache.phoenix.spark").mode(SaveMode.Overwrite)
.options(ImmutableMap.of("zkUrl", "ZK_QUORUM:2181:/hbase-secure", "table", "TABLE1"))
.save();
DataFrame fromPhx = sqlContext.read().format("jdbc")
.options(ImmutableMap.of("driver", "org.apache.phoenix.jdbc.PhoenixDriver", "url",
"jdbc:phoenix:ZK_QUORUM:2181:/hbase-secure", "dbtable", "TABLE1"))
.load();
fromPhx.collect();
}
public static class Table1 implements Serializable {
int id;
String col1;
public Table1() {
}
public Table1(int id, String col1) {
this.id = id;
this.col1 = col1;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getCol1() {
return col1;
}
public void setCol1(String col1) {
this.col1 = col1;
}
}
}
I've already defined the table in Phoenix. I'm getting this error trying to write to it.
16/03/16 15:14:11 INFO ConnectionQueryServicesImpl: Trying to connect to a secure cluster with keytab:/hbase
Exception in thread "main" java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1883)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1862)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1862)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:180)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:132)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:151)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:187)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:99)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:82)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:70)
at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:213)
at org.apache.phoenix.spark.ConfigurationUtil$.encodeColumns(ConfigurationUtil.scala:57)
at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:33)
at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:47)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
at mercy.dm.ReadWriteToPhoenix.main(ReadWriteToPhoenix.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Login failure for 2181 from keytab /hbase: javax.security.auth.login.LoginException: Unable to obtain password from user
at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:962)
at org.apache.hadoop.security.SecurityUtil.login(SecurityUtil.java:246)
at org.apache.hadoop.hbase.security.User$SecureHadoopUser.login(User.java:386)
at org.apache.hadoop.hbase.security.User.login(User.java:253)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:283)
... 29 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:856)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:953)
... 33 more
16/03/16 15:14:11 INFO SparkContext: Invoking stop() from shutdown hook
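For what it's worth, the "Login failure for 2181 from keytab /hbase" message suggests the principal and keytab are not being taken from the values set on the conf: it is literally trying "2181" as the principal and "/hbase" as the keytab. If I'm reading the Phoenix docs right, a secured cluster can also be addressed by appending the principal and keytab to the JDBC URL itself. An untested sketch, using the same placeholder quorum/principal/keytab values as above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixSecureConnectSketch {
    public static void main(String[] args) throws Exception {
        // Untested: Phoenix 4.x documents a secure URL of the form
        // jdbc:phoenix:<quorum>:<port>:<znode>:<principal>:<keytab>
        String url = "jdbc:phoenix:ZK_QUORUM:2181:/hbase-secure:"
                + "THEUSER@PROD:/path/to/keytab/THEUSER.user.keytab";

        Connection conn = DriverManager.getConnection(url);
        Statement stmt = conn.createStatement();
        // TABLE1 is the table defined earlier (ID BIGINT PRIMARY KEY, COL1 VARCHAR).
        ResultSet rs = stmt.executeQuery("SELECT ID, COL1 FROM TABLE1");
        while (rs.next()) {
            System.out.println(rs.getLong("ID") + " -> " + rs.getString("COL1"));
        }
        conn.close();
    }
}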
Labels:
- Apache Phoenix
- Apache Spark
02-18-2016
08:41 PM
1 Kudo
The solution that ended up working was to adapt the code I found here: http://stackoverflow.com/questions/23755976/kafka-writing-custom-serializer
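Roughly, the pattern there (from memory, with illustrative class and field names; see the link for the original) is to implement Kafka's old-producer Encoder interface and give the class a constructor that accepts VerifiableProperties, since the producer instantiates it reflectively from the serializer.class property:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Illustrative payload type standing in for the real record class.
class MyRecord implements Serializable {
    int id;
    String body;
}

// Custom serializer for the 0.8-era Kafka producer API. The VerifiableProperties
// constructor must exist even if unused, because the producer calls it reflectively.
public class MyRecordEncoder implements Encoder<MyRecord> {

    public MyRecordEncoder(VerifiableProperties props) {
        // No properties needed for this sketch.
    }

    @Override
    public byte[] toBytes(MyRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(record);
            out.close();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize record", e);
        }
    }
}

It gets wired in through the producer properties, e.g. props.put("serializer.class", MyRecordEncoder.class.getName()).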