Created 02-19-2019 01:48 PM
I wrote a Spark application for bulk-loading a Phoenix table. Everything worked fine for a few weeks, but for a few days now I have been getting problems with duplicated rows, which turned out to be caused by faulty table statistics. A possible workaround is to delete and re-generate the statistics for this table.
Therefore I need to open a JDBC connection to my Phoenix database and run the statements for deleting and re-creating the stats.
Since I need to do this after inserting the new data via Spark, I also want to create and use this JDBC connection inside my Spark job, after the table bulk-loading is done.
For that I added the following method and call it between dataframe.save() and sparkContext.close() in my Java code:
private static void updatePhoenixTableStatistics(String phoenixTableName) {
    try {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        System.out.println("Connecting to database..");
        Connection conn = DriverManager.getConnection("jdbc:phoenix:my-sever.net:2181:/hbase-unsecure");
        System.out.println("Creating statement...");
        Statement st = conn.createStatement();
        st.executeUpdate("DELETE FROM SYSTEM.STATS WHERE physical_name='" + phoenixTableName + "'");
        System.out.println("Successfully deleted statistics data... Now refreshing it.");
        st.executeUpdate("UPDATE STATISTICS " + phoenixTableName + " ALL");
        System.out.println("Successfully refreshed statistics data.");
        st.close();
        conn.close();
        System.out.println("Connection closed.");
    } catch (Exception e) {
        System.out.println("Unable to update table statistics - Skipping this step!");
        e.printStackTrace();
    }
}
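The same method, sketched with try-with-resources so the connection and statements get closed even when one of the statements throws (only a sketch, untested against the cluster; it uses the same java.sql imports and JDBC URL as above, and I added an explicit commit because Phoenix connections default to autoCommit=false):

private static void updatePhoenixTableStatistics(String phoenixTableName) {
    String url = "jdbc:phoenix:my-sever.net:2181:/hbase-unsecure";
    try (Connection conn = DriverManager.getConnection(url);
         PreparedStatement delete = conn.prepareStatement(
                 "DELETE FROM SYSTEM.STATS WHERE physical_name = ?");
         Statement update = conn.createStatement()) {
        delete.setString(1, phoenixTableName);
        delete.executeUpdate();   // remove the stale stats rows
        conn.commit();            // Phoenix defaults to autoCommit=false, so commit the DELETE explicitly
        update.executeUpdate("UPDATE STATISTICS " + phoenixTableName + " ALL");   // regenerate the stats
    } catch (Exception e) {
        System.out.println("Unable to update table statistics - Skipping this step!");
        e.printStackTrace();
    }
}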
The problem is that since I added this method, I always get the following exception at the end of my Spark job:
Bulk-Load: DataFrame.save() completed - Import finished successfully!
Updating Table Statistics:
Connecting to database..
Creating statement...
Successfully deleted statistics data... Now refreshing it.
Successfully refreshed statistics data.
Connection closed.
Exception in thread "Thread-31" java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
	at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2794)
	at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2646)
	at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2518)
	at org.apache.hadoop.conf.Configuration.get(Configuration.java:1065)
	at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1119)
	at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1520)
	at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:68)
	at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:82)
	at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:97)
	at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)
	at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)
	at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
	at org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:93)
	at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)
	at org.apache.phoenix.jdbc.PhoenixDriver$1.run(PhoenixDriver.java:88)
Caused by: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
	at java.util.zip.ZipFile.open(Native Method)
	at java.util.zip.ZipFile.<init>(ZipFile.java:225)
	at java.util.zip.ZipFile.<init>(ZipFile.java:155)
	at java.util.jar.JarFile.<init>(JarFile.java:166)
	at java.util.jar.JarFile.<init>(JarFile.java:103)
	at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:93)
	at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:69)
	at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:99)
	at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:122)
	at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:152)
	at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2612)
	at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2693)
	... 14 more
Does someone know about this problem and can help? How do you generally work with JDBC inside a Spark job? Or is there another possibility for doing this?
I'm working on HDP 2.6.5 with Spark 2.3 and Phoenix 4.7 installed.
Created 02-19-2019 03:44 PM
"gk-journal-importer-phoenix-0.0.3h.jar" is not a JAR file that HBase or Phoenix own -- this is a JAR file that your configuration is defines. You likely need to localize this JAR with your Spark application and set the proper localized path in the configuration.
Created 02-19-2019 07:01 PM
Thank you for the fast answer @Josh Elser, but I'm not sure what you mean by "not a JAR that HBase or Phoenix own". The gk-journal-importer-phoenix-0.0.3h.jar is my exported Java project. It contains the classes of my import job and the main class whose main method creates, uses, and closes the SparkContext. I start my Spark application by running the command
spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>
Created 02-20-2019 02:48 PM
After looking closer at your stacktrace, I think I see what's going on. That PhoenixDriver.java:88 is in the JVM shutdown hook that Phoenix installs to close its internals. I'm guessing that Spark must also install a shutdown hook to clean up these localized resources, and the Spark hook is running before the Phoenix hook.
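To illustrate what I mean (a generic sketch in plain Java, not actual Spark or Phoenix code): the JVM starts all registered shutdown hooks concurrently and in no guaranteed order, so a hook that deletes temporary files can easily run before a hook that still needs to read them:

public class ShutdownHookOrder {
    public static void main(String[] args) {
        Runtime runtime = Runtime.getRuntime();

        // Stand-in for Spark's hook that removes the localized /tmp/spark-*/ directory
        // containing the application jar.
        runtime.addShutdownHook(new Thread(() ->
                System.out.println("cleanup hook: deleting the temp dir with the application jar")));

        // Stand-in for Phoenix's hook, which re-reads configuration from the classpath
        // (and therefore from that jar) while closing its internals.
        runtime.addShutdownHook(new Thread(() ->
                System.out.println("driver hook: loading hbase-site.xml from the classpath")));

        // On normal JVM exit both hooks start in an unspecified order, so the cleanup
        // hook can win the race and the driver hook then fails with FileNotFoundException.
    }
}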
I see your other update -- you must have hit a different codepath inside of Spark which implicitly fixed the problem. Glad you got it figured out.
Created 02-20-2019 08:45 PM
Oh yes, you're right @Josh Elser. A closer look at the PhoenixDriver class showed that it registers a shutdown hook when the driver instance is created: https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Pho...
I was able to avoid the exception by stopping the JVM immediately after the SparkContext.close() call with Runtime.getRuntime().halt(0):
... Spark logic ...

updatePhoenixTableStats(phoenixTableName);   // calling the Phoenix JDBC stuff
sc.close();
Runtime.getRuntime().halt(0);                // Stopping the JVM immediately
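As far as I understand, this works because Runtime.halt() terminates the JVM without running any shutdown hooks at all, while System.exit() would still trigger both the Spark cleanup hook and the Phoenix hook. Roughly, the end of my main method now looks like this (a sketch with placeholder names and argument handling; the real job is structured a bit differently):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkImportApp {
    public static void main(String[] args) {
        String phoenixTableName = args[0];   // placeholder argument handling
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("gk-journal-importer"));
        try {
            // ... Spark bulk-load logic, dataframe.save() ...
            updatePhoenixTableStatistics(phoenixTableName);   // the Phoenix JDBC stuff
        } finally {
            sc.close();
            // halt() skips all shutdown hooks (Spark's temp-dir cleanup and Phoenix's
            // driver hook included), so nothing can race after the work is done.
            Runtime.getRuntime().halt(0);
        }
    }

    private static void updatePhoenixTableStatistics(String phoenixTableName) {
        // see the method above
    }
}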
Do you know whether this is a valid solution if I only use Spark and Phoenix functionality?
Created 02-20-2019 09:56 AM
I found the solution to my problem. The JAR I exported had the phoenix-spark2 and phoenix-client dependencies bundled into it.
I changed these dependencies (as they already exist in my cluster's HDP installation) to scope provided:
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark2</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope> <!-- this did it, now have to add --jar to spark-submit -->
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope> <!-- this did it, now have to add --jar to spark-submit -->
</dependency>
Now I start my Spark job with the --jars option and reference these dependencies there, and it works fine in yarn-client mode.
spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client --jars /usr/hdp/current/phoenix-client/phoenix-spark2.jar,/usr/hdp/current/phoenix-client/phoenix-client.jar hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>
PS: In yarn-cluster mode the application worked all the time (also with the fat-jar that included the dependencies).