
Removing header from CSV file through pyspark


I have a CSV file in this format:

tagId,tag
1,007
2,007 (series)
3,18th century
4,1920s
5,1930s

The first line is the header. I'm using the code below to remove it in PySpark, but it's throwing an error. Could someone help me with that?

Error Message:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1975, in subtract
    rdd = other.map(lambda x: (x, True))
AttributeError: 'unicode' object has no attribute 'map'

Code:

import csv 
tags = sc.textFile("hdfs:///data/spark/genome-tags.csv") 
tagsheader = tags.first() 
tagsdata = tags.subtract(tagsheader)


6 Replies

Super Guru
@Bala Vignesh N V

Why not use filter like the following?

val header = data.first 
val rows = data.filter(line => line != header)
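Since the question is in PySpark, the same idea in Python would look roughly like this (a sketch, using the tags RDD from your snippet):

header = tags.first()                                  # header line as a string
rows = tags.filter(lambda line: line != header)        # keep every line that is not the header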


Thanks @mqureshi. That's how I have implemented it. But I just wanted to understand why the above code doesn't work.

Super Guru

@Bala Vignesh N V

It may be your first line and not the subtract function. Try removing the extra slash from your HDFS path. Basically, use the following:

sc.textFile("hdfs://data/spark/genome-tags.csv")

or, if you haven't provided the Hadoop config before, then use the following:

sc.textFile("hdfs://<namenode uri>:8020/data/spark/genome-tags.csv")

@mqureshi

I don't think that's the issue here. I'm able to perform actions like count(), collect(), and take() over tags.


@Bala Vignesh N V

The issue is that first() returns a string, not an RDD. subtract() works between two RDDs, so you should convert tagsheader to an RDD using parallelize:

tags = sc.textFile("hdfs:///data/spark/genome-tags.csv")
tagsheader = tags.first()              # header line comes back as a plain string
header = sc.parallelize([tagsheader])  # wrap it in a single-element RDD
tagsdata = tags.subtract(header)       # now subtract works: RDD minus RDD
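An alternative sketch that avoids the shuffle subtract triggers is to drop only the first line of the first partition (this assumes the header really is the first line of the file):

from itertools import islice

tagsdata = tags.mapPartitionsWithIndex(
    lambda idx, part: islice(part, 1, None) if idx == 0 else part
)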

New Contributor

I have a tab-separated file like this:

Copyright details 2021
ID \t NUMBER \t ADDRESS \t ZIPCODE
10 \t 9877777777 \t India \t 400322
13 \t 2983359852 \t AUS \t 84534
26 \t 7832724424
34 \t 8238444294 \t RSA \t 74363


Here the first row is a comment, and the row with ID 26 is missing its last column values; it doesn't even have a trailing \t. So I need to read the file skipping the first line and handle the missing delimiters at the end.

I tried this

import org.apache.spark.sql.DataFrame
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val data = sc.textFile("sample_badtsv.txt")

val comments = data.first()
val fdata = data.filter(x => x != comments)

val header = fdata.filter(x => x.split("\t")(1) == "NUMBER").collect().mkString
val df = fdata.filter(x => x.split("\t")(1) != "NUMBER")
  .map(x => x.split("\t"))
  .map(x => (x(0), x(1), x(2), x(3)))
  .toDF(header.split("\t"): _*)

Since the \t is missing at the end of lines whose last fields are empty, I am getting an ArrayIndexOutOfBoundsException, because when converting the RDD to a DataFrame some rows have fewer fields than others.

Please provide a better solution so that I can skip the first line and read the file correctly (even when there are no trailing \t delimiters, the code needs to treat the missing fields as NULL values at the end, like below).

ID NUMBER ADDRESS ZIPCODE
10 9877777777 India 400322
13 2983359852 AUS 84534
26 7832724424 NULL NULL
34 8238444294 RSA 74363
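For reference, a minimal PySpark sketch of one way to get that output: split each line on \t, pad missing trailing fields with None, and then build the DataFrame (this assumes the comment is always the first line and that a SparkSession is active so toDF is available):

data = sc.textFile("sample_badtsv.txt")

comments = data.first()                                # the copyright line
fdata = data.filter(lambda line: line != comments)     # drop the comment line

header = fdata.first().split("\t")                     # ID, NUMBER, ADDRESS, ZIPCODE

def pad_fields(line):
    # split on tab and pad with None so every row has exactly len(header) fields
    parts = line.split("\t")
    return parts + [None] * (len(header) - len(parts))

df = (fdata.filter(lambda line: line.split("\t")[1] != "NUMBER")
           .map(pad_fields)
           .toDF(header))

df.show()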