Support Questions

Find answers, ask questions, and share your expertise

Best way to select distinct values from multiple columns using Spark RDD?

avatar
Contributor

I'm trying to convert each distinct value in each column of my RDD, but the code below is very slow. Is there any alternative?

Data is both numeric and categorical (string).

categories = {}
for i in idxCategories: ##idxCategories contains indexes of rows that contains categorical data
    distinctValues = rawTrainData.map(lambda x : x[i]).distinct().collect()
    valuesMap = {key: value for (key,value) in zip(distinctValues, range(len(valores)))}
    categories[i] = valuesMap
1 ACCEPTED SOLUTION

avatar

@Vitor Batista

Data Frames are supposed to be faster than Python RDD operations, check slide 20 of this presentation:

http://www.slideshare.net/databricks/spark-summit-eu-2015-spark-dataframes-simple-and-fast-analysis-...

Could you try code below and check if it's faster?

from pyspark.sql import SQLContext, Row


input_file = "hdfs:///tmp/your_text_file"
raw_rdd = sc.textFile(input_file)
csv_rdd = raw_rdd.map(lambda x: x.split(","))


row_data = csv_rdd.map(lambda p: Row(
    field1=p[0], 
    field2=p[1],
    field3=p[2]
    )
)


df = sqlContext.createDataFrame(row_data)


categories = {}
idxCategories = [0,1,2]
for i in idxCategories: ##idxCategories contains indexes of rows that contains categorical data
    distinctValues = df.map(lambda x : x[i]).distinct().collect()
    categories[i] = distinctValues



    
print categories[0]
print categories[1]
print categories[2]


View solution in original post

8 REPLIES 8

avatar
Explorer

A few clarifying questions about rawTrainData:

- How is this RDD generated?

- Is it cached?

- how many partitions does it have?

Also, what is the variable "valores"?

avatar
Contributor

- RDD is read from CSV and split into list

- rawTrainData is cached

- It have 2 partitions at same node. The file is not large. 220 MB.

- I edited original code to translate to English. Valores = distincValues

avatar

@Vitor Batista

Data Frames are supposed to be faster than Python RDD operations, check slide 20 of this presentation:

http://www.slideshare.net/databricks/spark-summit-eu-2015-spark-dataframes-simple-and-fast-analysis-...

Could you try code below and check if it's faster?

from pyspark.sql import SQLContext, Row


input_file = "hdfs:///tmp/your_text_file"
raw_rdd = sc.textFile(input_file)
csv_rdd = raw_rdd.map(lambda x: x.split(","))


row_data = csv_rdd.map(lambda p: Row(
    field1=p[0], 
    field2=p[1],
    field3=p[2]
    )
)


df = sqlContext.createDataFrame(row_data)


categories = {}
idxCategories = [0,1,2]
for i in idxCategories: ##idxCategories contains indexes of rows that contains categorical data
    distinctValues = df.map(lambda x : x[i]).distinct().collect()
    categories[i] = distinctValues



    
print categories[0]
print categories[1]
print categories[2]


avatar
Contributor

4x slower 😞 I used .toDF() instead of your code. Is there any difference?

762-dataframe.png

avatar

You could load your csv directly, but I tested here and indeed distinct is take much longer with data frames.

Can you describe your environment?

- hortonworks version

- spark version

- hardware configuration

- spark mode (localmode or spark on yarn)

Lastly, if you have enough cores/processor and as your file is small, spark might be choosing a low level of parallelism. you can try it increasing parallelism, like this:

    distinctValues = rawTrainData.map(lambda x : x[i]).distinct(numPartitions = 15).collect()

me fala se ficou mais rápido 🙂

avatar
Contributor

before calling this routine, I introduced the code bellow and exec time reduced to 1m8s. 3x improvement.

rawTrainData = rawTrainData.repartition(8)
rawTrainData.cache()

But introducing numPartitions=15 inside distinct method does not affect the result.

I'm running Spark 1.3.1 into standalone mode (spark://host:7077) with 12 cores and 20 GB per node allocated to Spark. The hardware is virtual, but I know it`s a top hardware. The cluster has 4 nodes (3 spark workers)

avatar

Awesome! You can check current number of partitions with command below:

print csv_rdd.getNumPartitions()

avatar
Master Mentor

@Vitor Batista can you accept the best answer to close this thread or post your own solution?