Explorer

kafka-spark multiple worker nodes

Hello. We have Kafka message events sent to a topic with multiple partitions. On the Spark side we have a receiver that gets DStream RDDs and reads the events with a "for each". We have seen that the events do not get distributed to the multiple worker nodes of the cluster, in spite of the topic partitions. The reason may be the "for each" used to get the DStream events: "for each" is not a transformation and hence works only on the driver node. Is there a workaround using a "map" transformation to get the DStream events? Is it possible to access the SparkContext from worker nodes? Thanks
Cloudera Employee

Re: kafka-spark multiple worker nodes

foreach is an action, not a transformation. It is not true that transformations execute on the driver; they are distributed. foreachRDD executes a function on the driver. I'm therefore not clear what you are doing versus what you are experiencing.
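
For reference, here is a minimal sketch of the distinction, assuming a Kafka pair DStream like the one you describe (the class and method names are only placeholders):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import scala.Tuple2;

public class WhereThingsRun {
    public static void attach(JavaPairReceiverInputDStream<byte[], byte[]> stream) {
        stream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<byte[], byte[]> rdd) {
                // This body runs on the driver, once per batch.
                rdd.foreach(new VoidFunction<Tuple2<byte[], byte[]>>() {
                    @Override
                    public void call(Tuple2<byte[], byte[]> record) {
                        // This body is serialized to the executors and runs in
                        // parallel, one task per partition of the batch RDD.
                    }
                });
                return null;
            }
        });
    }
}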
Explorer

Re: kafka-spark multiple worker nodes

Thanks

Events are received from the Kafka topic as DStream RDDs by Spark. These events need to be processed in parallel across the Spark cluster's worker nodes; we are running an MLlib model for each event. Due to the "for each" action, this runs only on the master node and not on the worker nodes. If we change this to a map transformation, we get a "task not serializable" exception.

 


javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData) throws Exception {
        List<Tuple2<byte[], byte[]>> data = pairRDDData.collect();
        for (Tuple2<byte[], byte[]> tuple2 : data) {
            NaiveByesImpl model = new NaiveByesImpl();
            JavaPairRDD<Double, Double> modelRdd =
                    NaiveByesImpl.processML(new String(tuple2._1) + " " + new String(tuple2._2));
            List<Tuple2<Double, Double>> predictionRdd = modelRdd.collect();
            for (Tuple2<Double, Double> tuple3 : predictionRdd) {
                System.out.println(" prediction is ********** " + tuple3._1 + " " + tuple3._2);
            }
        }
        return null;
    }
});

 

 

Cloudera Employee

Re: kafka-spark multiple worker nodes

As I say, you are not calling foreach. You are calling foreachRDD.
That executes on the driver and is supposed to. It defines what
functions are invoked on each RDD. One of those functions can be
foreach, but that's not what you're calling. The function you invoke
for distributed operations must be serializable.
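
For example, a top-level or static nested class implementing Spark's Function interface, which already extends java.io.Serializable, avoids dragging in any enclosing object (the class name here is illustrative):

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Holds no reference to any enclosing class, so only this small object is
// serialized and shipped to the executors.
public class EventToString implements Function<Tuple2<byte[], byte[]>, String> {
    @Override
    public String call(Tuple2<byte[], byte[]> event) {
        return new String(event._1) + " " + new String(event._2);
    }
}

You would then pass an instance of it to the distributed operation, e.g. rdd.map(new EventToString()).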
Explorer

Re: kafka-spark multiple worker nodes

Thanks. What changes need to be made to make this work on a Spark cluster (multiple worker nodes)?

Cloudera Employee

Re: kafka-spark multiple worker nodes

It depends on what your code does. Right now this code just shows a
process that collects some data to the driver, calls your naive
Bayes-related function on the driver, and collects some other data to
the driver. This code needs to call distributed operations if you want
it to be distributed. What that is, I couldn't say, since I'm not sure
what you're trying to invoke.
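
As a sketch of what a distributed version could look like, assuming your per-event scoring can be expressed as a plain function of the event string, for example by training a NaiveBayesModel once on the driver, broadcasting it, and calling predict() per record on the executors (the featurize() helper is hypothetical and depends on how your model was trained):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.linalg.Vector;
import scala.Tuple2;

public class DistributedScoring {
    // "model" is a NaiveBayesModel trained up front and broadcast to the workers.
    public static JavaRDD<Double> score(JavaPairRDD<byte[], byte[]> rdd,
                                        final Broadcast<NaiveBayesModel> model) {
        return rdd.map(new Function<Tuple2<byte[], byte[]>, Double>() {
            @Override
            public Double call(Tuple2<byte[], byte[]> event) {
                String raw = new String(event._1) + " " + new String(event._2);
                Vector features = featurize(raw);         // hypothetical helper
                return model.value().predict(features);   // runs on an executor
            }
        });
    }

    // Placeholder: turn the raw event into the feature vector the model expects.
    private static Vector featurize(String raw) {
        throw new UnsupportedOperationException("application-specific");
    }
}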
Explorer

Re: kafka-spark multiple worker nodes

Thanks. Actually, the Naive Bayes MLlib code should be executed on the worker nodes. What changes can be made for this? I tried wrapping it in a map function, but that gives a "task not serializable" error. For now I am using a simple String, but it still gives the serialization error.

 

javaPairReceiverInputDStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> pairRDDData) throws Exception {
        List<Tuple2<byte[], byte[]>> data = pairRDDData.collect();
        JavaRDD<String> pairRdd = (JavaRDD<String>) pairRDDData.map(
                new Function<Tuple2<byte[], byte[]>, String>() {
                    @Override
                    public String call(Tuple2<byte[], byte[]> tuple2) throws Exception {
                        String val = new String(tuple2._1) + " " + new String(tuple2._2);
                        return val;
                    }
                }).collect();
        return null;
    }
});

 

Cloudera Employee

Re: kafka-spark multiple worker nodes

The map() will execute on the cluster. There are some problems with
this code, but the most immediate one is that you are using inner
classes, which are non-static in Java, and you are creating them in an
instance method. They have a reference to your containing class which
presumably is unintentional, and which contains non-serializable
elements. Put this in a static method.
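
For example, something along these lines, where the wiring lives in a static method so the anonymous Function objects capture no enclosing instance (names are placeholders, and the map/count is just a stand-in for whatever distributed work you do):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import scala.Tuple2;

public class StreamWiring {
    public static void wire(JavaPairReceiverInputDStream<byte[], byte[]> stream) {
        stream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<byte[], byte[]> rdd) {
                // Anonymous classes created in a static method have no hidden
                // reference to an outer instance, so nothing non-serializable
                // is dragged into the closure.
                long n = rdd.map(new Function<Tuple2<byte[], byte[]>, String>() {
                    @Override
                    public String call(Tuple2<byte[], byte[]> t) {
                        return new String(t._1) + " " + new String(t._2);
                    }
                }).count(); // any distributed action; the map runs on the workers
                System.out.println("records in batch: " + n);
                return null;
            }
        });
    }
}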