Created 07-19-2017 05:04 PM
I have a CSV file in this format.
tagId,tag
1,007
2,007 (series)
3,18th century
4,1920s
5,1930s
The first line is the header. I'm using the code below to remove the header in PySpark, but it's throwing an error. Could someone help me with that?
Error Message:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1975, in subtract
    rdd = other.map(lambda x: (x, True))
AttributeError: 'unicode' object has no attribute 'map'
Code:
import csv

tags = sc.textFile("hdfs:///data/spark/genome-tags.csv")
tagsheader = tags.first()
tagsdata = tags.subtract(tagsheader)
Created 07-19-2017 05:21 PM
Why not use filter like the following?
val header = data.first
val rows = data.filter(line => line != header)
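Since the original question is in PySpark, the same filter idea would look roughly like this (a minimal sketch, reusing the tags RDD from the question):

# PySpark sketch of the same approach: keep every line that is not equal to the header
tags = sc.textFile("hdfs:///data/spark/genome-tags.csv")
header = tags.first()                               # first() returns a plain string
tagsdata = tags.filter(lambda line: line != header)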
Created 07-19-2017 05:33 PM
Thanks @mqureshi. That's how I have implemented it. But I just wanted to understand why the above code doesn't work.
Created 07-19-2017 05:43 PM
It may be your first line and not the subtract function. Try removing one extra slash from your HDFS path. Basically, use the following:
sc.textFile("hdfs://data/spark/genome-tags.csv")
or, if you haven't provided the Hadoop config before, then use the following:
sc.textFile("hdfs://<namenode uri>:8020/data/spark/genome-tags.csv")
Created 07-19-2017 06:25 PM
I don't think that's the issue here. I'm able to perform actions like count(), collect(), and take() on tags.
Created 07-20-2017 12:29 AM
The issue is that the first() method returns a string, not an RDD. subtract() works between two RDDs, so you should convert tagsheader to an RDD using parallelize().
tags = sc.textFile("hdfs:///data/spark/genome-tags.csv")
tagsheader = tags.first()
header = sc.parallelize([tagsheader])
tagsdata = tags.subtract(header)
Created on 03-03-2021 09:51 PM - edited 03-03-2021 11:29 PM
I have a tab-separated file like this:
Copyright details 2021
ID \t NUMBER \t ADDRESS \t ZIPCODE
10 \t 9877777777 \t India \t 400322
13 \t 2983359852 \t AUS \t 84534
26 \t 7832724424
34 \t 8238444294 \t RSA \t 74363
Here the first row is a comment, and the row with ID 26 is missing its trailing column values; it doesn't even have a \t at the end.
So I need to read the file, skipping the first line, and handle the missing delimiters at the end.
I tried this
import org.apache.spark.sql.DataFrame
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val data = sc.textFile("sample_badtsv.txt")
val comments = data.first()
val fdata = data.filter(x => x != comments)
val header = fdata.filter(x => x.split("\t")(1) == "NUMBER").collect().mkString
val df = fdata.filter(x => x.split("\t")(1) != "NUMBER")
.map(x => x.split("\t"))
.map(x => (x(0),x(1),x(2),x(3)))
.toDF(header.split("\t"): _*)
Since the \t is missing at the end of lines whose trailing fields are empty, I am getting an ArrayIndexOutOfBoundsException,
because when converting the RDD to a DataFrame some rows have fewer fields than the header.
Please provide me with a better solution so that I can skip the first line and read the file correctly (even where the \t is missing, the code needs to treat the trailing fields as NULL values, like below):
ID NUMBER ADDRESS ZIPCODE
10 9877777777 India 400322
13 2983359852 AUS 84534
26 7832724424 NULL NULL
34 8238444294 RSA 74363
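For reference, one way to get that output with the RDD approach is to pad each tab-split line out to the four expected fields, so the short row ends up with NULLs. A minimal sketch of the idea, written in PySpark to match the earlier part of the thread (the SparkSession name spark, the fixed four-column assumption, and the pad_row helper are assumptions for illustration; in Scala the same padding can be done with padTo):

# Pad each tab-split row to 4 fields; missing fields become None, which Spark shows as NULL
EXPECTED_COLS = 4

def pad_row(line):
    parts = line.split("\t")
    return tuple(parts + [None] * (EXPECTED_COLS - len(parts)))

data = spark.sparkContext.textFile("sample_badtsv.txt")
comments = data.first()                              # the copyright line
fdata = data.filter(lambda x: x != comments)
header = fdata.first().split("\t")                   # ID, NUMBER, ADDRESS, ZIPCODE
rows = fdata.filter(lambda x: x.split("\t")[1] != "NUMBER").map(pad_row)
df = rows.toDF(header)
df.show()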