Created on 09-30-2015 09:39 AM - edited 09-16-2022 02:42 AM
Goal: I want to use the Spark framework to generate machine learning feature vectors in parallel by analyzing incoming data from a Kafka input stream. To do this I created a Broadcast variable that stores an RDD of the features we want to execute in parallel. These features need to be applied to the incoming Spark stream that I am receiving from Kafka.
Working use case in local mode: I created a static Broadcast variable that stores the RDD of features I want to run in parallel; later I retrieve that list from the RDD and process the features just fine.
What is NOT working on YARN: Now I want to port this to YARN. I found that we cannot have a static Broadcast variable, so I initialized the broadcast variable in main and read its value there, but now I am getting the error below. I know we cannot create a new RDD inside another RDD, and here I am only using the list from the broadcast RDD inside another RDD, yet I still get the following error.
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
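For reference, SPARK-5063 flags exactly this shape: invoking an operation on one RDD from inside another RDD's transformation. A rough Java illustration of the invalid pattern and its driver-side equivalent (rdd1/rdd2 are illustrative names, not variables from the code in this thread):

import org.apache.spark.api.java.JavaRDD;

class Spark5063Sketch {
    // Invalid: rdd2 is captured inside rdd1's map(), so its count() would have to run
    // on an executor. Spark fails this at runtime with the SPARK-5063 error above.
    static JavaRDD<Long> broken(JavaRDD<Long> rdd1, JavaRDD<String> rdd2) {
        return rdd1.map(x -> rdd2.count() * x);
    }

    // Valid: run the action on the driver first, then close over the plain long value.
    static JavaRDD<Long> fixed(JavaRDD<Long> rdd1, JavaRDD<String> rdd2) {
        final long n = rdd2.count();
        return rdd1.map(x -> n * x);
    }
}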
Thanks for all your help!
Rachana
Created 09-30-2015 10:26 AM
Thanks so much for your quick response. I have a stream of data coming from Kafka, and for each input record we have to compute many features in parallel. Since we cannot run an RDD transformation inside another RDD, should we manage our own multithreaded feature execution for each input record?
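One possible shape for that, assuming the feature list is small enough to collect and broadcast as a plain java.util.List, is to score each incoming record against every feature with executor-local threads instead of a nested RDD. A rough, untested sketch; scoreOne is a hypothetical stand-in for the reflection-based feature call shown later in this thread:

import java.util.List;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

class PerRecordScoring {
    // The broadcast wraps a plain List (not an RDD), so it is safe to read it
    // inside a DStream/RDD transformation running on the executors.
    static double[] scoreRecord(String urlString, String urlContent,
                                Broadcast<List<Tuple2<String, String>>> features) {
        return features.value()
                .parallelStream()                 // local multithreading across features
                .mapToDouble(kv -> scoreOne(kv, urlString, urlContent))
                .toArray();
    }

    // Hypothetical stand-in for the reflection-based getFeatureScore below.
    static double scoreOne(Tuple2<String, String> kv, String url, String content) {
        return 0.0;
    }
}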
Created 09-30-2015 10:36 AM
Here is what I am trying to do.
1. Get the list of features and store it in a JavaPairRDD, because we want the features to be executed in parallel.
final Broadcast<JavaPairRDD<String, String>> broadCastfeatureKeyClassPair = getBroadcastRDD(jsc);
final JavaPairRDD<String, String> featureKeyClassPair = broadCastfeatureKeyClassPair.value();
2. Get the stream of input data from Kafka.
....
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
3. For each record in the input stream, execute the list of features obtained from the broadcast variable.
...
JavaPairDStream<String, String> urlFeatureScore = lines.mapToPair(
    new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(final String urlString) throws Exception {
            InputJson inputJson = mapper.readValue(urlString, InputJson.class);
            String data = inputJson.getData();
            String featureScore = getFeatureScore(inputJson, featureKeyClassPair);
            return new Tuple2<String, String>(data, featureScore);
        }
    });
4. Method called in step 3 {this step is failing because an RDD is used inside another RDD}
private static String getFeatureScore(InputJson inputJson, JavaPairRDD<String, String> featureKeyClassPair) {
    JavaRDD<Double> featureScoreRDD = featureKeyClassPair.map(
        new Function<Tuple2<String, String>, Double>() {
            public Double call(Tuple2<String, String> keyValue) {
                Double score = getFeatureScore(keyValue, urlString, urlContent);
                return score;
            }
        });
    List<Double> list = featureScoreRDD.collect();
    ...
    return featureScore;
}
5. Using reflection to invoke each feature class.
private static Double getFeatureScore(Tuple2<String, String> keyValue, String urlString, String urlContent) throws Exception {
    String featureKey = (String) keyValue._1();
    String className = (String) keyValue._2();
    Double score = 0.0;
    Class<?> featureClass = Class.forName(className);
    Method featureScoreMethod = featureClass.getMethod("getFeatureScore", String.class, String.class);
    score = (Double) featureScoreMethod.invoke(featureClass.newInstance(), urlString, urlContent);
    return score;
}
private static Broadcast<JavaPairRDD<String, String>> getBroadcastRDD(JavaSparkContext jsc) {
    final JavaRDD<String> featurePropertiesRDD = jsc.textFile(FEATURE_PROPERTIES);
    JavaPairRDD<String, String> featureKeyClassPair = featurePropertiesRDD.mapToPair(
        new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String property) {
                return new Tuple2<String, String>(property.split("=")[0], property.split("=")[1]);
            }
        });
    Broadcast<JavaPairRDD<String, String>> broadCastfeatureKeyClassPair = jsc.broadcast(featureKeyClassPair);
    return broadCastfeatureKeyClassPair;
}
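For comparison, here is a sketch of the same step that broadcasts a plain collected Map instead of an RDD, under the assumption that FEATURE_PROPERTIES is small enough to collect on the driver; the streaming transformation would then only capture an ordinary Map value, so no RDD is referenced inside another transformation:

// Sketch: collect the feature -> class pairs on the driver and broadcast the plain Map.
private static Broadcast<Map<String, String>> getBroadcastFeatureMap(JavaSparkContext jsc) {
    JavaPairRDD<String, String> pairs = jsc.textFile(FEATURE_PROPERTIES).mapToPair(
        new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String property) {
                String[] parts = property.split("=");
                return new Tuple2<String, String>(parts[0], parts[1]);
            }
        });
    // collectAsMap() runs on the driver; copy into a HashMap before broadcasting.
    Map<String, String> featureKeyToClass = new HashMap<String, String>(pairs.collectAsMap());
    return jsc.broadcast(featureKeyToClass);
}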