Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

I'm getting an error trying to write from Spark to Phoenix using Java

Solved Go to solution

I'm getting an error trying to write from Spark to Phoenix using Java

New Contributor

I am trying to get a DataFrame to write to Phoenix. I've created what looks like a HelloWorld program:

package mercy.dm;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.query.QueryServices;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;


import com.google.common.collect.ImmutableMap;


public class ReadWriteToPhoenix {


	public static void main(String[] args) {


		SparkConf sparkConf = new SparkConf().setAppName("ReadWriteToPhoenix");
		sparkConf.setMaster("local");
		sparkConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
		sparkConf.set(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
		sparkConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
		sparkConf.set(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
		sparkConf.set(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
		sparkConf.set(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");
		
		JavaSparkContext conf = new JavaSparkContext(sparkConf);
		conf.setLocalProperty(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
		conf.setLocalProperty(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
		conf.setLocalProperty(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
		conf.setLocalProperty(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
		conf.setLocalProperty(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
		conf.setLocalProperty(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");


		try {
			UserGroupInformation.setConfiguration(new Configuration());
			UserGroupInformation.loginUserFromKeytab("THEUSER@PROD",
					"/path/to/keytab/THEUSER.user.keytab");
		} catch (Exception e) {
			System.out.println(e.toString());
		}


		String quorum = conf.getLocalProperty("hbase.zookeeper.quorum");
		String clientPort = conf.getLocalProperty("hbase.zookeeper.property.clientPort");
		String znodeParent = conf.getLocalProperty("zookeeper.znode.parent");


		System.out.println("Quorum = " + quorum);
		System.out.println("clientPort = " + clientPort);
		System.out.println("znodeParent = " + znodeParent);


		HBaseConfiguration hbaseConf = new HBaseConfiguration();
		hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-secure");
		hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "ZK_QUORUM:2181");
		hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
		hbaseConf.set(HConstants.ZOOKEEPER_CONFIG_NAME, "/hbase-secure");
		hbaseConf.set(QueryServices.HBASE_CLIENT_KEYTAB, "/path/to/keytab/THEUSER.user.keytab");
		hbaseConf.set(QueryServices.HBASE_CLIENT_PRINCIPAL, "THEUSER@PROD");


		final SQLContext sqlContext = new SQLContext(conf);


		// Map<String, String> options = new HashMap<String, String>();
		// options.put("zkUrl", "lnxhdp01.smrcy.com:2181:/hbase-secure");
		// options.put("table", "TABLE1");
		// sqlContext.load("org.apache.phoenix.spark", options);


		List<Table1> dataSet = new ArrayList<Table1>();
		dataSet.add(new Table1(1, "1"));
		dataSet.add(new Table1(2, "2"));
		dataSet.add(new Table1(3, "3"));


		// TODO: Fix error below:
		// Exception in thread "main" java.lang.RuntimeException: [1.1] failure:
		// ``with'' expected but identifier CREATE found
		// CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
		// sqlContext.executeSql("CREATE TABLE TABLE1 (ID BIGINT NOT NULL
		// PRIMARY KEY, COL1 VARCHAR);");


		JavaRDD<Table1> rdd = conf.parallelize(dataSet);


		DataFrame df = sqlContext.createDataFrame(rdd, Table1.class);


		df.write().format("org.apache.phoenix.spark").mode(SaveMode.Overwrite)
				.options(ImmutableMap.of("zkUrl", "ZK_QUORUM:2181:/hbase-secure", "table", "TABLE1"))
				.save();


		DataFrame fromPhx = sqlContext.read().format("jdbc")
				.options(ImmutableMap.of("driver", "org.apache.phoenix.jdbc.PhoenixDriver", "url",
						"jdbc:phoenix:ZK_QUORUM:2181:/hbase-secure", "dbtable", "TABLE1"))
				.load();


		fromPhx.collect();


	}


	public static class Table1 implements Serializable {
		int id;
		String col1;


		public Table1() {
		}


		public Table1(int id, String col1) {
			this.id = id;
			this.col1 = col1;
		}


		public int getId() {
			return id;
		}


		public void setId(int id) {
			this.id = id;
		}


		public String getCol1() {
			return col1;
		}


		public void setCol1(String col1) {
			this.col1 = col1;
		}


	}


}

I've already defined the table in Phoenix. I'm getting this error trying to write to it.
16/03/16 15:14:11 INFO ConnectionQueryServicesImpl: Trying to connect to a secure cluster with keytab:/hbase
Exception in thread "main" java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1883)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1862)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1862)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:180)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:132)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:151)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:187)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:99)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:82)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:70)
        at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:213)
        at org.apache.phoenix.spark.ConfigurationUtil$.encodeColumns(ConfigurationUtil.scala:57)
        at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:33)
        at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:47)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at mercy.dm.ReadWriteToPhoenix.main(ReadWriteToPhoenix.java:102)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Login failure for 2181 from keytab /hbase: javax.security.auth.login.LoginException: Unable to obtain password from user


        at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:962)
        at org.apache.hadoop.security.SecurityUtil.login(SecurityUtil.java:246)
        at org.apache.hadoop.hbase.security.User$SecureHadoopUser.login(User.java:386)
        at org.apache.hadoop.hbase.security.User.login(User.java:253)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:283)
        ... 29 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user


        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:856)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:953)
        ... 33 more
