Member since: 04-24-2017
Posts: 106
Kudos Received: 13
Solutions: 7
My Accepted Solutions
Views | Posted
---|---
234 | 11-25-2019 12:49 AM
830 | 11-14-2018 10:45 AM
594 | 10-15-2018 03:44 PM
632 | 09-25-2018 01:54 PM
334 | 08-03-2018 09:47 AM
11-25-2019
12:49 AM
1 Kudo
To answer my own question: since I'm now using multiple partitions for the Kafka topic, Spark uses more executors to process the data. Hive/Tez likewise creates as many worker containers as the topic has partitions.
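For reference, a minimal sketch of how the partition count of an existing topic can be increased programmatically with the Kafka AdminClient (the topic name, broker list and the target of 8 partitions are just example values):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:6667,server2:6667,server3:6667");
        try (AdminClient admin = AdminClient.create(props)) {
            // increase "testtopic" to 8 partitions; existing records stay on the old partitions
            admin.createPartitions(Collections.singletonMap("testtopic", NewPartitions.increaseTo(8)))
                 .all().get();
        }
    }
}
Only newly produced records are spread over the additional partitions; the data that is already in the topic is not rebalanced.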
11-24-2019
11:18 PM
I wrote a Kafka producer that sends simulated data to a Kafka topic (replication factor 3, one partition).
Now, I want to access this data by using Hive and/or Spark Streaming.
First approach: Using an external Hive table with KafkaStorageHandler:
CREATE EXTERNAL TABLE mydb.kafka_timeseriestest (
  description string,
  version int,
  ts timestamp,
  varname string,
  varvalue float
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "testtopic",
  "kafka.bootstrap.servers" = "server1:6667,server2:6667,server3:6667"
);
-- e.g. SELECT max(varvalue) FROM mydb.kafka_timeseriestest;
-- takes too long, and only one Tez task is running
Second approach: Writing a Spark Streaming app, that accesses the Kafka topic:
// started with 10 executors, but only one executor is active
...
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
...
In both cases, only one Tez/Spark worker is active. Therefore reading all data (~500 million entries) takes a very long time. How can I increase the performance? Is the issue caused by the one-partition topic? If yes, is there a rule of thumb according to which the number of partitions should be determined?
I'm using a HDP 3.1 cluster, running Spark, Hive and Kafka on multiple nodes:
dataNode1 - dataNode3: Hive + Spark + Kafka broker
dataNode4 - dataNode8: Hive + Spark
08-29-2019
05:28 AM
1 Kudo
I've upgraded to HDP 3.1 and now want to read a Hive external table in my Spark application. The following page shows the compatibility matrix: https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_configure_a_spark_hive_connection.html
I don't have LLAP activated, so it seems that I'm restricted in the Spark -> Hive access and vice versa, right? But the compatibility table says that I can access external Hive tables from Spark without the HWC (and also without LLAP), with the hint that the table must be defined in the Spark catalog. What do I have to do here? I tried the following code, but it says "Table not found"!
SparkSession session = SparkSession.builder()
    .config("spark.executor.instances", "4")
    .master("yarn-client")
    .appName("Spark LetterCount")
    .config("hive.metastore.uris", "thrift://myhost.com:9083")
    .config("hive.metastore.warehouse.dir", "/warehouse/tablespace/managed/hive")
    .config("hive.metastore.warehouse.external.dir", "/warehouse/tablespace/external/hive")
    .config("spark.sql.warehouse.dir", new File("spark-warehouse").getAbsolutePath())
    .config("spark.sql.hive.hiveserver2.jdbc.url", "jdbc:hive2://localhost:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;user=student30")
    .enableHiveSupport()
    .getOrCreate();
Dataset<Row> dsRead = session.sql("SELECT * FROM hivedb.external_table");
System.out.println(dsRead.count());
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `hivedb`.`external_table`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `hivedb`.`external_table`
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at main.SparkSQLExample.main(SparkSQLExample.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Can someone help me to solve this issue? Thank you!
06-19-2019
01:10 PM
I'm working with two tables in Hive (version 1.2.1 on HDP 2.6.5): tableA (ORC, not transactional) and tableB (ORC, transactional, with bucketing). We periodically run an ALTER TABLE tableA CONCATENATE query to merge the small ORC files on HDFS into bigger files. For my transactional table (with buckets) I have to use the ALTER TABLE tableB COMPACT 'major'|'minor' query to merge the small files in my HDFS warehouse directory. My question is: what is the difference between these two commands? Is it possible to use the compaction / concatenate command on the other table somehow? And why are there two different commands for this file merge if they do the same thing in general? Thank you!
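For reference, a minimal Hive JDBC sketch of how the two statements can be issued programmatically (the HiveServer2 URL and database are placeholders, and 'major' can be swapped for 'minor'):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MergeSmallFiles {
    public static void main(String[] args) throws Exception {
        // placeholder HiveServer2 URL - adjust host/port/database for the cluster
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://myhost.com:10000/mydb");
             Statement st = conn.createStatement()) {
            // non-transactional ORC table: merge small files in place
            st.execute("ALTER TABLE tableA CONCATENATE");
            // transactional (ACID) table: request a compaction, which runs asynchronously
            // in the metastore's compactor worker threads
            st.execute("ALTER TABLE tableB COMPACT 'major'");
        }
    }
}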
02-20-2019
08:45 PM
Oh yes, you're right @Josh Elser. A closer look at the PhoenixDriver class showed that it registers a shutdown hook when the driver instance is created: https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java#L73
I was able to avoid the exception by stopping the JVM immediately after the SparkContext.close() call with Runtime.getRuntime().halt(0):
... // Spark logic
...
updatePhoenixTableStats(phoenixTableName); // calling the Phoenix JDBC stuff
sc.close();
Runtime.getRuntime().halt(0); // stop the JVM immediately
Do you know whether this is a valid solution if I only use Spark and Phoenix functionality?
02-20-2019
09:56 AM
I found the solution to my problem. The jar I exported had the phoenix-spark2 and phoenix-client dependencies bundled into it. I changed these dependencies (since they already exist in my cluster's HDP installation) to scope provided:
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark2</artifactId>
<version>4.7.0.2.6.5.0-292</version>
<scope>provided</scope> <!-- this did it, now have to add --jar to spark-submit -->
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.7.0.2.6.5.0-292</version>
<scope>provided</scope> <!-- this did it, now have to add --jar to spark-submit -->
</dependency>
Now I start my Spark job with the --jars option and reference these dependencies there, and it works fine in yarn-client mode:
spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client --jars /usr/hdp/current/phoenix-client/phoenix-spark2.jar,/usr/hdp/current/phoenix-client/phoenix-client.jar hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>
PS: In yarn-cluster mode the application worked all the time (also with the fat jar that included the dependencies).
02-19-2019
07:01 PM
Thank you for the fast answer @Josh Elser, but I'm not sure what you mean by "not a JAR that HBase or Phoenix own". The gk-journal-importer-phoenix-0.0.3h.jar is my exported Java project. It contains the classes of my import job and the Main class whose main method creates, uses and closes the SparkContext. I start my Spark application by running the command:
spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>
02-19-2019
01:48 PM
I wrote a Spark application for bulk-loading a Phoenix table. Everything worked for a few weeks, but for a few days I have been getting problems with duplicated rows, caused by faulty table stats. A possible workaround is to delete and re-generate the stats for this table, so I need to open a JDBC connection to my Phoenix database and run the statements for deleting and creating the stats. Since I need to do this after inserting the new data via Spark, I want to create and use this JDBC connection inside my Spark job, after the bulk-loading step. For that I added the following method and call it between the dataframe.save() and sparkContext.close() calls in my Java code:
private static void updatePhoenixTableStatistics(String phoenixTableName) {
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
System.out.println("Connecting to database..");
Connection conn = DriverManager.getConnection("jdbc:phoenix:my-sever.net:2181:/hbase-unsecure");
System.out.println("Creating statement...");
Statement st = conn.createStatement();
st.executeUpdate("DELETE FROM SYSTEM.STATS WHERE physical_name='" + phoenixTableName + "'");
System.out.println("Successfully deleted statistics data... Now refreshing it.");
st.executeUpdate("UPDATE STATISTICS " + phoenixTableName + " ALL");
System.out.println("Successfully refreshed statistics data.");
st.close();
conn.close();
System.out.println("Connection closed.");
} catch (Exception e) {
System.out.println("Unable to update table statistics - Skipping this step!");
e.printStackTrace();
}
}
The problem is that since I added this method I always get the following exception at the end of my Spark job:
Bulk-Load: DataFrame.save() completed - Import finished successfully!
Updating Table Statistics:
Connecting to database..
Creating statement...
Successfully deleted statistics data... Now refreshing it.
Successfully refreshed statistics data.
Connection closed.
Exception in thread "Thread-31" java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2794)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2646)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2518)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1065)
at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1119)
at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1520)
at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:68)
at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:82)
at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:97)
at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)
at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
at org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:93)
at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)
at org.apache.phoenix.jdbc.PhoenixDriver$1.run(PhoenixDriver.java:88)
Caused by: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:225)
at java.util.zip.ZipFile.<init>(ZipFile.java:155)
at java.util.jar.JarFile.<init>(JarFile.java:166)
at java.util.jar.JarFile.<init>(JarFile.java:103)
at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:93)
at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:69)
at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:99)
at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:122)
at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:152)
at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2612)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2693)
... 14 more
Does someone know about this problem and can help? How do you generally work with JDBC inside a Spark job? Or is there another way to do this? I'm working on HDP 2.6.5 with Spark 2.3 and Phoenix 4.7 installed.
02-14-2019
08:27 PM
I found a strange behavior when running an example WordCount MapReduce job via the command line: if I set HADOOP_CLASSPATH to any value, the job runs correctly; if I don't set HADOOP_CLASSPATH, it fails. The question is: why does the job fail if the Hadoop classpath is not set and succeed if any value is set? Is this a bug? Steps to reproduce:
1. WordCount.java
package test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class WordCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Usage: WordCount <input dir> <output dir>\n");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(WordCount.class);
job.setJobName("Word Count");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(WordMapper.class);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
2. WordMapper.java
package test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
for (String word : line.split("\\W+")) {
if (word.length() > 0) {
context.write(new Text(word), new IntWritable(1));
}
}
}
}
3. SumReducer.java
package test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int wordCount = 0;
for (IntWritable value : values) {
wordCount += value.get();
}
context.write(key, new IntWritable(wordCount));
}
}
4. Change to the src directory of the WordCount Java project:
cd ~/wordcount/src
5. Compile the .java files:
javac -classpath `hadoop classpath` test/*.java
6. Build a .jar from the .class files:
jar cvf wordcount.jar test/*.class
7. Set any value (e.g. "abc") if the job shall succeed; if not, don't set the variable (skip this step):
export HADOOP_CLASSPATH=abc
8. Run the job (it runs if HADOOP_CLASSPATH is set, fails otherwise):
hadoop jar wordcount.jar test.WordCount input output
9. If the classpath is set, the job finishes successfully without the following warning and error. If the classpath is not set, the following is printed:
...
WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
...
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class test.WordMapper not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2308)
at org.apache.hadoop.mapreduce.task.JobContextImpl.getMapperClass(JobContextImpl.java:187)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.ClassNotFoundException: Class test.WordMapper not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2214)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2306)
... 8 more
Can someone explain this behavior? Why does it also run if HADOOP_CLASSPATH contains any (nonsense) value? Is this a bug, or is there a good reason for it?
--- Update 2019-02-15 ---
It seems to have something to do with my default Hadoop classpath, which is printed when running the hadoop classpath command (without calling export HADOOP_CLASSPATH=... beforehand!):
hadoop classpath
/usr/hdp/2.6.5.0-292/hadoop/conf:/usr/hdp/2.6.5.0-292/hadoop/lib/*:/usr/hdp/2.6.5.0-292/hadoop/.//*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/./:/usr/hdp/2.6.5.0-292/hadoop-hdfs/lib/*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/.//*:/usr/hdp/2.6.5.0-292/hadoop-yarn/lib/*:/usr/hdp/2.6.5.0-292/hadoop-yarn/.//*:/usr/hdp/2.6.5.0-292/hadoop-mapreduce/lib/*:/usr/hdp/2.6.5.0-292/hadoop-mapreduce/.//*::mysql-connector-java.jar:/usr/hdp/2.6.5.0-292/tez/*:/usr/hdp/2.6.5.0-292/tez/lib/*:/usr/hdp/2.6.5.0-292/tez/conf
My list contains an empty entry (visible as the :: in the list). The job fails with this (default) setting. If I set HADOOP_CLASSPATH to any value, the empty entry is filled with this value, so there is no :: anymore. After doing this the job runs:
export HADOOP_CLASSPATH=anything
hadoop classpath
/usr/hdp/2.6.5.0-292/hadoop/conf:/usr/hdp/2.6.5.0-292/hadoop/lib/*:/usr/hdp/2.6.5.0-292/hadoop/.//*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/./:/usr/hdp/2.6.5.0-292/hadoop-hdfs/lib/*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/.//*:/usr/hdp/2.6.5.0-292/hadoop-yarn/lib/*:/usr/hdp/2.6.5.0-292/hadoop-yarn/.//*:/usr/hdp/2.6.5.0-292/hadoop-mapreduce/lib/*:/usr/hdp/2.6.5.0-292/hadoop-mapreduce/.//*:anything:mysql-connector-java.jar:/usr/hdp/2.6.5.0-292/tez/*:/usr/hdp/2.6.5.0-292/tez/lib/*:/usr/hdp/2.6.5.0-292/tez/conf
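A possible workaround hinted at by the JobResourceUploader warning above (not verified here) is to set the job jar explicitly in WordCount.main instead of relying on setJarByClass; a sketch, assuming wordcount.jar sits in the working directory:
// additionally needed: import org.apache.hadoop.conf.Configuration;
Job job = Job.getInstance(new Configuration(), "Word Count");
job.setJar("wordcount.jar"); // ship exactly this jar with the job, regardless of how the classpath is resolved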
02-04-2019
02:39 PM
In Spark 2.2 I have a DataFrame like this:
id | ValueList (Array<Struct>)
1 | [(z, 1), (y, 2), (x, 3)]
2 | [(y, 3), (x, 1), (u, 5)] I want to transform my DataFrame by looking up the different keys in the Struct of the ValueList column and generating new columns with these names and set the value (or null if not existing in this row). So the final DataFrame should look like this: id | ValueList | u | x | y | z
1 | [(z, 1), (y, 2), (x, 3)] | null | 3 | 2 | 1
2 | [(y, 3), (x, 1), (u, 5)] | 5 | 1 | 3 | null
How can I do this? I couldn't find a suitable function for reading out the values of the Array<Struct> column in order to transform the ValueList column into this new format. What I did so far is read the different keys (here u, x, y, z) from the DataFrame by collecting the data and iterating over the resulting list of Rows. But how can I now use this information (I guess in combination with DF.withColumn) to fill this new DataFrame (and set the missing values to null)? Any help would be appreciated, thank you!
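For illustration, a sketch of the transformation I'm after using explode and pivot (Java API; it assumes the DataFrame is called df and the struct fields inside ValueList are named "name" and "value", which may not match the real schema):
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// df has columns: id, ValueList (array<struct<name:string, value:int>>) - field names assumed
Dataset<Row> flat = df
    .select(col("id"), explode(col("ValueList")).as("kv"))          // one row per (id, struct) pair
    .select(col("id"), col("kv.name").as("k"), col("kv.value").as("v"));

Dataset<Row> wide = flat.groupBy("id")
    .pivot("k")        // one column per distinct key (u, x, y, z)
    .agg(first("v"));  // keys missing for an id become null

// re-attach the original ValueList column
Dataset<Row> result = df.join(wide, "id");
result.show(false);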
12-19-2018
10:25 AM
I wrote several Spark applications that save Dataset data to Phoenix tables. When I try to process huge datasets, some of my Spark jobs fail with an ExecutorLostFailure exception. The jobs are retried and seem to finish successfully on the second attempt. Here is the code that saves the dataframe to my Phoenix table:
dfToSave.write().format("org.apache.phoenix.spark").mode("overwrite").option("table", "PHOENIX_TABLE_NAME").option("zkUrl", "server.name:2181:/hbase-unsecure").save();
Here is the output of one of the jobs in the Spark History UI:
ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 3.0 GB of 3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Why do I get this error when I use the Spark plugin for Apache Phoenix? Are there any configuration settings to manage the memory consumption of the Phoenix-Spark job?
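For completeness, a sketch of how the setting named in the error message could be raised; the 1024 MB figure is just an example to experiment with, and for yarn-cluster mode it would have to be passed on spark-submit instead:
import org.apache.spark.sql.SparkSession;

// example only: extra off-heap headroom per executor, in MB
// equivalent on the command line: spark-submit --conf spark.yarn.executor.memoryOverhead=1024 ...
SparkSession spark = SparkSession.builder()
    .appName("PhoenixBulkLoad") // hypothetical app name
    .config("spark.yarn.executor.memoryOverhead", "1024")
    .getOrCreate();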
11-20-2018
04:11 PM
I have a huge Phoenix table in my cluster (HDP 2.6.5 with HBase 1.1.2, Spark 2.3 and Phoenix 4.7) and I'm accessing the data via the Zeppelin UI, the Spark interpreter and the Phoenix-Spark plugin (https://phoenix.apache.org/phoenix_spark.html). I call this code to read the Phoenix table and show 20 entries:
val TESTNAMES = List("a", "b", "c")
val MEASUREIDS = List(...)
val STEP = "T1"
val dfRead = sqlContext.read.options(Map("table" -> "MY_PHOENIX_TABLE", "zkUrl" -> "myserver:2181")).format("org.apache.phoenix.spark").load()
val RESULT = dfRead.filter(col("testname").isin(TESTNAMES:_*)).filter(col("measureid").isin(MEASUREIDS:_*)).filter(col("step") === STEP)
RESULT.show()
The execution of this paragraph is very slow (it takes about 3 minutes to scan over the 50 million rows of the table with key [testname, measureid, testnumber, step, starttime]). Since I give the value selection list for the first two parts of the key [testname, measureid], it should skip ~49,999,900 rows (I have about 100 rows in my table for this [testname, measureid] combination). When I look at the "SQL" tab in the Spark History UI, I see the stages that were processed by the DF.show command: why does Spark generate a stage with 1 task, then a stage with 4, then 20, then 100, etc.? It looks like repeating these steps is taking all the time! How do I have to change my code, or is something wrong with it?
PS: Calling DF.count instead of DF.show leads to only two stages, the first with 3340 tasks (which also takes ~3 minutes) and then a very fast one with 1 task (0.5 sec)... I'm running 4 executors + driver with 1 core and 16 GB RAM each for my Spark interpreter.
UPDATE: Using a Phoenix JDBC connection with Spark, the above queries take ~5 seconds and this strange stage re-generation is also gone!
val dfRead = sqlContext.read.format("jdbc").options(Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver", "url" -> "jdbc:phoenix:myserver:2181:/hbase-unsecure", "dbtable" -> "MY_PHOENIX_TABLE")).load()
But why does this behave so differently?
11-14-2018
10:45 AM
I found the following Java-based solution for me: using the Dataset.filter method with a FilterFunction (https://spark.apache.org/docs/2.3.0/api/java/index.html?org/apache/spark/sql/Dataset.html). So my code now looks like this:
Dataset<Row> dsResult = sqlC.read()
.format("org.apache.phoenix.spark")
.option("table", tableName)
.option("zkUrl", hbaseUrl).load()
.where("OTHER_COLUMN = " + inputId)
.filter(row -> {
long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
long tsFrom = new Timestamp(sdf.parse(dateFrom).getTime()).getTime();
long tsTo = new Timestamp(sdf.parse(dateTo).getTime()).getTime();
return readTime >= tsFrom && readTime <= tsTo;
});
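A small optional refinement of this solution (a sketch): since the two boundary timestamps are constants, they can be parsed once outside the lambda instead of once per row:
// parse the user input once on the driver; the lambda only captures two primitives
// (assumes the enclosing method declares or handles ParseException)
final long tsFrom = sdf.parse(dateFrom).getTime();
final long tsTo = sdf.parse(dateTo).getTime();

Dataset<Row> dsResult = sqlC.read()
    .format("org.apache.phoenix.spark")
    .option("table", tableName)
    .option("zkUrl", hbaseUrl).load()
    .where("OTHER_COLUMN = " + inputId)
    .filter(row -> {
        long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
        return readTime >= tsFrom && readTime <= tsTo;
    });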
11-14-2018
08:10 AM
I have a Phoenix table that I can access via SparkSQL (with the Phoenix Spark plugin). The table also has a Timestamp column. I have to filter this Timestamp column by a user input like 2018-11-14 01:02:03, so I want to filter my Dataset (which represents the read Phoenix table) with the where / filter methods. My current Java code looks like this:
Timestamp t1 = new Timestamp(sdf.parse(dateFrom).getTime());
Timestamp t2 = new Timestamp(sdf.parse(dateTo).getTime());
Column c1 = new Column("TABLE_TS_COL").geq(t1);
Column c2 = new Column("TABLE_TS_COL").leq(t2);
Dataset<Row> dsResult = sqlContext.read()
.format("org.apache.phoenix.spark")
.option("table", tableName)
.option("zkUrl", hbaseUrl).load()
.where("OTHER_COLUMN = " + inputId) // This works
.where(c1) // Problem!
.where(c2) // Problem!
But this leads to the following exception:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "06" at line 1, column 474.
My Spark History UI shows the following select statement:
...
18/11/14 08:54:58 INFO PhoenixInputFormat: Select Statement: SELECT "OTHER_COLUMN", "TABLE_TS_COL" FROM HBASE_TEST3 WHERE ( "OTHER_COLUMN" = 0 AND "OTHER_COLUMN" IS NOT NULL AND "TABLE_TS_COL" IS NOT NULL AND "TABLE_TS_COL" >= 2018-09-24 06:49:01.0 AND "TABLE_TS_COL" <= 2018-09-24 06:49:01.0)
For me it looks like the quotation marks are missing for the timestamp values (not sure about that)? How can I filter a Timestamp column by a user input in Java and SparkSQL?
11-09-2018
07:30 AM
I have a huge Phoenix table that I have to select from by multiple columns and multiple rows. The table looks like this:
STARTNO || STARTSTEP || STARTTIME || OP || UNIT || ...
=======================================================================
123456 || AB || 2017-01-01 || TEST || MPH || ...
898473 || AB || 2017-01-02 || TEST || N || ...
...
Since the primary key for this Phoenix table consists of [startno, startstep, starttime], selecting on startno is very fast. Now to my problem: I need to select data from this table that matches the startno AND startstep values; let's say I want to select by 3 startno values and 2 startstep values. Therefore I created a new dataframe with these two columns, like this:
STARTNO || STARTSTEP
==============================
123456 || AB
234567 || AB
345678 || AB
123456 || BC
234567 || BC
345678 || BC My plan was to join the "Phoenix dataframe" with this "selection dataframe" for getting the data also in a fast way, because both columns are in the primary key of the table. But when I call the following statement, SparkSQL / Phoenix seems to make a full table scan, and predicate pushdown doesn't work: val dfResult = dfPhoenixTable.join(filterDf,Seq("STARTNO", "STARTSTEP"))
dfResult.show() When I look at the execution plan, I can only find isnotnull filters: +- Filter (isnotnull(_2#7245) && isnotnull(_1#7244)) How can I give Phoenix more than one column for selection and avoid a full table scan? Thank you! I'm using HDP 2.6.5 with Spark 2.3 and Phoenix 4.7 (HBase 1.1). I'm reading the Phoenix Table by the Phoenix-Spark Plugin.
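One possible alternative (a sketch in the Java API, not verified on my data): pushing two IN filters down instead of joining. Note this selects the cross-product of the two value lists, which only equals the pair list because every STARTNO above is combined with every STARTSTEP:
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Object[] startNos = {"123456", "234567", "345678"}; // example values
Object[] startSteps = {"AB", "BC"};

Dataset<Row> dfResult = dfPhoenixTable // the dataframe loaded via the Phoenix-Spark plugin
    .filter(col("STARTNO").isin(startNos)
        .and(col("STARTSTEP").isin(startSteps)));
dfResult.show();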
11-07-2018
01:31 PM
I have a DataFrame, like this: dfFilters
c1_hash || c2
==================
00 || 123
00 || 456
01 || 789
And I have a Phoenix Table, that I read by SparkSQL, like this: val dfRead = sqlContext.read.options(Map("table" -> "MYPHOENIXTABLE", "zkUrl" -> "myserver:2181")).format("org.apache.phoenix.spark").load()
dfRead.show()
c1_hash || c2 || c3
==========================
00 || 123 || A
00 || 234 || B
...
Now I want to filter my dfRead by the "complete" dfFilters dataframe, which means that the columns c1_hash and c1 should match the values of dfFilters. I tried to use the DataFrame.join method, but this seems to trigger a Phoenix full table scan; the pushed filters look like this:
Filter ((C1_HASH#3857 = C1_HASH#3899) && (C1#3858 = C1#3900))
...
Filter (isnotnull(C1#3858) && isnotnull(C1_HASH#3857))
How can I avoid full table scans here? The key of my MYPHOENIXTABLE is built on the c1_hash and c1 columns, so Phoenix should be able to do a range scan, right?
11-05-2018
01:00 PM
I created a Phoenix table with following command: %jdbc(phoenix)
CREATE TABLE IF NOT EXISTS phoenixtable (
mykey_hash varchar not null,
mykey varchar not null,
starttime varchar not null,
endtime varchar,
value varchar,
...
CONSTRAINT pk PRIMARY KEY (mykey_hash, mykey, starttime))
SPLIT ON ('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19')
I filled the table and now I want to read the table using SparkSQL. I need to select on multiple columns (mykey_hash and mykey), and I have many values to filter by. Let's say I need to filter for the following column values: mykey_hash mykey
01 123456.000
01 234567.000
12 585345.233
I created a dataframe for this selection, containing these two columns. Now I want to select the table data that has these values in the columns mykey_hash and mykey. I'm using the following code:
%spark
val dfRead = sqlContext.read.options(Map("table" -> "PHOENIXTABLE", "zkUrl" -> "myserver:2181")).format("org.apache.phoenix.spark").load()
...
// with listOfKeyHash = [01, 01, 12]
// with listOfKeys = [123456.000, 234567.000, 585345.233]
val dfToJoin =listOfKeyHash.zip(listOfKey).toDF("MYKEY_HASH", "MYKEY")
val resultDf = dfRead.join(dfToJoin,Seq("MYKEY_HASH", "MYKEY"))
But this seems to do a full table scan instead of using the values of my primary key to limit the amount of data to scan. I'm using HDP 2.6.5 with HBase 1.1, Phoenix 4.7 and Spark 2.3. Does anyone have an idea?
10-23-2018
06:10 AM
I have a Hadoop cluster with HDP 3.0 installed. I tried to build and install Apache Metron (v0.6) as MPack on the Ambari Server. The ambari-server install-mpack --mpack=/path/to/mpack.tar.gz --verbose seems to work, but I can't see Metron in the list of Services in my Ambari when starting the Ambari server. I followed this instruction for building the MPack files: https://community.hortonworks.com/articles/60805/deploying-a-fresh-metron-cluster-using-ambari-serv.html Update: I just tried to add Metron on another cluster, running HDP 2.6 with Ambari 2.5.2, here it works! Question: How can I install this MPack on HDP 3.0 with Ambari 2.7.1? Or is this not possible yet at the moment? Thank you for the help!
10-15-2018
03:44 PM
Solved it: Phoenix arrays are 1-based, so the following query works:
SELECT REGEXP_SPLIT(ROWKEY, ':')[1] as test, count(1) FROM "my_view" GROUP BY REGEXP_SPLIT(ROWKEY, ':')[1]
10-15-2018
03:40 PM
I have a Phoenix view with a row key column ROWKEY that has a layout like this: <hash>:<attributeA>_<attributeB>
I want to count the rows for each <hash> value of my table. Therefore I need to group my view by the <hash> value, which I get by splitting the ROWKEY column. I tried to use the REGEXP_SPLIT function of Phoenix, but I get an exception:
%jdbc(phoenix)
SELECT REGEXP_SPLIT(ROWKEY, ':')[0] as test, count(1) FROM "my_view" GROUP BY REGEXP_SPLIT(ROWKEY, ':')[0]
The exception:
org.apache.phoenix.exception.PhoenixIOException: org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: my_view,000,1539582312877.93d8d6e785eae60fedac3c6088b4e556.: 32767
at org.apache.phoenix.util.ServerUtil.createIOException(ServerUtil.java:93)
at org.apache.phoenix.util.ServerUtil.throwIOException(ServerUtil.java:59)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:271)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1301)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1699)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1296)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2404)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32767
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:418)
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:379)
at org.apache.phoenix.expression.function.ArrayIndexFunction.evaluate(ArrayIndexFunction.java:64)
at org.apache.phoenix.util.TupleUtil.getConcatenatedValue(TupleUtil.java:101)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.scanUnordered(GroupedAggregateRegionObserver.java:418)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.doPostScannerOpen(GroupedAggregateRegionObserver.java:162)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:237)
... 11 more
at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:117)
at org.apache.phoenix.iterate.BaseResultIterators.getIterators(BaseResultIterators.java:780)
at org.apache.phoenix.iterate.BaseResultIterators.getIterators(BaseResultIterators.java:721)
at org.apache.phoenix.iterate.MergeSortResultIterator.getMinHeap(MergeSortResultIterator.java:72)
at org.apache.phoenix.iterate.MergeSortResultIterator.minIterator(MergeSortResultIterator.java:93)
at org.apache.phoenix.iterate.MergeSortResultIterator.next(MergeSortResultIterator.java:58)
at org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator.next(BaseGroupedAggregatingResultIterator.java:64)
at org.apache.phoenix.iterate.DelegateResultIterator.next(DelegateResultIterator.java:44)
at org.apache.phoenix.iterate.LimitingResultIterator.next(LimitingResultIterator.java:47)
at org.apache.phoenix.jdbc.PhoenixResultSet.next(PhoenixResultSet.java:778)
at org.apache.commons.dbcp2.DelegatingResultSet.next(DelegatingResultSet.java:191)
at org.apache.commons.dbcp2.DelegatingResultSet.next(DelegatingResultSet.java:191)
at org.apache.zeppelin.jdbc.JDBCInterpreter.getResults(JDBCInterpreter.java:510)
at org.apache.zeppelin.jdbc.JDBCInterpreter.executeSql(JDBCInterpreter.java:694)
at org.apache.zeppelin.jdbc.JDBCInterpreter.interpret(JDBCInterpreter.java:763)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:101)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:502)
at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
at org.apache.zeppelin.scheduler.ParallelScheduler$JobRunner.run(ParallelScheduler.java:162)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: my_view,000,1539582312877.93d8d6e785eae60fedac3c6088b4e556.: 32767
at org.apache.phoenix.util.ServerUtil.createIOException(ServerUtil.java:93)
at org.apache.phoenix.util.ServerUtil.throwIOException(ServerUtil.java:59)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:271)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1301)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1699)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1296)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2404)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32767
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:418)
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:379)
at org.apache.phoenix.expression.function.ArrayIndexFunction.evaluate(ArrayIndexFunction.java:64)
at org.apache.phoenix.util.TupleUtil.getConcatenatedValue(TupleUtil.java:101)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.scanUnordered(GroupedAggregateRegionObserver.java:418)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.doPostScannerOpen(GroupedAggregateRegionObserver.java:162)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:237)
... 11 more
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.apache.phoenix.iterate.BaseResultIterators.getIterators(BaseResultIterators.java:775)
... 24 more
Caused by: org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: my_view,000,1539582312877.93d8d6e785eae60fedac3c6088b4e556.: 32767
at org.apache.phoenix.util.ServerUtil.createIOException(ServerUtil.java:93)
at org.apache.phoenix.util.ServerUtil.throwIOException(ServerUtil.java:59)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:271)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1301)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1699)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1296)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2404)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32767
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:418)
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:379)
at org.apache.phoenix.expression.function.ArrayIndexFunction.evaluate(ArrayIndexFunction.java:64)
at org.apache.phoenix.util.TupleUtil.getConcatenatedValue(TupleUtil.java:101)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.scanUnordered(GroupedAggregateRegionObserver.java:418)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.doPostScannerOpen(GroupedAggregateRegionObserver.java:162)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:237)
... 11 more
at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:117)
at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:252)
at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:113)
at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:108)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.phoenix.job.JobManager$InstrumentedJobFutureTask.run(JobManager.java:183)
... 3 more
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: my_view,000,1539582312877.93d8d6e785eae60fedac3c6088b4e556.: 32767
at org.apache.phoenix.util.ServerUtil.createIOException(ServerUtil.java:93)
at org.apache.phoenix.util.ServerUtil.throwIOException(ServerUtil.java:59)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:271)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1301)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1699)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1296)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2404)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32767
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:418)
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:379)
at org.apache.phoenix.expression.function.ArrayIndexFunction.evaluate(ArrayIndexFunction.java:64)
at org.apache.phoenix.util.TupleUtil.getConcatenatedValue(TupleUtil.java:101)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.scanUnordered(GroupedAggregateRegionObserver.java:418)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.doPostScannerOpen(GroupedAggregateRegionObserver.java:162)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:237)
... 11 more
at sun.reflect.GeneratedConstructorAccessor38.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:335)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:391)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:208)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:63)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:211)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:396)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:370)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
... 3 more
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.DoNotRetryIOException): org.apache.hadoop.hbase.DoNotRetryIOException: my_view,000,1539582312877.93d8d6e785eae60fedac3c6088b4e556.: 32767
at org.apache.phoenix.util.ServerUtil.createIOException(ServerUtil.java:93)
at org.apache.phoenix.util.ServerUtil.throwIOException(ServerUtil.java:59)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:271)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1301)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1699)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1296)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2404)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32767
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:418)
at org.apache.phoenix.schema.types.PArrayDataType.positionAtArrayElement(PArrayDataType.java:379)
at org.apache.phoenix.expression.function.ArrayIndexFunction.evaluate(ArrayIndexFunction.java:64)
at org.apache.phoenix.util.TupleUtil.getConcatenatedValue(TupleUtil.java:101)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.scanUnordered(GroupedAggregateRegionObserver.java:418)
at org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver.doPostScannerOpen(GroupedAggregateRegionObserver.java:162)
at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:237)
... 11 more
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:218)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:292)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32831)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:383)
... 10 more
What would be the correct query?
10-15-2018
02:47 PM
I want to access an HBase table via Phoenix in a Zeppelin JDBC paragraph:
%jdbc(phoenix)
SELECT COUNT(*) FROM myphoenixview
For a smaller view it works perfectly, but when I run the query on a big view, I get a timeout exception:
java.sql.SQLTimeoutException: Operation timed out.
at org.apache.phoenix.exception.SQLExceptionCode$14.newException(SQLExceptionCode.java:364)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:150)
at org.apache.phoenix.iterate.BaseResultIterators.getIterators(BaseResultIterators.java:831)
at org.apache.phoenix.iterate.BaseResultIterators.getIterators(BaseResultIterators.java:721)
at org.apache.phoenix.iterate.ConcatResultIterator.getIterators(ConcatResultIterator.java:50)
at org.apache.phoenix.iterate.ConcatResultIterator.currentIterator(ConcatResultIterator.java:97)
at org.apache.phoenix.iterate.ConcatResultIterator.next(ConcatResultIterator.java:117)
at org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator.next(BaseGroupedAggregatingResultIterator.java:64)
at org.apache.phoenix.iterate.UngroupedAggregatingResultIterator.next(UngroupedAggregatingResultIterator.java:39)
at org.apache.phoenix.iterate.DelegateResultIterator.next(DelegateResultIterator.java:44)
at org.apache.phoenix.iterate.LimitingResultIterator.next(LimitingResultIterator.java:47)
at org.apache.phoenix.jdbc.PhoenixResultSet.next(PhoenixResultSet.java:778)
at org.apache.commons.dbcp2.DelegatingResultSet.next(DelegatingResultSet.java:191)
at org.apache.commons.dbcp2.DelegatingResultSet.next(DelegatingResultSet.java:191)
at org.apache.zeppelin.jdbc.JDBCInterpreter.getResults(JDBCInterpreter.java:510)
at org.apache.zeppelin.jdbc.JDBCInterpreter.executeSql(JDBCInterpreter.java:694)
at org.apache.zeppelin.jdbc.JDBCInterpreter.interpret(JDBCInterpreter.java:763)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:101)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:502)
at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
at org.apache.zeppelin.scheduler.ParallelScheduler$JobRunner.run(ParallelScheduler.java:162)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I set the following properties in my Ambari HBase service config (I'm using HDP 2.6.5 with HBase 1.1 and Phoenix 4.7):
hbase.rpc.timeout = 180000
phoenix.query.timeoutMs = 180000
zookeeper.session.timeout = 180000
I also set the same properties for my JDBC and HBase interpreters in Zeppelin. The query still times out after 60 seconds. How can I avoid this? Where do I have to set the properties?
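As an additional check (a sketch in plain Java, outside Zeppelin; the host is a placeholder), the timeout values can also be passed as client properties directly on the Phoenix JDBC connection. I'm not certain all of them are honored client-side, but it helps to verify whether the values are picked up at all:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class PhoenixTimeoutTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // client-side overrides, passed with the connection instead of via hbase-site.xml
        props.setProperty("phoenix.query.timeoutMs", "180000");
        props.setProperty("hbase.rpc.timeout", "180000");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:phoenix:myserver:2181:/hbase-unsecure", props);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM myphoenixview")) {
            while (rs.next()) {
                System.out.println(rs.getLong(1));
            }
        }
    }
}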
10-11-2018
09:21 AM
Is it possible to read the data of a Phoenix view via SparkSQL? I'm doing something like this:
// HBase Shell
create 'hbase_table', { NAME => 'cf1' }
put 'hbase_table', ...
// Afterwards, e.g. in Zeppelin
%jdbc(phoenix)
CREATE VIEW "hbase_table" (
rowkey VARCHAR PRIMARY KEY,
"cf"."col" VARCHAR)
// Afterwards
%spark
val df = sqlContext.read.format("jdbc").options(Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver", "url" -> "jdbc:phoenix:myhost:2181:/hbase-unsecure", "dbtable" -> "hbase_table")).load()
The dbtable property seems to be mandatory... So are only Phoenix tables supported, or can I also give it a Phoenix view somehow? Thank you!
10-10-2018
06:16 AM
I have a huge table table1 in Hive, which contains around 60 million rows (~500 GB of ORC files in HDFS). It is partitioned by the column partCol. Now I want to create a new table table2 in Hive that has the same schema and shall contain only 50 million rows of table1. Therefore I run this query:
set hive.exec.dynamic.partition=true;
INSERT OVERWRITE TABLE testdb.table2 partition(partCol) SELECT colA, colB, ..., partCol FROM testdb.table1 LIMIT 50000000;
This creates a lot of Tez mapper tasks, which looks and works fine - the tasks take around 1 h to finish. And now the problem: afterwards there is only 1 reducer task, which runs for hours and then fails! How can I increase the number of reducer tasks for this query? Is the LIMIT clause the issue?
System information: I'm using the Hortonworks Data Platform 2.6.5 with Hive 1.2.1. The following Hive settings are configured:
hive.execution.engine = TEZ
hive.tez.auto.reducer.parallelism = true
hive.exec.reducers.bytes.per.reducer = 64 MB
hive.exec.reducers.max = 1009
Tez settings:
tez.grouping.min-size = 16 MB
tez.grouping.max-size = 1 GB
09-25-2018
01:54 PM
1 Kudo
The problem was solved after changing the MySQL Database URL from jdbc:mysql://xxxx.yyyy/hive?createDatabaseIfNotExist=true to jdbc:mysql://xxxx.yyyy/hive?createDatabaseIfNotExist=true&serverTimezone=Europe/Berlin I found the relevant information here: https://community.hortonworks.com/questions/218023/error-setting-up-hive-on-hdp-265timezone-on-mysql.html
09-25-2018
01:27 PM
I'm just setting up a Hortonworks Data Platform 3.0 installation. When I start the services for the first time, the Hive Metastore start fails with an exception:
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
org.apache.hadoop.hive.metastore.HiveMetaException: Failed to get schema version.
Underlying cause: java.sql.SQLException : The server time zone value 'CEST' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
SQL Error code: 0
org.apache.hadoop.hive.metastore.HiveMetaException: Failed to get schema version.
at org.apache.hadoop.hive.metastore.tools.HiveSchemaHelper.getConnectionToMetastore(HiveSchemaHelper.java:94)
at org.apache.hive.beeline.HiveSchemaTool.getConnectionToMetastore(HiveSchemaTool.java:169)
at org.apache.hive.beeline.HiveSchemaTool.testConnectionToMetastore(HiveSchemaTool.java:475)
at org.apache.hive.beeline.HiveSchemaTool.doInit(HiveSchemaTool.java:581)
at org.apache.hive.beeline.HiveSchemaTool.doInit(HiveSchemaTool.java:567)
at org.apache.hive.beeline.HiveSchemaTool.main(HiveSchemaTool.java:1539)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:318)
at org.apache.hadoop.util.RunJar.main(RunJar.java:232)
Caused by: java.sql.SQLException: The server time zone value 'CEST' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:76)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:832)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:240)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:207)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at org.apache.hadoop.hive.metastore.tools.HiveSchemaHelper.getConnectionToMetastore(HiveSchemaHelper.java:88)
... 11 more In my CentOS system I set the timezone to Europe/Berlin. ls -l /etc/localtime
lrwxrwxrwx. 1 root root 35 Sep 25 10:45 /etc/localtime -> ../usr/share/zoneinfo/Europe/Berlin
timedatectl | grep -i 'time zone'
Time zone: Europe/Berlin (CEST, +0200)
Does anyone know how to solve this problem? Thank you!
... View more
09-14-2018
10:27 AM
@Felix Albani Thank you for your help! Without the LIMIT clause, the Job works perfectly (and in parallel).
... View more
09-12-2018
09:40 AM
I'm using the Spark HBase Connector (SHC, which is included in my HDP 2.6.5 installation) to move data into HBase via Spark SQL. Since my dataframe contains very large cell values, I sometimes get a "KeyValue size too large" exception. I found this article, which says that I have to increase the value of the HBase property "hbase.client.keyvalue.maxsize": https://community.hortonworks.com/content/supportkb/154006/errorjavalangillegalargumentexception-keyvalue-siz-1.html

I changed this property in my HBase configuration via Ambari, but that didn't help. Now I'm trying to set the property from my application (a Zeppelin notebook), so I need to pass it to SHC, but I don't know how to do that (setting the property in my Zeppelin HBase interpreter settings didn't help either). Can someone show how to set this property for SHC? Is there a way to do this? Thank you!
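For reference, here is a minimal sketch of my write path (run in a Zeppelin %spark paragraph with the Spark 2 interpreter; table name, catalog and data are placeholders), so it's clear where the property would have to end up:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import spark.implicits._

// Hypothetical SHC catalog for an HBase table "hbase_table" with column family "cf".
val catalog =
  """{
    |  "table":   {"namespace": "default", "name": "hbase_table"},
    |  "rowkey":  "key",
    |  "columns": {
    |    "rowkey":    {"cf": "rowkey", "col": "key",       "type": "string"},
    |    "valuelist": {"cf": "cf",     "col": "valueList", "type": "string"},
    |    "hashvalue": {"cf": "cf",     "col": "hashValue", "type": "string"}
    |  }
    |}""".stripMargin

// Tiny stand-in dataframe; in reality "valuelist" holds the very large cell values
// that trigger the "KeyValue size too large" exception.
val df = Seq(("row1", "a-very-long-value", "h1")).toDF("rowkey", "valuelist", "hashvalue")

// Standard SHC write. My assumption is that hbase.client.keyvalue.maxsize has to reach
// this client side (e.g. via an hbase-site.xml visible to the Spark driver and executors);
// that is exactly the part I don't know how to do with SHC.
df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()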
... View more
09-11-2018
02:06 PM
Here is the extended explain output of the DataFrame; LLAP is not enabled in our cluster:

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [importtime: timestamp, tester: string ... 21 more fields]
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
+- SubqueryAlias mytable
+- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Analyzed Logical Plan ==
importtime: timestamp, tester: string, testerhead: string, matchcode: string, revision: string, usertext: string, teststufe: string, temp: string, filedat: date, starttime: timestamp, endtime: timestamp, lotnrc: string, wafernr: string, testname: string, testnumber: string, testtype: string, unit: string, highl: string, lowl: string, highs: string, lows: string, valuelist: string, hashvalue: int
GlobalLimit 1
+- LocalLimit 1
+- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
+- SubqueryAlias mytable
+- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Optimized Logical Plan ==
InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- CollectLimit 1
+- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
== Physical Plan ==
InMemoryTableScan [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
+- InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- CollectLimit 1
+- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
... View more
09-11-2018
12:01 PM
I have a huge Hive table (ORC) and I want to select just a few of its rows (in Zeppelin).

%spark
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val df1 = sqlContext.sql("SELECT * FROM mydb.myhugetable LIMIT 1")   // Takes 10 mins
val df2 = sqlContext.sql("SELECT * FROM mydb.myhugetable").limit(1)  // Takes 10 mins

Using the LIMIT clause in my SQL statement or the corresponding DataFrame method df.limit(n) doesn't help; the query still takes far too long. Spark seems to read the whole table first and only then return the n rows. How can I make the limit take effect while the query is running, so that it finishes faster? Shouldn't ORC filter pushdown help here? I can't see any difference between spark.sql.orc.filterPushdown set to true and false.
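To make it clearer what I would expect, here is a minimal sketch in which an explicit partition predicate, rather than LIMIT, restricts what is read; the partition column name "filedat" and its value are only assumptions about this table's layout:

// Sketch only: assumes mydb.myhugetable is partitioned by a column named "filedat";
// the real partition column and value may differ.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// With a literal partition predicate, Spark can prune partitions at planning time,
// whereas LIMIT alone still appears to scan the whole table first.
val dfPruned = sqlContext.sql("SELECT * FROM mydb.myhugetable WHERE filedat = '2018-09-01' LIMIT 1")
dfPruned.show(1)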
Thank you!
... View more
09-07-2018
06:18 PM
I created an external Hive table that refers to a corresponding HBase table (which has one column family, "cf"):

create external table testdb.testtable ( rowkey String, hashvalue String, valuelist String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:valueList,cf:hashValue")
TBLPROPERTIES ('hbase.table.name' = 'hbase_table')
Now I want to create HFiles from an existing Hive table (origin_table), which has more than one column (rowkey (= HBase row key), hashvalue, valuelist). When I run the following query to create the HFiles from the Hive table, I always get an exception.

Query:

set hfile.family.path=/tmp/testtable/cf
set hive.hbase.generatehfiles=true
INSERT OVERWRITE TABLE testdb.testtable SELECT k, valuelist, hashvalue from testdb.origin_table DISTRIBUTE BY k SORT BY k;
Exception:

java.io.IOException: Added a key not lexically larger than previous.
Current cell = 055:test_2018-08-28 09:09:31/cf:hashValue/1536343090127/Put/vlen=3/seqid=0,
lastCell = 055:test_2018-08-28 09:09:31/cf:valueList/1536343090127/Put/vlen=11417/seqid=0
This seems to occur because of the different column names (hashValue and valueList). It makes no difference if I exchange the columns in the query (SELECT k, hashvalue, valuelist FROM ...); it throws the same exception. Of course, when I change this example to a Hive table with only one column (plus the key column), the INSERT command works, as there is no other column to read for the HFile creation. My question now: how can I create HFiles with this Hive-HBase integration if there is more than one column (plus the key) to transfer?
... View more