Using JDBC inside Spark application

Expert Contributor

I wrote a Spark application for bulk-loading a Phoenix table. Everything worked for a few weeks, but for the last few days I have been getting duplicated rows. This was caused by faulty table stats. A possible workaround is to delete and re-generate the stats for this table.

Therefore I need to open a JDBC connection to my Phoenix database and run the statements for deleting and re-creating the stats.

Since I need to do this after inserting the new data via Spark, I also want to create and use this JDBC connection inside my Spark job, after the table bulk-loading is done.

For that I added the following method and call it between the dataframe.save() and sparkContext.close() calls in my Java code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

private static void updatePhoenixTableStatistics(String phoenixTableName) {
    try {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        System.out.println("Connecting to database..");
        // try-with-resources closes the statement and the connection even if a statement fails
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:my-sever.net:2181:/hbase-unsecure")) {
            System.out.println("Creating statement...");
            try (Statement st = conn.createStatement()) {
                // Remove the stale statistics rows for this table
                st.executeUpdate("DELETE FROM SYSTEM.STATS WHERE physical_name='" + phoenixTableName + "'");
                // Phoenix connections do not auto-commit by default, so commit the DELETE explicitly
                conn.commit();
                System.out.println("Successfully deleted statistics data... Now refreshing it.");

                // Re-collect the statistics for the table and its indexes
                st.executeUpdate("UPDATE STATISTICS " + phoenixTableName + " ALL");
                System.out.println("Successfully refreshed statistics data.");
            }
        }
        System.out.println("Connection closed.");
    } catch (Exception e) {
        System.out.println("Unable to update table statistics - Skipping this step!");
        e.printStackTrace();
    }
}

The problem is that, since I added this method, I always get the following exception at the end of my Spark job:

Bulk-Load: DataFrame.save() completed - Import finished successfully!
Updating Table Statistics:
Connecting to database..
Creating statement...
Successfully deleted statistics data... Now refreshing it.
Successfully refreshed statistics data.
Connection closed.
Exception in thread "Thread-31" java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2794)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2646)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2518)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1065)
        at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1119)
        at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1520)
        at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:68)
        at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:82)
        at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:97)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
        at org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:93)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)
        at org.apache.phoenix.jdbc.PhoenixDriver$1.run(PhoenixDriver.java:88)
Caused by: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
        at java.util.zip.ZipFile.open(Native Method)
        at java.util.zip.ZipFile.<init>(ZipFile.java:225)
        at java.util.zip.ZipFile.<init>(ZipFile.java:155)
        at java.util.jar.JarFile.<init>(JarFile.java:166)
        at java.util.jar.JarFile.<init>(JarFile.java:103)
        at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:93)
        at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:69)
        at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:99)
        at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:122)
        at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:152)
        at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2612)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2693)
        ... 14 more

Does anyone know about this problem and can help? How do you generally work with JDBC inside a Spark job? Or is there another way of doing this?

I'm working on HDP 2.6.5 with Spark 2.3 and Phoenix 4.7 installed.

5 REPLIES

"gk-journal-importer-phoenix-0.0.3h.jar" is not a JAR file that HBase or Phoenix own -- this is a JAR file that your configuration defines. You likely need to localize this JAR with your Spark application and set the proper localized path in the configuration.

Expert Contributor

Thank you for the fast answer @Josh Elser, but I'm not sure what you mean by "not a JAR that HBase or Phoenix own". The gk-journal-importer-phoenix-0.0.3h.jar is my exported Java project. It contains the classes of my import job and the Main class with the main method, which creates, uses and closes the SparkContext. I start my Spark application by running the command

spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>

After looking closer at your stacktrace, I think I see what's going on. That PhoenixDriver.java:88 is in the ShutdownHook on the JVM that Phoenix installs to close internals. I'm guessing that Spark must also install a ShutdownHook to clean up these localized resources, and the Spark hook is running before the Phoenix hook.
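The ordering problem described above can be illustrated with a small sketch. It does not use Spark or Phoenix: the class name HookOrderSketch, the file names, and both steps are hypothetical stand-ins. The JVM does not guarantee the order in which hooks registered via Runtime.addShutdownHook are run, so the sketch runs a "cleanup" step and a "reader" step in both orders as plain Runnables instead of registering real hooks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class HookOrderSketch {

    /**
     * Runs a cleanup step and a reader step in the requested order and
     * returns what the reader observed. In a real JVM both steps would be
     * registered with Runtime.getRuntime().addShutdownHook(...).
     */
    static String simulate(boolean cleanupFirst) {
        try {
            // Stand-in for the temporary directory holding the localized jar
            Path dir = Files.createTempDirectory("spark-sketch-");
            Path jar = dir.resolve("app.jar");
            Files.write(jar, new byte[] {1, 2, 3});

            String[] observed = new String[1];

            // Stand-in for the hook that deletes the localized files
            Runnable cleanup = () -> {
                try {
                    Files.deleteIfExists(jar);
                    Files.deleteIfExists(dir);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };

            // Stand-in for the hook that still needs to read from the jar
            Runnable reader = () ->
                    observed[0] = Files.exists(jar) ? "resource loaded" : "file not found";

            if (cleanupFirst) {
                cleanup.run();
                reader.run();
            } else {
                reader.run();
                cleanup.run();
            }
            return observed[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup first: " + simulate(true));
        System.out.println("reader first:  " + simulate(false));
    }
}
```

When the cleanup step runs first, the reader no longer finds the file, which mirrors the FileNotFoundException in the stack trace.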

I see your other update -- must have hit a different codepath inside of Spark which implicitly fixed the problem. Glad you got it figured out.

Expert Contributor

Oh yes, you're right @Josh Elser. A closer look at the PhoenixDriver class showed that they define a shutdown hook when creating the driver instance: https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Pho...

I was able to avoid the exception by stopping the JVM immediately after the SparkContext.close() call with Runtime.getRuntime().halt(0):

... Spark logic
...

updatePhoenixTableStatistics(phoenixTableName);    // calling the Phoenix JDBC stuff

sc.close();
Runtime.getRuntime().halt(0);                 // Stopping the JVM immediately

Do you know whether this is a valid solution if I only use Spark and Phoenix functionality?

Expert Contributor

I found the solution to my problem. The jar I exported had the phoenix-spark2 and phoenix-client dependencies bundled into it.

I changed these dependencies (as they already exist in my cluster's HDP installation) to scope provided:

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark2</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope>                          <!-- this did it, now have to add --jars to spark-submit -->
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope>                          <!-- this did it, now have to add --jars to spark-submit -->
</dependency>

Now I start my Spark job with the --jars option and link these dependencies there. Now it works fine in yarn-client mode.

spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client --jars /usr/hdp/current/phoenix-client/phoenix-spark2.jar,/usr/hdp/current/phoenix-client/phoenix-client.jar hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>

PS: In yarn-cluster mode the application worked all the time (also with the fat-jar that included the dependencies).
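
One way to confirm that the provided scope took effect is to list what the rebuilt jar still bundles. The following is a minimal sketch and not part of the original job: the class name FatJarCheck and the method entriesUnder are hypothetical, and it assumes a local copy of the application jar is passed as the only argument.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

final class FatJarCheck {

    /** Returns the names of all class entries in the jar that start with the given path prefix. */
    static List<String> entriesUnder(Path jar, String prefix) {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.stream()
                    .map(JarEntry::getName)
                    .filter(name -> name.startsWith(prefix) && name.endsWith(".class"))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: FatJarCheck <path-to-local-jar>");
            return;
        }
        // Phoenix classes live under the org.apache.phoenix package
        List<String> bundled = entriesUnder(Paths.get(args[0]), "org/apache/phoenix/");
        System.out.println(bundled.isEmpty()
                ? "No Phoenix classes bundled."
                : bundled.size() + " Phoenix classes are still bundled.");
    }
}
```

If the check reports bundled Phoenix classes, the dependencies were probably still packaged into the jar by the build.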