16/03/16 15:14:11 INFO SparkContext: Invoking stop() from shutdown hook




1 ACCEPTED SOLUTION

Accepted Solutions

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Super Collaborator

This is a bug in Phoenix-Spark plugin. More details can be found at PHOENIX-2817

As an a workaround you may try to change zookeeper znode parent to "/hbase" on your hbase cluster and replace zkUrl from "ZK_QUORUM:2181:/hbase-secure" to "ZK_QUORUM"

5 REPLIES 5

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Login failure for 2181 from keytab /hbase

Is very strange looking ("2181" should be the user and "/hbase" should be the keytab). Maybe some of the configuration values you're writing are incorrect?

If you take a look at https://community.hortonworks.com/articles/808/phoenix-jdbc-client-setup.html, you can see how Kerberos information can be provided via the JDBC URL. It almost seems like the last two elements of your JDBC URL are getting parsed as the principal and keytab -- it should really only look for the 4th and 5th property (not just the last two properties). Maybe there is a bug in Phoenix here? It's a starting point to investigate.

However, if your Spark code is only running locally, since you've already logged in (should probably error out it the login fails instead of just printing the exception, though), you shouldn't have to pass this additional Kerberos login information down into Phoenix.

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Have you tried using this Spark syntax described here:

http://phoenix.apache.org/phoenix_spark.html

import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Expert Contributor

can you add this to the VM option

-Dsun.security.krb5.debug=true

and also enable a bit more log, maybe with these in log4j or equivalent:

log4j.logger.org.apache.hadoop=DEBUG

hopefully you can learn a bit more from the enhanced log. These are what I I get in a successful login:

DEBUG	2016-03-18 08:39:57,465	6788	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x2da3fac3-metaLookup-shared--pool3-t1]	RPC Server Kerberos principal name for service=ClientService is hbase/sandbox.hortonworks.com@KRB.HDP
DEBUG	2016-03-18 08:39:57,465	6788	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x1efd2e3d-metaLookup-shared--pool4-t1]	RPC Server Kerberos principal name for service=ClientService is hbase/sandbox.hortonworks.com@KRB.HDP
DEBUG	2016-03-18 08:39:57,466	6789	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x2da3fac3-metaLookup-shared--pool3-t1]	Use KERBEROS authentication for service ClientService, sasl=true
DEBUG	2016-03-18 08:39:57,466	6789	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x1efd2e3d-metaLookup-shared--pool4-t1]	Use KERBEROS authentication for service ClientService, sasl=true
DEBUG	2016-03-18 08:39:57,484	6807	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x2da3fac3-metaLookup-shared--pool3-t1]	Connecting to sandbox.hortonworks.com/10.184.26.82:16020
DEBUG	2016-03-18 08:39:57,484	6807	org.apache.hadoop.hbase.ipc.AbstractRpcClient	[hconnection-0x1efd2e3d-metaLookup-shared--pool4-t1]	Connecting to sandbox.hortonworks.com/10.184.26.82:16020
DEBUG	2016-03-18 08:39:57,491	6814	org.apache.hadoop.security.UserGroupInformation	[hconnection-0x2da3fac3-metaLookup-shared--pool3-t1]	PrivilegedAction as:spark-Sandbox@KRB.HDP (auth:KERBEROS) from:org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:734)
DEBUG	2016-03-18 08:39:57,491	6814	org.apache.hadoop.security.UserGroupInformation	[hconnection-0x1efd2e3d-metaLookup-shared--pool4-t1]	PrivilegedAction as:spark-Sandbox@KRB.HDP (auth:KERBEROS) from:org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:734)
DEBUG	2016-03-18 08:39:57,495	6818	org.apache.hadoop.hbase.security.HBaseSaslRpcClient	[hconnection-0x2da3fac3-metaLookup-shared--pool3-t1]	Creating SASL GSSAPI client. Server's Kerberos principal name is hbase/sandbox.hortonworks.com@KRB.HDP
DEBUG	2016-03-18 08:39:57,495	6818	org.apache.hadoop.hbase.security.HBaseSaslRpcClient	[hconnection-0x1efd2e3d-metaLookup-shared--pool4-t1]	Creating SASL GSSAPI client. Server's Kerberos principal name is hbase/sandbox.hortonworks.com@KRB.HDP
Found ticket for spark-Sandbox@KRB.HDP to go to krbtgt/KRB.HDP@KRB.HDP expiring on Sat Mar 19 08:39:55 GMT 2016
Found ticket for spark-Sandbox@KRB.HDP to go to krbtgt/KRB.HDP@KRB.HDP expiring on Sat Mar 19 08:39:55 GMT 2016
Entered Krb5Context.initSecContext with state=STATE_NEW
Entered Krb5Context.initSecContext with state=STATE_NEW
Found ticket for spark-Sandbox@KRB.HDP to go to krbtgt/KRB.HDP@KRB.HDP expiring on Sat Mar 19 08:39:55 GMT 2016

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Super Collaborator

This is a bug in Phoenix-Spark plugin. More details can be found at PHOENIX-2817

As an a workaround you may try to change zookeeper znode parent to "/hbase" on your hbase cluster and replace zkUrl from "ZK_QUORUM:2181:/hbase-secure" to "ZK_QUORUM"

Re: I'm getting an error trying to write from Spark to Phoenix using Java

Thanks @ssoldatov, it worked perfectly

Don't have an account?
Coming from Hortonworks? Activate your account here