
Phoenix Spark Plugin: DataFrame.show leads to repeating stages


I have a huge Phoenix table in my cluster (HDP 2.6.5 with HBase 1.1.2, Spark 2.3, and Phoenix 4.7), and I'm accessing the data via the Zeppelin UI, the Spark interpreter, and the Phoenix-Spark plugin (https://phoenix.apache.org/phoenix_spark.html).

I call this code to read the Phoenix table and show 20 entries:

import org.apache.spark.sql.functions.col

val TESTNAMES = List("a", "b", "c")
val MEASUREIDS = List(...)
val STEP = "T1"

// Load the Phoenix table via the Phoenix-Spark plugin
val dfRead = sqlContext.read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "MY_PHOENIX_TABLE", "zkUrl" -> "myserver:2181"))
  .load()

// Filter on the leading columns of the row key
val RESULT = dfRead
  .filter(col("testname").isin(TESTNAMES: _*))
  .filter(col("measureid").isin(MEASUREIDS: _*))
  .filter(col("step") === STEP)

RESULT.show()

The execution of this paragraph is very slow: it takes about 3 minutes to scan the table's 50 million rows (the row key is [testname, measureid, testnumber, step, starttime]). Since I supply value lists for the first two key columns [testname, measureid], Phoenix should be able to skip ~49,999,900 rows (there are only about 100 rows in the table for this [testname, measureid] combination).
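For reference, one way to check whether those filters actually reach Phoenix (rather than being applied by Spark after a full scan) is to print the physical plan; this is just a diagnostic sketch, and the output depends on the plugin version:

```scala
// Print the logical and physical plans; look for the filters appearing
// inside the Phoenix scan (pushed down) vs. a separate Spark Filter node.
RESULT.explain(true)

// Also worth checking: how many partitions (and thus tasks) the scan produces.
println(RESULT.rdd.getNumPartitions)
```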

When I look at the "SQL" tab in the Spark History UI, I see the following:

93328-1.png

Here are the stages that were processed by the DF.show command:

Why does Spark generate a stage with 1 task, then a stage with 4 tasks, then 20, then 100, and so on? It looks like repeating these steps is taking all the time! What do I have to change in my code, or is something wrong with it?
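For what it's worth, the 1/4/20/100 pattern looks like the incremental partition scanning that Spark performs for take()/show(): it scans a few partitions, and if it hasn't collected enough rows yet, retries with more partitions, scaled by spark.sql.limit.scaleUpFactor (default 4). A rough, simplified sketch of that scheduling (the real logic lives in SparkPlan.executeTake; this is my approximation, not Spark's actual code):

```scala
// Simplified model of Spark's take(n) partition scheduling, assuming the
// worst case where no round collects enough matching rows (so every retry
// scales up by scaleUpFactor = spark.sql.limit.scaleUpFactor, default 4).
def partitionsPerRound(totalParts: Int, scaleUpFactor: Int = 4): Seq[Int] = {
  val rounds = scala.collection.mutable.ArrayBuffer.empty[Int]
  var scanned = 0
  var toTry = 1 // the first round always tries a single partition
  while (scanned < totalParts) {
    val thisRound = math.min(toTry, totalParts - scanned)
    rounds += thisRound
    scanned += thisRound
    // No rows found so far: retry with scaleUpFactor times the
    // partitions scanned up to now.
    toTry = scanned * scaleUpFactor
  }
  rounds.toSeq
}

// For a scan with 3340 partitions this yields rounds of 1, 4, 20, 100, ...
// which matches the stage sizes in the screenshot.
```

If the matching ~100 rows live in the very last partitions, show() ends up scanning nearly the whole table in these ever-growing rounds, which would explain the runtime.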

93327-2.png

PS: Calling DF.count instead of DF.show leads to only two stages: first one with 3340 tasks (which also takes ~3 minutes), and then a very fast one with 1 task (0.5 s)...

I'm running 4 executors plus a driver, each with 1 core and 16 GB RAM, for my Spark interpreter.

UPDATE:

Using the Phoenix JDBC connection with Spark, the queries above take ~5 seconds, and the strange stage re-generation is gone as well!

val dfRead = sqlContext.read
  .format("jdbc")
  .options(Map(
    "driver" -> "org.apache.phoenix.jdbc.PhoenixDriver",
    "url" -> "jdbc:phoenix:myserver:2181:/hbase-unsecure",
    "dbtable" -> "MY_PHOENIX_TABLE"))
  .load()

But why does this behave so differently?