Created 03-18-2016 06:35 AM
Created 03-18-2016 06:40 AM
Created 03-18-2016 07:10 PM
Thanks
Events are received from a Kafka topic as a DStream of RDDs in Spark. These events need to be processed in parallel across the Spark cluster's worker nodes. We run an MLlib model for each event. Because of the "for each" action, this runs only on the master node and not on the worker nodes. If we change it to a map transformation, we get a "Task not serializable" exception.
javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData) throws Exception {
        List<Tuple2<byte[], byte[]>> data = pairRDDData.collect();
        for (Tuple2<byte[], byte[]> tuple2 : data) {
            NaiveByesImpl model = new NaiveByesImpl();
            JavaPairRDD<Double, Double> modelRdd = NaiveByesImpl.processML(new String(tuple2._1) + " " + new String(tuple2._2));
            List<Tuple2<Double, Double>> predictionRdd = modelRdd.collect();
            for (Tuple2<Double, Double> tuple3 : predictionRdd) {
                System.out.println(" prediction is ********** " + tuple3._1 + " " + tuple3._2);
            }
        }
        return null;
    }
});
Created 03-19-2016 12:52 AM
Created 03-19-2016 10:03 AM
Thanks. What changes need to be made to make this work on a Spark cluster (multiple worker nodes)?
Created 03-19-2016 10:08 AM
Created 03-20-2016 06:59 AM
Thanks. Actually, the naive Bayes MLlib model should be executed on the worker nodes. What changes are needed for this? I tried wrapping the logic in a map function, but it gives a "Task not serializable" error. For now I am using a simple string, but it still gives the serialization error:
javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData)
            throws Exception {
        List<Tuple2<byte[], byte[]>> data = pairRDDData.collect();
        JavaRDD<String> pairRdd = (JavaRDD<String>) pairRDDData.map(new Function<Tuple2<byte[], byte[]>, String>() {
            @Override
            public String call(Tuple2<byte[], byte[]> tuple2)
                    throws Exception {
                String val = new String(tuple2._1) + " " + new String(tuple2._2);
                return val;
            }
        }).collect();
        return null;
    }
});
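For reference, one common way a map function ends up "not serializable" can be reproduced in plain Java without Spark. This is only a sketch, not a diagnosis of the job above, because the post does not show the class that contains the foreachRDD call. An anonymous inner class that uses a field of its enclosing object keeps a hidden reference to that object, so Java serialization tries to write the enclosing object too and fails if it is not Serializable. A static nested class that receives the state it needs through its constructor does not have that reference. The names SerializableFunction, StreamingJob, Decoder and ClosureSerializationDemo are hypothetical stand-ins and are not part of the Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a Serializable function interface such as Spark's Function.
interface SerializableFunction<T, R> extends Serializable {
    R call(T input) throws Exception;
}

// Hypothetical stand-in for the class that builds the streaming job.
// It is deliberately NOT Serializable, like a typical driver-side class.
class StreamingJob {
    private final String prefix = "event: ";

    // Anonymous inner class that reads a field of the enclosing object.
    // It holds a hidden reference to this StreamingJob, so serializing it fails.
    SerializableFunction<byte[], String> anonymousDecoder() {
        return new SerializableFunction<byte[], String>() {
            @Override
            public String call(byte[] bytes) {
                return prefix + new String(bytes);
            }
        };
    }

    // Static nested class: no reference to StreamingJob. The state it needs
    // is passed in explicitly, and a String is Serializable.
    static class Decoder implements SerializableFunction<byte[], String> {
        private final String prefix;

        Decoder(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String call(byte[] bytes) {
            return prefix + new String(bytes);
        }
    }
}

class ClosureSerializationDemo {
    // Attempts plain Java serialization and reports whether it succeeded.
    static boolean canSerialize(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        StreamingJob job = new StreamingJob();
        System.out.println("anonymous inner class serializable: "
                + canSerialize(job.anonymousDecoder()));
        System.out.println("static nested class serializable:   "
                + canSerialize(new StreamingJob.Decoder("event: ")));
    }
}
```

If the job's enclosing class is the cause, moving the function into a static nested or top-level class is one option to try. Note also that both snippets above call collect(), which brings the data back to the driver.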
Created 03-20-2016 07:14 AM