Connecting to remote spark 1.6 as yarn-client from Eclipse on Mac

Super Guru

Hi

I am trying to run an application from Eclipse so that I can set breakpoints and watch the values of my variables change. I create a JavaSparkContext from a SparkConf object. This object should have access to my yarn-site.xml and core-site.xml so that it knows how to connect to the cluster. I have these files under /etc/hadoop/conf, and on my Mac (where Eclipse runs) I have set two environment variables, HADOOP_CONF_DIR and YARN_CONF_DIR, using ~/Library/LaunchAgents/environment.plist. I have verified that these variables are available after the Mac boots, and I can read them in my app in Eclipse using System.getenv("HADOOP_CONF_DIR"); they point to the right location. I have also tried adding the environment variables to my build configuration in Eclipse.

After doing all this, my code still fails consistently. It appears to be unable to read yarn-site.xml or core-site.xml, because I run into the following issue:

 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/07/01 00:57:16 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

As you can see, it's not trying to connect to the correct ResourceManager address. Here is how the code looks in create(). Please let me know what you think, as this is blocking me.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public static JavaSparkContext create() {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.setProperty("spark.kryo.registrator", "fire.util.spark.Registrator");
    System.setProperty("spark.akka.timeout", "900");
    System.setProperty("spark.worker.timeout", "900");
    System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "3200000");

    // create spark context
    SparkConf sparkConf = new SparkConf().setAppName("MyApp");

    // if (clusterMode == false)
    {
        sparkConf.setMaster("yarn-client");
        sparkConf.set("spark.broadcast.compress", "false");
        sparkConf.set("spark.shuffle.compress", "false");
    }

    JavaSparkContext ctx = new JavaSparkContext(sparkConf); // <- fails here
    return ctx;
}
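
A note on the symptom above: 0.0.0.0:8032 is YARN's built-in default for yarn.resourcemanager.address, so the log is consistent with yarn-site.xml not being loaded at all. A quick way to see which address the client should be using is to read the file directly. The following is only a minimal sketch using the JDK's XML parser; the class name YarnSiteCheck and its readProperty helper are hypothetical, and it does not handle HA setups where the address is stored under per-ResourceManager keys.

```java
import java.io.File;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: reads one property from a Hadoop-style *-site.xml file. */
class YarnSiteCheck {

    /** Returns the trimmed <value> of the named <property>, or null if it is absent. */
    static String readProperty(File siteXml, String name) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(siteXml);
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element prop = (Element) props.item(i);
            if (name.equals(text(prop, "name"))) {
                return text(prop, "value");
            }
        }
        return null;
    }

    /** Text of the first child element with the given tag, or null if there is none. */
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) throws Exception {
        // Prefer YARN_CONF_DIR, fall back to HADOOP_CONF_DIR (both named in the question).
        String confDir = System.getenv("YARN_CONF_DIR");
        if (confDir == null) {
            confDir = System.getenv("HADOOP_CONF_DIR");
        }
        if (confDir == null) {
            System.out.println("Neither YARN_CONF_DIR nor HADOOP_CONF_DIR is set");
            return;
        }
        File yarnSite = new File(confDir, "yarn-site.xml");
        if (!yarnSite.isFile()) {
            System.out.println("No yarn-site.xml found in " + confDir);
            return;
        }
        System.out.println("yarn.resourcemanager.address  = "
                + readProperty(yarnSite, "yarn.resourcemanager.address"));
        System.out.println("yarn.resourcemanager.hostname = "
                + readProperty(yarnSite, "yarn.resourcemanager.hostname"));
    }
}
```

If this prints the real ResourceManager address while the Spark log still shows 0.0.0.0:8032, the file is present on disk but is probably not reaching the Hadoop configuration that the Spark client builds.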
1 ACCEPTED SOLUTION

Super Guru

I figured this out. I changed the master to local and simply loaded the remote HDFS data. It was still throwing an exception because the cluster is kerberized. Although I was using UserGroupInformation and creating a proxy user with a valid keytab to access my cluster, it was failing because I was creating the JavaSparkContext outside of the doAs method. Once I created the JavaSparkContext as the right proxy user, everything worked.

5 REPLIES

Master Guru

@mqureshi Can you try setting the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables in your code? Then set your log level to debug to verify that it is pointing to the correct location.
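
One caveat on setting these in code: a running JVM cannot change its own environment through the standard API, and Hadoop reads core-site.xml and yarn-site.xml as classpath resources rather than from the environment variables directly. As far as I know, spark-submit puts those directories on the classpath for you, while a plain Eclipse launch does not. A minimal sketch for checking this from inside the application follows; the class name ConfOnClasspath and its locate helper are hypothetical.

```java
import java.net.URL;

/** Hypothetical helper: reports whether the Hadoop site files are visible on the classpath. */
class ConfOnClasspath {

    /** Returns the URL of the resource as seen by the given class loader, or null if not found. */
    static URL locate(ClassLoader loader, String resource) {
        return loader.getResource(resource);
    }

    public static void main(String[] args) {
        // Use the context class loader, falling back to the system class loader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        for (String name : new String[] {"core-site.xml", "yarn-site.xml"}) {
            URL url = locate(loader, name);
            System.out.println(name + " -> " + (url == null ? "NOT on classpath" : url.toString()));
        }
    }
}
```

If either file is reported as missing, adding the configuration directory (for example /etc/hadoop/conf from the question) to the classpath of the Eclipse launch configuration may be worth trying before anything else.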

Contributor

Another approach may be to set up remote debugging. There is an article here explaining how to set it up.

Master Guru

@mqureshi Do you mind sharing details of the change? It would be useful to others.

Super Guru

@Sunile Manjee Yes. Here is what I did. Let me know if you have any questions.

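import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// kerberos_principal, kerberos_keytab, header, delimiter, path and objectOfMyType
// are defined elsewhere in the application.
try {
    // Log in from the keytab; the Spark context must then be created inside doAs().
    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(kerberos_principal, kerberos_keytab);

    objectOfMyType = ugi.doAs(new PrivilegedExceptionAction<MyType>() {
        @Override
        public MyType run() throws Exception {
            System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            System.setProperty("spark.kryo.registrator", "fire.util.spark.Registrator");
            System.setProperty("spark.akka.timeout", "900");
            System.setProperty("spark.worker.timeout", "900");
            System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "3200000");

            // create spark context (inside doAs, so it runs as the Kerberos user)
            SparkConf sparkConf = new SparkConf().setAppName("MyApp");
            sparkConf.setMaster("local");
            sparkConf.set("spark.broadcast.compress", "false");
            sparkConf.set("spark.shuffle.compress", "false");

            JavaSparkContext ctx = new JavaSparkContext(sparkConf);

            // JavaSparkContext has no sqlctx() method, so build an SQLContext from it
            SQLContext sqlContext = new SQLContext(ctx);
            DataFrame tdf = sqlContext.read().format("com.databricks.spark.csv")
                    .option("header", String.valueOf(header)) // Use first line of all files as header
                    .option("inferSchema", "true") // Automatically infer data types
                    .option("delimiter", delimiter)
                    .load(path);
            // some more application specific code here

            return objectOfMyType;
        }
    });
} catch (Exception exception) {
    exception.printStackTrace();
}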