Created 12-10-2015 01:37 PM
I'm trying to map each distinct value in each column of my RDD to an index, but the code below is very slow. Is there any alternative?
Data is both numeric and categorical (string).
categories = {}
for i in idxCategories:  ## idxCategories contains the indexes of the columns that hold categorical data
    distinctValues = rawTrainData.map(lambda x: x[i]).distinct().collect()
    valuesMap = {key: value for (key, value) in zip(distinctValues, range(len(valores)))}
    categories[i] = valuesMap
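(For reference, one alternative is to gather all (column, value) pairs in a single pass, so Spark runs one distinct() job instead of one job per column. This is only a sketch, assuming rawTrainData is an RDD of lists and idxCategories is defined as above:)

# Single distinct() over (column_index, value) pairs instead of one job per column
pairs = rawTrainData.flatMap(lambda row: [(i, row[i]) for i in idxCategories])
categories = {}
for i, value in pairs.distinct().collect():
    categories.setdefault(i, []).append(value)
# Turn each value list into a value -> index map
categories = {i: dict(zip(vals, range(len(vals)))) for i, vals in categories.items()}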
Created 12-10-2015 02:28 PM
Data Frames are supposed to be faster than Python RDD operations; check slide 20 of this presentation:
Could you try code below and check if it's faster?
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)  # not needed in the shell, where sqlContext is predefined

input_file = "hdfs:///tmp/your_text_file"
raw_rdd = sc.textFile(input_file)
csv_rdd = raw_rdd.map(lambda x: x.split(","))
row_data = csv_rdd.map(lambda p: Row(
    field1=p[0],
    field2=p[1],
    field3=p[2]
))
df = sqlContext.createDataFrame(row_data)

categories = {}
idxCategories = [0, 1, 2]
for i in idxCategories:  ## idxCategories contains the indexes of the columns that hold categorical data
    distinctValues = df.map(lambda x: x[i]).distinct().collect()
    categories[i] = distinctValues

print categories[0]
print categories[1]
print categories[2]
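(It might also be worth staying in the DataFrame API instead of dropping back to a Python map; a sketch, using the same column names as above:)

# Distinct per column through the DataFrame API, so the query plan stays in Catalyst
for idx, name in enumerate(["field1", "field2", "field3"]):
    categories[idx] = [row[0] for row in df.select(name).distinct().collect()]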
Created 12-10-2015 02:23 PM
A few clarifying questions about rawTrainData:
- How is this RDD generated?
- Is it cached?
- How many partitions does it have?
Also, what is the variable "valores"?
Created 12-10-2015 03:22 PM
- The RDD is read from a CSV file and split into lists
- rawTrainData is cached
- It has 2 partitions on the same node. The file is not large: 220 MB.
- I edited the original code to translate it to English. valores = distinctValues ("valores" is Portuguese for "values")
Created on 12-10-2015 03:18 PM - edited 08-19-2019 05:39 AM
4x slower 😞 I used .toDF() instead of your code. Is there any difference?
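(For context, a sketch of what the .toDF() variant probably looked like, assuming the same csv_rdd and Row fields as in the answer above; in PySpark, instantiating a SQLContext attaches .toDF() to RDDs:)

# Hypothetical .toDF() variant of the DataFrame creation
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)  # creating it enables rdd.toDF()
row_data = csv_rdd.map(lambda p: Row(field1=p[0], field2=p[1], field3=p[2]))
df = row_data.toDF()  # instead of sqlContext.createDataFrame(row_data)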
Created 12-10-2015 05:16 PM
You could load your CSV directly, but I tested it here and indeed distinct takes much longer with data frames.
Can you describe your environment?
- Hortonworks version
- Spark version
- hardware configuration
- Spark mode (local mode or Spark on YARN)
Lastly, since your file is small, Spark might be choosing a low level of parallelism even if you have plenty of cores. You can try increasing the parallelism, like this:
distinctValues = rawTrainData.map(lambda x : x[i]).distinct(numPartitions = 15).collect()
Let me know if it got faster 🙂
Created 12-10-2015 08:28 PM
Before calling this routine, I introduced the code below, and execution time dropped to 1m8s: a 3x improvement.
rawTrainData = rawTrainData.repartition(8)  # spread the data across 8 partitions
rawTrainData.cache()  # keep the repartitioned RDD in memory for reuse
But introducing numPartitions=15 inside the distinct method made no difference.
I'm running Spark 1.3.1 in standalone mode (spark://host:7077) with 12 cores and 20 GB per node allocated to Spark. The hardware is virtual, but I know it's high-end. The cluster has 4 nodes (3 Spark workers).
Created 12-11-2015 12:48 AM
Awesome! You can check the current number of partitions with the command below:
print csv_rdd.getNumPartitions()
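(It can also help to compare that with the context's default parallelism; a sketch, assuming the same SparkContext sc:)

# Number of partitions Spark uses by default when none is specified
print sc.defaultParallelism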
Created 02-02-2016 01:52 AM
@Vitor Batista can you accept the best answer to close this thread or post your own solution?