
How to launch the computation of 2 RDDs in parallel, in Spark?


Explorer

I create several RDDs (RDD1, RDD2, ...) based on an input RDD that contains data from HDFS.

The RDDs are then stored in HDFS. I would like to know whether it is possible to launch the creation of the RDDs in parallel, in separate threads.

incomingDataRDD = sc.textFile(...)
RDD1 = incomingDataRDD.map(...).saveAsTextFile(...)
RDD2 = incomingDataRDD.map(...).saveAsTextFile(...)

 

I tried this idea, but I get the following error:

from threading import Thread

t1 = Thread(target=rddComputation, args=())
t1.start()
t2 = Thread(target=rddComputation, args=())
t2.start()

Here rddComputation is a function that does

RDD = incomingDataRDD.map(...).saveAsTextFile(...)

 

Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: java.lang.NullPointerException
at org.apache.spark.rdd.HadoopRDD$.containsCachedMetadata(HadoopRDD.scala:240)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:134)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:168)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:840)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:436)
at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

 

1 REPLY

Re: How to launch the computation of 2 RDDs in parallel, in Spark?

Master Collaborator

Yes, that's perfectly possible. I use .par collections in Scala, for example, to run N transformations in parallel. I think your error is something else, specific to your code or program; there is no fundamental reason you can't run as many jobs as you like in parallel.
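
The .par approach mentioned above is Scala-specific; in PySpark the same idea can be sketched with plain Python threads submitting jobs against one shared SparkContext. The sketch below is only illustrative: the paths, the map lambdas, and the save_transformed helper are hypothetical stand-ins for your code, and it assumes a Spark version where submitting jobs from several driver threads works correctly.

from threading import Thread
from pyspark import SparkConf, SparkContext

# Hypothetical HDFS paths -- substitute your own locations.
INPUT_PATH = "hdfs:///tmp/input"
OUTPUT_PATH_1 = "hdfs:///tmp/output1"
OUTPUT_PATH_2 = "hdfs:///tmp/output2"

sc = SparkContext(conf=SparkConf().setAppName("parallel-saves"))

# One shared input RDD; both jobs read from it.
incomingDataRDD = sc.textFile(INPUT_PATH)

def save_transformed(transform, output_path):
    # saveAsTextFile is an action, so each call launches its own Spark job;
    # calling it from separate threads lets the scheduler run the jobs concurrently.
    incomingDataRDD.map(transform).saveAsTextFile(output_path)

# Placeholder transformations -- substitute the real map functions.
t1 = Thread(target=save_transformed, args=(lambda line: line.upper(), OUTPUT_PATH_1))
t2 = Thread(target=save_transformed, args=(lambda line: line.lower(), OUTPUT_PATH_2))

t1.start()
t2.start()
t1.join()
t2.join()

sc.stop()

Whether the two jobs actually overlap then depends on the scheduler and on how many executor cores are free.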
