Support Questions


How to read the schema of a CSV file and split the data into multiple files according to column values, using Scala


I have an example CSV file with a schema:

test.csv

name,age,state

swathi,23,us

srivani,24,UK

ram,25,London

sravan,30,UK

We need to split the data into different files according to the state column; for example, the US state data should be loaded into its own output file (with the schema/header included).

Expected output:

/user/data/US.txt

name,age,state

swathi,23,us

/user/data/UK

name,age,state

srivani,24,UK

sravan,30,UK

/user/data/London

name,age,state

ram,25,London

13 Replies


Please help me out: how can I solve this problem using Spark with Scala? This task has been assigned to me.

Thanks in advance,

swathi.T

Master Guru
@swathi thukkaraju

By using the CSV package we can handle this use case easily.

Here is what I tried.

I have a CSV file in an HDFS directory, called test.csv:

name,age,state
swathi,23,us
srivani,24,UK
ram,25,London
sravan,30,UK

Initialize the Spark shell with the csv package:

spark-shell --master local --packages com.databricks:spark-csv_2.10:1.3.0

Load the HDFS file into a Spark DataFrame using the CSV format. Since the file has a header, include the header option while loading:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")

If your file is on the local filesystem, use:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///<local-path>/test.csv")

Once loading completes, view the schema:

scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- state: string (nullable = true)
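
Note that with only the header option every column comes back as a string (age included). If you want typed columns, the spark-csv package also has an inferSchema option; a small example, using the same file as above:

val typedDf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true") // age is read as an integer instead of a string
  .load("/user/test/test.csv")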

Now that we have the df DataFrame with its schema, we can apply any filter operation on it.

Filtering and storing rows where state is us, UK, or London:

val df2=df.filter($"state"==="us") 

(or)

val df2=df.filter(col("state")==="us")
scala> df2.show()
+------+---+-----+
|  name|age|state|
+------+---+-----+
|swathi| 23|   us|
+------+---+-----+

As we can see above, the df2 DataFrame contains only the rows where state is us.

In the same way, we need to filter and create new DataFrames for the states UK and London:

val df3=df.filter(col("state")==="UK")
val df4=df.filter(col("state")==="London")

Once the filtering and the creation of the new DataFrames are done, we need to write the df2, df3, and df4 DataFrames to HDFS with headers included.

Since we cannot create specific output files while writing the data back to HDFS, the command below creates a us directory in HDFS and then loads the df2 DataFrame's data into that directory:

df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")

In the same way, we need to store df3 and df4 into different directories in HDFS:

df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")

now when you run

hadoop fs -ls /user/test/

you will see three directories (us, UK, London) with the corresponding part-00000 files inside them.
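
Roughly, the resulting layout looks like this (the exact part-file names depend on the number of partitions):

/user/test/us/part-00000
/user/test/UK/part-00000
/user/test/London/part-00000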

In addition, we can register a temp table once the data is loaded into the df DataFrame, and then run SQL queries on top of that temp table using sqlContext:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")
df.registerTempTable("temp")
val df2=sqlContext.sql("select * from temp where state ='us'")
val df3=sqlContext.sql("select * from temp where state ='UK'")
val df4=sqlContext.sql("select * from temp where state ='London'")
df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")
df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")

In both ways (using filter and using a registered temp table) the results will be the same.
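
If you would rather not hard-code each state value, one possible variant is to collect the distinct states and write one directory per value. A minimal sketch, assuming the same df DataFrame and spark-csv package as above, with a hypothetical base output path /user/test/out:

import org.apache.spark.sql.functions.col

// collect the distinct state values to the driver (fine for a small number of states)
val states = df.select("state").distinct().collect().map(_.getString(0))

// write one directory per state value, header included
states.foreach { s =>
  df.filter(col("state") === s)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("/user/test/out/" + s)
}

Note that each filter triggers a separate pass over the data, so this is best suited to a small number of distinct values.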

Master Guru

@swathi thukkaraju

I think your file has the header as its first line, so we need to skip that header and then apply your case class to the file. Also, escape the special split character: split() takes a regex, and | is a regex metacharacter, so the matching pattern for it is \\|.

my input file:-

name,age,state
swathi|23|us
srivani|24|UK
ram|25|London
case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file:///<local-file-path>/test.csv") // local file; for an HDFS file use sc.textFile("/test.csv")
val header = rdd.first()
val data = rdd.filter(row => row != header)
val df1 = data.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()

(or)

case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file:///<local-file>/test.csv") // local file; for an HDFS file use sc.textFile("/test.csv")
val rdd1 = rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val df1 = rdd1.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()

In both ways we skip the header line and then apply our case class schema to the file; once the case class is applied and toDF is called, we have a DataFrame.

If you don't have a header, just load the file, apply the split and the case class, and convert it to a DataFrame:

case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file:///<local-file-path>/test.csv") // local file; for an HDFS file use sc.textFile("/test.csv")
val df1 = rdd.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()

Output:

scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+

However, I have used the csv package with Spark 1.6.2 and it works fine; using this package is simpler than assigning a case class. But you can choose either of these methods as per your requirements!
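
For what it's worth, the spark-csv package can also read delimiters other than comma, so a pipe-delimited file could be loaded without any case class at all. A minimal sketch, assuming the same spark-csv 1.x package and a hypothetical pipe-delimited file:

// load a pipe-delimited file directly with spark-csv (path is illustrative)
val pipeDf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")     // first line is the header
  .option("delimiter", "|")     // split on pipe instead of comma
  .load("/user/test/test_pipe.csv")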

Master Guru

@swathi thukkaraju

I didn't quite get the question, but if you have a file with caret (^) as the delimiter, then we need to escape that caret with two backslashes, because caret (^) is a special character in regex (the line-start anchor).

Input file:-

name^age^state
swathi^23^us
srivani^24^UK
ram^25^London
scala> case class schema(name:String,age:Int,brand_code:String)
scala> val rdd = sc.textFile("file://<file-path>/test1.csv")
scala> val rdd1= rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
scala> val df1 = rdd1.map(_.split("\\^")).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()
(or)
scala> val df1 = rdd1.map(_.split('^')).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()

Output:-

scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+

If you are still facing issues, please share your input data, the script you have prepared, and the expected output, so that the root cause of the issue is easier to understand.


val rdd2 = rddwithheader.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

val rowrdd3 = rdd2.map(_.split("\\|")).map(p=>schema(p(0),p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31), p(32), p(33), p(34), p(35), p(36), p(37), p(38), p(39), p(40), p(41), p(42), p(43), p(44), p(45), p(46), p(47), p(48), p(49), p(50), p(51), p(52), p(53), p(54), p(55), p(56), p(57), p(58), p(59), p(60), p(61), p(62), p(63), p(64), p(65), p(66), p(67), p(68), p(69), p(70), p(71), p(72), p(73), p(74), p(75), p(76), p(77), p(78), p(79), p(80), p(81), p(82), p(83), p(84), p(85), p(86), p(87), p(88), p(89), p(90), p(91), p(92), p(93), p(94), p(95), p(96), p(97), p(98), p(99), p(100), p(101), p(102), p(103), p(104), p(105), p(106), p(107), p(108), p(109), p(110), p(111), p(112), p(113), p(114), p(115), p(116), p(117), p(118), p(119), p(120), p(121), p(122), p(123), p(124), p(125), p(126), p(127), p(128), p(129), p(130), p(131), p(132), p(133), p(134), p(135), p(136), p(137), p(138), p(139), p(140), p(141), p(142)))

error: overloaded method value apply with alternatives: (fieldIndex: Int)org.apache.spark.sql.types.StructField <and> (names: Set[String])org.apache.spark.sql.types.StructType <and> (name: String)org.apache.spark.sql.types.StructField

With the other code you gave, Shu, I get the "overloaded method value apply with alternatives" exception. How do I handle this?

kindly help on this

thanks in advance
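
The error above typically means that the name schema is resolving to a Spark SQL StructType (whose apply overloads are exactly the ones listed), not to a case class, and in any case Scala 2.10 case classes cannot have 143 fields. One possible workaround is to build Rows and pass the StructType to createDataFrame; a minimal sketch with a small illustrative schema and path rather than the full 143-column one:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// illustrative 3-column schema; the real one would list all 143 fields as StructFields
val fileSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true),
  StructField("state", StringType, nullable = true)
))

val rdd = sc.textFile("/user/test/test.csv")   // path is illustrative
val header = rdd.first()
val rows = rdd.filter(_ != header).map(line => Row.fromSeq(line.split("\\|", -1)))

// no 22-field limit here: each Row just needs as many values as the schema has fields
val df = sqlContext.createDataFrame(rows, fileSchema)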


input.data

ef47cd52f7ed4044148ab7b1cc897f55|TEST_F1|TEST_L1|7109 Romford Way||North Richland Hills|TX|76182-5027|5027|test1498@yahoo.com|||||MNY|USA|1989||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||N|N|N|N|N|N||||||||||||||||||||||||||||||||||||||||||||||||||||||| 556510f9cea2e32260eb913e976b7ef0|TEST_F2|TEST_L2|11 South Rd||Chester|NJ|07930-2739|2739|test@embarqmail.com|||||OAP|USA|1964|||||Female||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 91daac14d56047c45cb27227b46b8074|TEST_F3|TEST_L3|1724 Holly Ln||Pampa|TX|79065-4536|4536|test@sbcglobal.net|||||OAP|USA|1941|||||Female|||||SKINTONE_LIGHT|||||||||||||||||||||EYECOLOR_BLUE|||||HAIRCOLOR_AUBURN|||||||||||||||||||||||||||EN|||N|Y|N|N|N||||INT_HAIR_GREY_COVERAGE,INT_HAIR_TRENDS||||||||||||||||||||||||||||||||||||||||||||||||||||4536|4536|test@sbcglobal.net|||||OAP|USA|1941|||||Female|||||SKINTONE_LIGHT|||||||||||||||||||||EYECOLOR_BLUE|||||HAIRCOLOR_AUBURN|||||||||||||||||||||||||||EN|||N|Y|N|N|N||||INT_HAIR_GREY_COVERAGE,INT_HAIR_TRENDS||||||||||||||||||||||||||||||||||||||||||||||||||||


input data

input.txt

Super Collaborator

@swathi thukkaraju

The pipe is a special character in the regex that split() takes; use single quotes (a Char) to split pipe-delimited strings:

val df1 = sc.textFile("testfile.txt").map(_.split('|')).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()

Alternatively, you can use commas or another separator.

See the following StackOverflow post for more detail:

https://stackoverflow.com/questions/11284771/scala-string-split-does-not-work


val rdd2 = rdd1.map(_.split("^"))

rdd2.collect

res16: Array[Array[String]] = Array(Array(OAP^US^xxahggv), Array(MNY^US^sfskdgsjkg), Array(ESS^US^fxjshgg))

The issue is that it is not splitting properly, and I am not getting why.

Can you show me the syntax? I am not able to find it.

Thanks in advance.
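
For reference, split("^") treats the caret as the regex line-start anchor, so nothing actually gets split; escaping it, or passing it as a Char, works. A minimal sketch assuming the three sample rows shown above:

// ^ is a regex anchor, so it must be escaped (or passed as a Char) to split on it
val sample = sc.parallelize(Seq("OAP^US^xxahggv", "MNY^US^sfskdgsjkg", "ESS^US^fxjshgg"))

sample.map(_.split("^")).first().length    // 1 -> not split (anchor matches nothing to split on)
sample.map(_.split("\\^")).first().length  // 3 -> escaped regex, splits into three fields
sample.map(_.split('^')).first().length    // 3 -> Char overload, no regex involved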