ERROR: RDD transformations and actions can only be invoked by the driver (on YARN)

Explorer

Goal: I want to use Spark to generate machine-learning feature vectors in parallel from data arriving on a Kafka input stream. To do this, I created a Broadcast variable that holds an RDD of the features we want to execute in parallel. These features need to be applied to the incoming Spark stream that I receive from Kafka.

 

Working use case in local mode: I created a static Broadcast variable that stores the RDD of features I want to run in parallel. Later I retrieve that RDD from the broadcast variable and process the features just fine.

 

What is NOT working on YARN: Now I want to port this to YARN. I found that we cannot use a static Broadcast variable there, so I initialize the broadcast variable in main and read its value, but now I get the error below. I know we cannot create a new RDD inside another RDD, but here I am only using an existing RDD of features inside another RDD's function, and I still get the error.

 

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

                at org.apache.spark.rdd.RDD.sc(RDD.scala:87)

 

Thanks for all your help!

 

Rachana

 


Master Collaborator
You can't use RDDs inside functions that execute remotely on another RDD,
which may be what you're doing. Otherwise I'm not clear on what you are
executing. I suspect you are doing something that does not work in
general in Spark, but may happen to work when executing locally in a single JVM.

Explorer

Thanks so much for your quick response. I have a stream of data coming from Kafka, and for each input record we have to compute many features in parallel. Since we cannot run an RDD transformation inside another RDD, should we manage our own multithreaded feature execution for each input record?

Master Collaborator
It's possible to just use a static Executor in your code and use it to
run multi-threaded operations within each function call. This may not
be efficient though.

If your goal is simply full utilization of cores, then make sure you
have enough executors with enough cores running to use all of your
cluster. Then make sure your number of partitions is at least this
large. Then each operation can be single-threaded.
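
The first suggestion above (a static Executor used inside each function call) could look roughly like the sketch below. It is plain Java with no Spark dependency; the class name `ParallelFeatureScorer`, the pool size of 4, and the use of `ToDoubleFunction<String>` to stand in for a feature are all assumptions made for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToDoubleFunction;

class ParallelFeatureScorer {
    // One pool per JVM, so each Spark executor process would get its own.
    // The size (4) is an arbitrary assumption.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // daemon threads so the pool never blocks JVM shutdown
        return t;
    });

    // Scores one input record against every feature concurrently.
    // Results come back in the same order as the feature list.
    static List<Double> scoreAll(String input, List<ToDoubleFunction<String>> features) {
        List<Callable<Double>> tasks = new ArrayList<>();
        for (ToDoubleFunction<String> feature : features) {
            tasks.add(() -> feature.applyAsDouble(input));
        }
        List<Double> scores = new ArrayList<>();
        try {
            for (Future<Double> result : POOL.invokeAll(tasks)) {
                scores.add(result.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while scoring", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A feature failed", e.getCause());
        }
        return scores;
    }

    public static void main(String[] args) {
        List<ToDoubleFunction<String>> features = new ArrayList<>();
        features.add(s -> (double) s.length());
        features.add(s -> s.startsWith("https") ? 1.0 : 0.0);
        System.out.println(scoreAll("https://example.com", features));
    }
}
```

Inside a Spark job, `scoreAll` would be called from the body of the `mapToPair` function. As the reply notes, this may not be efficient: the pool competes with Spark's own task threads for the same cores.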

Explorer

Here is what I am trying to do.

 

1. Get the list of features and store it in a JavaPairRDD, because we want the features to be executed in parallel.
   final Broadcast<JavaPairRDD<String, String>> broadCastfeatureKeyClassPair = getBroadcastRDD(jsc);
   final JavaPairRDD<String, String> featureKeyClassPair = broadCastfeatureKeyClassPair.value();
 

2. Get the stream of input data from Kafka
  ....
   JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
 

3. For each input record in the stream, execute the list of features obtained from the broadcast variable
 ...
    JavaPairDStream<String, String> urlFeatureScore = lines.mapToPair(
        new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(final String urlString) throws Exception {
                InputJson inputJson = mapper.readValue(urlString, InputJson.class);
                String data = inputJson.getData();
                String featureScore = getFeatureScore(inputJson, featureKeyClassPair);
                return new Tuple2<String, String>(data, featureScore);
            }
        });
    
4. Method called in step 3 (this step fails because an RDD is used inside another RDD's function)
private static String getFeatureScore(InputJson inputJson, JavaPairRDD<String, String> featureKeyClassPair) {
    // Fails on YARN: featureKeyClassPair.map(...) is an RDD transformation,
    // and this method is called from inside the mapToPair function of step 3.
    JavaRDD<Double> featureScoreRDD = featureKeyClassPair.map(
        new Function<Tuple2<String, String>, Double>() {
            public Double call(Tuple2<String, String> keyValue) throws Exception {
                Double score = getFeatureScore(keyValue, urlString, urlContent);
                return score;
            }
        });
    // collect() is an RDD action, which is likewise only allowed on the driver.
    List<Double> list = featureScoreRDD.collect();
    ...
    }
    return featureScore;
}

 

5. Use reflection to invoke each feature class.
private static Double getFeatureScore(Tuple2<String, String> keyValue, String urlString, String urlContent) throws Exception {
    String featureKey = keyValue._1();
    String className = keyValue._2();
    // Load the feature class by name and look up its getFeatureScore(String, String) method.
    Class<?> featureClass = Class.forName(className);
    Method featureScoreMethod = featureClass.getMethod("getFeatureScore", String.class, String.class);
    // Reflection throws checked exceptions, hence the "throws Exception" on this method.
    return (Double) featureScoreMethod.invoke(featureClass.newInstance(), urlString, urlContent);
}

private static Broadcast<JavaPairRDD<String, String>> getBroadcastRDD(JavaSparkContext jsc) {
    final JavaRDD<String> featurePropertiesRDD = jsc.textFile(FEATURE_PROPERTIES);
    // Each line of the properties file has the form featureKey=className.
    JavaPairRDD<String, String> featureKeyClassPair = featurePropertiesRDD.mapToPair(
        new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String property) {
                String[] keyValue = property.split("=");
                return new Tuple2<String, String>(keyValue[0], keyValue[1]);
            }
        });
    // Note: this broadcasts the RDD object itself, not the feature data it describes.
    Broadcast<JavaPairRDD<String, String>> broadCastfeatureKeyClassPair = jsc.broadcast(featureKeyClassPair);
    return broadCastfeatureKeyClassPair;
}
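
One possible way around the nested-RDD error in step 4, sketched here under the assumption that the feature properties file is small enough to hold in memory: read the `featureKey=className` pairs into a plain serializable map on the driver, and loop over that map instead of calling `map` and `collect` on an RDD. In the Spark job, the map (rather than the RDD) would be what is passed to `jsc.broadcast(...)` and read back with `.value()` inside `mapToPair`. The sketch leaves Spark out so that it runs standalone; `FeatureTable` and `UrlLengthFeature` are hypothetical names that do not appear in the original code.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FeatureTable {
    // Parses "featureKey=className" lines into a plain, serializable map.
    // Lines without '=' are skipped.
    static HashMap<String, String> parse(List<String> lines) {
        HashMap<String, String> table = new HashMap<>();
        for (String line : lines) {
            String[] kv = line.split("=", 2);
            if (kv.length == 2) {
                table.put(kv[0].trim(), kv[1].trim());
            }
        }
        return table;
    }

    // Same reflection call as step 5, driven by a class name taken from the map.
    static double score(String className, String urlString, String urlContent) {
        try {
            Class<?> featureClass = Class.forName(className);
            Method method = featureClass.getMethod("getFeatureScore", String.class, String.class);
            Object feature = featureClass.getDeclaredConstructor().newInstance();
            return (Double) method.invoke(feature, urlString, urlContent);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot score with " + className, e);
        }
    }

    // A plain loop over the map: no RDD is touched, so this could run inside a task.
    static Map<String, Double> scoreAll(Map<String, String> table, String urlString, String urlContent) {
        Map<String, Double> scores = new TreeMap<>();
        for (Map.Entry<String, String> entry : table.entrySet()) {
            scores.put(entry.getKey(), score(entry.getValue(), urlString, urlContent));
        }
        return scores;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("urlLength=" + UrlLengthFeature.class.getName());
        System.out.println(scoreAll(parse(lines), "http://a.b", "<html/>"));
    }
}

// Hypothetical feature class exposing the method signature that step 5 looks up.
class UrlLengthFeature {
    public Double getFeatureScore(String urlString, String urlContent) {
        return (double) urlString.length();
    }
}
```

With this shape the features for one record run sequentially inside a task, and parallelism comes from the stream's partitions, which matches the advice above about having enough executors, cores, and partitions.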