How to launch the computation of 2 RDDs in parallel, in Spark?

Explorer

I create several RDDs (RDD1, RDD2, ...) based on an input RDD that contains data from HDFS.

The resulting RDDs are then stored back in HDFS. I would like to know whether it is possible to launch the creation of these RDDs in parallel, in separate threads.


incomingDataRDD = sc.textFile(...)
RDD1 = incomingDataRDD.map(...).saveAsTextFile(...)
RDD2 = incomingDataRDD.map(...).saveAsTextFile(...)

 

I tried this idea, but I get the following error:

from threading import Thread

t1 = Thread(target=rddComputation, args=())
t1.start()
t2 = Thread(target=rddComputation, args=())
t2.start()

where rddComputation is a function in which I do:

RDD = incomingDataRDD.map(...).saveAsTextFile(...)

 

Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: java.lang.NullPointerException
at org.apache.spark.rdd.HadoopRDD$.containsCachedMetadata(HadoopRDD.scala:240)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:134)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:168)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:840)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:436)
at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

 

1 REPLY

Re: How to launch the computation of 2 RDDs in parallel, in Spark?

Master Collaborator

Yes, that's perfectly possible. I use .par collections in Scala, for example, to run N transformations in parallel. I think your error is something else specific to your code or program; there is no fundamental reason you can't run as many jobs as you like in parallel.
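
For illustration, here is a minimal PySpark sketch of the same idea using Python threads (an analogue of the Scala .par approach mentioned above, not code from this thread); the paths, lambdas, and function names below are placeholders, not taken from the original post:

from threading import Thread
from pyspark import SparkContext

sc = SparkContext(appName="parallel-saves")

# Read the shared input once and cache it so both jobs can reuse it.
incomingDataRDD = sc.textFile("hdfs:///tmp/input").cache()

def save_upper(rdd, path):
    # Each call triggers a full Spark job: transform, then write to HDFS.
    rdd.map(lambda line: line.upper()).saveAsTextFile(path)

def save_lower(rdd, path):
    rdd.map(lambda line: line.lower()).saveAsTextFile(path)

# SparkContext accepts job submissions from multiple driver threads,
# so the two save jobs can be scheduled concurrently.
t1 = Thread(target=save_upper, args=(incomingDataRDD, "hdfs:///tmp/out1"))
t2 = Thread(target=save_lower, args=(incomingDataRDD, "hdfs:///tmp/out2"))
t1.start()
t2.start()
t1.join()
t2.join()

sc.stop()

Joining both threads before calling sc.stop() matters, since stopping the context while a job is still running will cause that job to fail.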