Support Questions

kafka-spark multiple worker nodes

Explorer
Hello - we have Kafka message events sent to a topic with partitions. On the Spark side we have a receiver that gets DStream RDDs and reads the events using "for each". We have seen that the events do not get distributed to multiple worker nodes of the cluster, in spite of the topic partitions. The reason may be our use of "for each" to get the DStream events: "for each" is not a transformation and hence works only on the driver node. Is there a workaround using a "map" transformation to get the DStream events? Is it possible to access the SparkContext from worker nodes? Thanks
7 REPLIES

Re: kafka-spark multiple worker nodes

Master Collaborator
foreach is an action, not a transformation. It is not true that transformations execute on the driver; they are distributed. foreachRDD executes a function on the driver. I'm therefore not clear on what you are doing versus what you are experiencing.
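
To make the distinction concrete, here is a minimal sketch (assuming a JavaRDD<String> named rdd and the Spark 1.x Java API used elsewhere in this thread). Both functions below are shipped to and run on the executors, not the driver:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

// Transformation: lazy; its function runs on the executors once an action triggers it.
JavaRDD<Integer> lengths = rdd.map(new Function<String, Integer>() {
    @Override
    public Integer call(String s) {
        return s.length();
    }
});

// Action: triggers execution; its function also runs on the executors.
lengths.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer n) {
        System.out.println(n); // appears in the executor logs, not the driver console
    }
});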

Re: kafka-spark multiple worker nodes

Explorer

Thanks

Events are received from the Kafka topic as DStream RDDs by Spark. These events need to be processed in parallel across the worker nodes of the Spark cluster; we run an MLlib model for each event. Due to the "for each" action, this runs only on the master node and not on the worker nodes. If we change it to a map transformation, we get a "task not serializable" exception.

 


javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData) throws Exception {
        // collect() pulls every record of the batch back to the driver,
        // so everything below runs on the driver only.
        List<Tuple2<byte[], byte[]>> data = pairRDDData.collect();
        for (Tuple2<byte[], byte[]> tuple2 : data) {
            JavaPairRDD<Double, Double> modelRdd =
                    NaiveByesImpl.processML(new String(tuple2._1) + " " + new String(tuple2._2));

            // a second collect() brings the predictions back to the driver as well
            List<Tuple2<Double, Double>> predictionRdd = modelRdd.collect();
            for (Tuple2<Double, Double> tuple3 : predictionRdd) {
                System.out.println(" prediction is ********** " + tuple3._1 + " " + tuple3._2);
            }
        }
        return null;
    }
});

 

 


Re: kafka-spark multiple worker nodes

Master Collaborator
As I say, you are not calling foreach; you are calling foreachRDD. That executes on the driver, and is supposed to: it defines which functions are invoked on each RDD. One of those functions can be foreach, but that's not what you're calling. Any function you invoke for distributed operations must be serializable.
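
For reference, a sketch of that shape against the DStream above, using the same Function-based foreachRDD signature as the original code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
        // This body runs on the driver, once per batch.
        rdd.foreach(new VoidFunction<Tuple2<byte[], byte[]>>() {
            @Override
            public void call(Tuple2<byte[], byte[]> tuple) throws Exception {
                // This inner function is serialized and shipped to the executors,
                // so it, and everything it captures, must be serializable.
            }
        });
        return null;
    }
});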

Re: kafka-spark multiple worker nodes

Explorer

Thanks. What changes need to be made to make this run across the Spark cluster (multiple worker nodes)?

Re: kafka-spark multiple worker nodes

Master Collaborator
It depends on what your code does. Right now this code just shows a process that collects some data to the driver, calls your naive Bayes-related function on the driver, and collects some other data to the driver. This code needs to call distributed operations if you want it to be distributed. What those are, I couldn't say, since I'm not sure what you're trying to invoke.
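
For example, if NaiveByesImpl could expose a plain per-record scoring method, the driver-side loop could be replaced with distributed operations. The predictOne method below is hypothetical; the existing processML cannot be called here, because code running on the executors cannot create or operate on RDDs:

javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
        rdd.foreach(new VoidFunction<Tuple2<byte[], byte[]>>() {
            @Override
            public void call(Tuple2<byte[], byte[]> tuple) throws Exception {
                String event = new String(tuple._1) + " " + new String(tuple._2);
                // hypothetical: a per-record method that does not touch RDDs
                double prediction = NaiveByesImpl.predictOne(event);
                System.out.println("prediction is " + prediction); // executor logs
            }
        });
        return null;
    }
});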

Re: kafka-spark multiple worker nodes

Explorer

Thanks. Actually, the naive Bayes MLlib code should be executed on the worker nodes; what changes can be made for this? I tried wrapping it in a map function, but it gives a "task not serializable" error. For now I am using a simple string, but it still gives the serialization error.

 

javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData) throws Exception {
        // map() is a transformation, so this anonymous Function must be
        // serialized and shipped to the executors.
        JavaRDD<String> mapped = pairRDDData.map(new Function<Tuple2<byte[], byte[]>, String>() {
            @Override
            public String call(Tuple2<byte[], byte[]> tuple2) throws Exception {
                return new String(tuple2._1) + " " + new String(tuple2._2);
            }
        });
        // collect() returns a List<String>, not a JavaRDD, and pulls the
        // results back to the driver.
        List<String> lines = mapped.collect();
        return null;
    }
});

 

Re: kafka-spark multiple worker nodes

Master Collaborator
The map() will execute on the cluster. There are some problems with this code, but the most immediate one is that you are using inner classes, which are non-static in Java, and you are creating them in an instance method. They have a reference to your containing class, which presumably is unintentional, and which contains non-serializable elements. Put this in a static method.
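
A sketch of that fix, with an illustrative container class (StreamJobs below is hypothetical): an anonymous class created inside a static method captures no enclosing instance, so no stray non-serializable fields get dragged along:

import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public class StreamJobs {
    // Static method: the anonymous Function below has no implicit reference
    // to an enclosing instance, so only the function itself (which is
    // trivially serializable) is shipped to the executors.
    static List<String> decode(JavaPairRDD<byte[], byte[]> rdd) {
        JavaRDD<String> lines = rdd.map(new Function<Tuple2<byte[], byte[]>, String>() {
            @Override
            public String call(Tuple2<byte[], byte[]> tuple) {
                return new String(tuple._1) + " " + new String(tuple._2);
            }
        });
        return lines.collect(); // only if the results are needed on the driver
    }
}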