Support Questions

Find answers, ask questions, and share your expertise

Phoenix using Spark SQL using mapreduce


We are writing a Java app to do ingesting into Phoenix, this small side application needs to query files we've loaded into Phoenix to identify which files to reprocess by our support team if issues in our rules and custom binary file parsers are fixed and we need to reload data. The application is using spark to run lots of parsers on the cluster in parallel.

I was going to start using PhoenixConfigurationUtil class to ultimately provide a SQL query since we are choosing which files to reprocess using a SQL where clause and PhoenixConfigurationUtil.setInputQuery lets us provide a SQL query. It's also one of the few examples that shows how to use Spark and Phoenix that's written in Java, all the other examples are in Scala. We are using HDP 2.5.3, Phoenix 4.7, and spark 1.6.

What I noticed in Spark 1.6 and it appears, Spark 2.0 is that all the Scala variations mentioned on the Phoenix site related to Spark that shows calls to phoenixTableAsRDD and phoenixTableAsDataFrame both end up calling PhoenixConfigurationUtil which is part of the org.apache.phoenix.mapreduce.util namespace.

So my question is whether anyone recommends using PhoenixConfigurationUtil (which seems to mapreduce based) or one of the scala Spark based, but which look at least right now they call the same PhoenixConfigurationUtil? Are you expecting a pure spark or Spark SQL based solution at some point in the future so that phoenixTableAsRDD or phoenixTableAsDataFrame will no longer call the mapreduce Phoenix code. Or is it expected that those APIs will continue to be joined at the hip for the indefinite future?



@Jeff Watson

MapReduce the compute engine is never used and is not required in any way for Phoenix Spark adapter. The PhoenixConfigurationUtil class just helps to define the configuration of the RDD by leveraging the existing Phoenix input/output format definition.

Below is a snippet from org.apache.phoenix.spark.PhoenixRDD. The variable phoenixConf is defined using PhoenixConfigurationUtil class. There is no distributed compute, just serialization definition like record start/end and columns for DataFrame. It's just a way to explain to Spark how to turn a row in target Phoenix table into an RDD record.

def getPhoenixConfiguration: Configuration = {
    // This is just simply not serializable, so don't try, but clone it because
    // PhoenixConfigurationUtil mutates it.
    val config = HBaseConfiguration.create(conf)

    PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
    PhoenixConfigurationUtil.setInputTableName(config, table)
    if(!columns.isEmpty) {
      PhoenixConfigurationUtil.setSelectColumnNames(config, columns.toArray)

    if(predicate.isDefined) {
      PhoenixConfigurationUtil.setInputTableConditions(config, predicate.get)

phoenixConf is then used to create a new "custom" Hadoop API RDD using a method of SparkContext. This is where Spark will distribute all of the target rows of the Phoenix table into an RDD.

val phoenixRDD = sc.newAPIHadoopRDD(phoenixConf, classOf[PhoenixInputFormat[PhoenixRecordWritable]], classOf[NullWritable], classOf[PhoenixRecordWritable])

You do not need MapReduce installed on the cluster, just the util JAR in the classpath (included in the maven package of the Phoenix Spark adapter). All compute is handled by Spark. This is no different than needing the CSV reader JAR from Databricks in order to create a Spark RDD from a CSV file (snippet from...).

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df =
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types

val selectedData ="year", "model")
    .option("header", "true")

It's just that the package com.databricks.spark.csv from Databricks was only designed for Spark and thus has the word spark in the package name. Hbase/Phoenix originated at a time when MapReduce was the compute main engine for Hadoop. Hence the package name has the word mapreduce in it. It's not that the MapReduce is a dependency, it's just that Engineering teams rarely go back and change package names.


Well now that's a relief. I think what threw me is (a) we just rewrote our apps to use spark instead of mapreduce, and (b) the class that contains the PhoenixConfigurationUtil is org.apache.phoenix.mapreduce.util so I thought we might be accidentally jumping backwards. Thanks for clarifying.


@Jeff Watson

Glad I could help. Would you mind accepting the answer as correct?