Created 07-01-2016 08:26 AM
Hi
I am trying to run an application from Eclipse so I can set breakpoints and watch the values of my variables change. I create a JavaSparkContext from a SparkConf object. This object should have access to my yarn-site.xml and core-site.xml so it knows how to connect to the cluster. These files are under /etc/hadoop/conf, and the two environment variables HADOOP_CONF_DIR and YARN_CONF_DIR are set on my Mac (where Eclipse runs) via ~/Library/LaunchAgents/environment.plist. I have verified these variables are available after the Mac boots up, and inside my app in Eclipse System.getenv("HADOOP_CONF_DIR") returns the right location. I have also tried adding the environment variables in my build configuration in Eclipse.
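For what it's worth, this is roughly how I verify the variables from inside the JVM; it checks both that the variable is set and that yarn-site.xml actually exists under it (the class and method names here are just illustrative, not part of my app):

```java
import java.nio.file.Files;
import java.nio.file.Paths;

public class ConfCheck {
    // True only when the variable's value is non-null and yarn-site.xml exists under it.
    static boolean confDirUsable(String dir) {
        return dir != null && Files.exists(Paths.get(dir, "yarn-site.xml"));
    }

    public static void main(String[] args) {
        for (String var : new String[] {"HADOOP_CONF_DIR", "YARN_CONF_DIR"}) {
            String dir = System.getenv(var);
            System.out.println(var + " = " + dir + " (usable: " + confDirUsable(dir) + ")");
        }
    }
}
```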
After doing all this, my code consistently fails. It is apparently unable to read yarn-site.xml or core-site.xml, and I run into the following:
INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/07/01 00:57:16 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
As you can see, it is not trying to connect to the correct ResourceManager address. Here is how the code looks in create(). Please let me know what you think, as this is blocking me.
public static JavaSparkContext create() {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.setProperty("spark.kryo.registrator", "fire.util.spark.Registrator");
    System.setProperty("spark.akka.timeout", "900");
    System.setProperty("spark.worker.timeout", "900");
    System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "3200000");

    // create spark context
    SparkConf sparkConf = new SparkConf().setAppName("MyApp");
    // if (clusterMode == false) {
    sparkConf.setMaster("yarn-client");
    sparkConf.set("spark.broadcast.compress", "false");
    sparkConf.set("spark.shuffle.compress", "false");
    // }
    JavaSparkContext ctx = new JavaSparkContext(sparkConf); // <- Fails here
    return ctx;
}
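If I understand the log correctly, 0.0.0.0:8032 is Hadoop's built-in default for yarn.resourcemanager.address, i.e. what the client falls back to when it cannot load yarn-site.xml at all. My yarn-site.xml defines it along these lines (the hostname below is a placeholder, not my actual host):

```xml
<property>
  <name>yarn.resourcemanager.address</name>
  <value>rm-host.example.com:8032</value>
</property>
```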
Created 07-06-2016 03:28 PM
I figured this out. I changed the master to local and then simply loaded the remote HDFS data. It was still throwing an exception because it is a Kerberized cluster. I was using UserGroupInformation to create a proxy user with a valid keytab to access my cluster, but it was failing because I was creating the JavaSparkContext outside of the "doAs" method. Once I created the JavaSparkContext as the right proxy user inside doAs, everything worked.
Created 07-03-2016 01:09 AM
@mqureshi Can you try setting HADOOP_CONF_DIR and YARN_CONF_DIR from within your code? Then set your log level to DEBUG to verify it is pointing to the correct location.
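Alternatively, you can bypass the environment variables entirely: Spark copies any property prefixed with spark.hadoop.* into the Hadoop Configuration it builds. So something like this in spark-defaults.conf (or the equivalent sparkConf.set(...) calls) should point the client at the right cluster; the host names below are placeholders:

```
spark.hadoop.fs.defaultFS                  hdfs://namenode.example.com:8020
spark.hadoop.yarn.resourcemanager.address  rm-host.example.com:8032
```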
Created 07-06-2016 03:12 PM
Another approach may be to set up remote debugging. There is an article here explaining how to set it up.
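The usual way to wire that up is to start the driver JVM with the JDWP agent and then attach Eclipse's "Remote Java Application" debug configuration to the same port. A sketch, assuming the driver runs where you can reach port 5005 (the port choice is arbitrary):

```
spark.driver.extraJavaOptions  -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
```

With suspend=y the driver waits until the debugger attaches, so your breakpoints are in place before any Spark code runs.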
Created 07-06-2016 03:57 PM
@mqureshi do you mind sharing details of the change? It would be useful to others.
Created 07-06-2016 05:12 PM
@Sunile Manjee Yes. Here is what I did. Let me know if you have any questions.
try {
    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(kerberos_principal, kerberos_keytab);
    objectOfMyType = ugi.doAs(new PrivilegedExceptionAction<MyType>() {
        @Override
        public MyType run() throws Exception {
            System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            System.setProperty("spark.kryo.registrator", "fire.util.spark.Registrator");
            System.setProperty("spark.akka.timeout", "900");
            System.setProperty("spark.worker.timeout", "900");
            System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "3200000");

            // create spark context
            SparkConf sparkConf = new SparkConf().setAppName("MyApp");
            sparkConf.setMaster("local");
            sparkConf.set("spark.broadcast.compress", "false");
            sparkConf.set("spark.shuffle.compress", "false");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);

            // JavaSparkContext has no sqlctx() method; wrap it in a SQLContext
            SQLContext sqlContext = new SQLContext(ctx);
            DataFrame tdf = sqlContext.read().format("com.databricks.spark.csv")
                    .option("header", String.valueOf(header)) // use first line of all files as header
                    .option("inferSchema", "true")            // automatically infer data types
                    .option("delimiter", delimiter)
                    .load(path);
            // some more application-specific code here
            return objectOfMyType;
        }
    });
} catch (Exception exception) {
    exception.printStackTrace();
}