Created 03-27-2018 08:11 AM
I have a CSV file, for example, with this schema:
test.csv
name,age,state
swathi,23,us
srivani,24,UK
ram,25,London
sravan,30,UK
We need to split it into different files according to state, so that, for example, the US rows are loaded into their own file (with the header included).
output
/user/data/US.txt
name,age,state
swathi,23,us
/user/data/UK
name,age,state
srivani,24,UK
sravan,30,UK
/user/data/London
name,age,state
ram,25,London
Created 03-27-2018 08:14 AM
Please help me out: how can I solve this problem using Spark with Scala? This task has been assigned to me.
Thanks in advance,
swathi.T
Created 03-27-2018 11:45 AM
Using the CSV package we can handle this use case easily. Here is what I tried.
I had a CSV file called test.csv in an HDFS directory:
name,age,state
swathi,23,us
srivani,24,UK
ram,25,London
sravan,30,UK
Initialize the spark-shell with the CSV package:
spark-shell --master local --packages com.databricks:spark-csv_2.10:1.3.0
Load the HDFS file into a Spark DataFrame using the CSV format; since the file has a header, I included the header option while loading:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")
If your file is on the local filesystem, use:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///<local-path>/test.csv")
Once the loading completes, view the schema:
scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- state: string (nullable = true)
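As an aside, every column is read as a string because the csv package only infers types when asked to. If typed columns (e.g. age as an integer) are needed, the same package also supports an inferSchema option; a small sketch, assuming the same file path as above:
val dfTyped = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true") // ask spark-csv to infer column types instead of defaulting to string
  .load("/user/test/test.csv")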
Now that we have the df DataFrame with a schema, we can apply filter operations to it.
Filtering and storing the rows where state is us, UK, or London:
val df2=df.filter($"state"==="us")
(or)
val df2=df.filter(col("state")==="us")
scala> df2.show()
+------+---+-----+
|  name|age|state|
+------+---+-----+
|swathi| 23|   us|
+------+---+-----+
As we can see above, the df2 DataFrame contains only the rows where state is us.
In the same way, we filter and create new DataFrames for state UK and London:
val df3=df.filter(col("state")==="UK")
val df4=df.filter(col("state")==="London")
Once the filtering is done and the new DataFrames are created, we write df2, df3, and df4 back to HDFS with the header included.
Since we cannot choose the exact output file names while writing back to HDFS, with the command below I create a us directory in HDFS and load the df2 data into it:
df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")
In the same way, we store df3 and df4 into different directories in HDFS:
df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")
Now when you run
hadoop fs -ls /user/test/
you will see three directories (us, UK, London) with the corresponding part-00000 files inside them.
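If a single, specifically named file per state is really needed, as in the /user/data/US.txt layout from the question, one common workaround (not part of the approach above) is to force a single partition before saving and then rename the resulting part file with the HDFS shell. A rough sketch, reusing the paths above:
df2.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")
then, from the command line:
hadoop fs -mv /user/test/us/part-00000 /user/data/US.txt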
In addition, we can register a temp table once the data is loaded into the df DataFrame, and then run SQL queries on top of that temp table using sqlContext:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")
df.registerTempTable("temp")
val df2 = sqlContext.sql("select * from temp where state = 'us'")
val df3 = sqlContext.sql("select * from temp where state = 'UK'")
val df4 = sqlContext.sql("select * from temp where state = 'London'")
df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")
df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")
Both ways (using filter and using the registered temp table) produce the same results.
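If the list of states is not known up front, the same filter-and-write approach can be driven by the distinct values of the state column instead of hard-coding us, UK, and London. A rough sketch along those lines, reusing the df DataFrame and the /user/test output path from above:
import org.apache.spark.sql.functions.col
// collect the distinct state values, then filter and write once per state
val states = df.select("state").distinct.collect.map(_.getString(0))
states.foreach { s =>
  df.filter(col("state") === s)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("/user/test/" + s)
}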
Created 03-27-2018 02:40 PM
I think you have the header as the first line in your file, so we need to skip that header and then apply your case class to the remaining lines. Also use an escape for the special split character: split() treats its argument as a regular expression, so to split on | the pattern has to be written as \\|.
my input file:-
name,age,state
swathi|23|us
srivani|24|UK
ram|25|London
case class schema(name: String, age: Int, brand_code: String)
val rdd = sc.textFile("file://<local-file-path>/test.csv")
(or)
val rdd = sc.textFile("/test.csv") // for a hadoop file
val header = rdd.first()
val data = rdd.filter(row => row != header)
val df1 = data.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
(or)
case class schema(name: String, age: Int, brand_code: String)
val rdd = sc.textFile("file://<local-file>/test.csv") // for a local file
(or)
val rdd = sc.textFile("/test.csv") // for a hadoop file
val rdd1 = rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val df1 = rdd1.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
In both ways we skip the header line and then apply our case class to the file; once the case class is applied and converted with toDF(), we have a DataFrame.
If you don't have a header, just load the file, then apply the split and the case class, and convert to a DataFrame:
case class schema(name: String, age: Int, brand_code: String)
val rdd = sc.textFile("file://<local-file-path>/test.csv") // for a local file
(or)
val rdd = sc.textFile("/test.csv") // for a hadoop file
val df1 = rdd.map(_.split("\\|")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
Output from both ways:-
scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+
However, I have used the csv package with Spark 1.6.2 and it works fine; using that package is a simpler method than defining a case class. You can choose either of these methods as per your requirements.
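For completeness, the csv package can also read a custom delimiter directly, so a pipe-delimited file could be loaded without a case class at all. A minimal sketch, assuming the spark-csv package is on the classpath as in the earlier post and that the header row uses the same delimiter as the data:
val dfPipe = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|") // spark-csv accepts a one-character custom delimiter
  .load("/user/test/test.csv")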
Created 03-28-2018 01:22 PM
I didn't fully get the question, but if you have a file with a caret (^) as the delimiter, then we need to escape that caret with two backslashes, because the caret is a special character in a regex (it anchors the start of a line).
Input file:-
name^age^state
swathi^23^us
srivani^24^UK
ram^25^London
scala> case class schema(name: String, age: Int, brand_code: String)
scala> val rdd = sc.textFile("file://<file-path>/test1.csv")
scala> val rdd1 = rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
scala> val df1 = rdd1.map(_.split("\\^")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
(or)
scala> val df1 = rdd1.map(_.split('^')).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()
Output:-
scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+
If you are still facing issues, please share your input data, the script you have prepared, and the expected output, so that it's easier to understand the root cause of the issue.
Created 03-29-2018 04:34 AM
val rdd2 = rddwithheader.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val rowrdd3 = rdd2.map(_.split("\\|")).map(p=>schema(p(0),p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31), p(32), p(33), p(34), p(35), p(36), p(37), p(38), p(39), p(40), p(41), p(42), p(43), p(44), p(45), p(46), p(47), p(48), p(49), p(50), p(51), p(52), p(53), p(54), p(55), p(56), p(57), p(58), p(59), p(60), p(61), p(62), p(63), p(64), p(65), p(66), p(67), p(68), p(69), p(70), p(71), p(72), p(73), p(74), p(75), p(76), p(77), p(78), p(79), p(80), p(81), p(82), p(83), p(84), p(85), p(86), p(87), p(88), p(89), p(90), p(91), p(92), p(93), p(94), p(95), p(96), p(97), p(98), p(99), p(100), p(101), p(102), p(103), p(104), p(105), p(106), p(107), p(108), p(109), p(110), p(111), p(112), p(113), p(114), p(115), p(116), p(117), p(118), p(119), p(120), p(121), p(122), p(123), p(124), p(125), p(126), p(127), p(128), p(129), p(130), p(131), p(132), p(133), p(134), p(135), p(136), p(137), p(138), p(139), p(140), p(141), p(142)))
error: overloaded method value apply with alternatives: (fieldIndex: Int)org.apache.spark.sql.types.StructField <and> (names: Set[String])org.apache.spark.sql.types.StructType <and> (name: String)org.apache.spark.sql.types.StructField
Shu, with the other code you gave I get the "overloaded method value apply with alternatives" exception. How do I handle this?
Kindly help with this.
Thanks in advance.
Created 03-29-2018 04:36 AM
input.data
ef47cd52f7ed4044148ab7b1cc897f55|TEST_F1|TEST_L1|7109 Romford Way||North Richland Hills|TX|76182-5027|5027|test1498@yahoo.com|||||MNY|USA|1989||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||N|N|N|N|N|N||||||||||||||||||||||||||||||||||||||||||||||||||||||| 556510f9cea2e32260eb913e976b7ef0|TEST_F2|TEST_L2|11 South Rd||Chester|NJ|07930-2739|2739|test@embarqmail.com|||||OAP|USA|1964|||||Female||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 91daac14d56047c45cb27227b46b8074|TEST_F3|TEST_L3|1724 Holly Ln||Pampa|TX|79065-4536|4536|test@sbcglobal.net|||||OAP|USA|1941|||||Female|||||SKINTONE_LIGHT|||||||||||||||||||||EYECOLOR_BLUE|||||HAIRCOLOR_AUBURN|||||||||||||||||||||||||||EN|||N|Y|N|N|N||||INT_HAIR_GREY_COVERAGE,INT_HAIR_TRENDS||||||||||||||||||||||||||||||||||||||||||||||||||||4536|4536|test@sbcglobal.net|||||OAP|USA|1941|||||Female|||||SKINTONE_LIGHT|||||||||||||||||||||EYECOLOR_BLUE|||||HAIRCOLOR_AUBURN|||||||||||||||||||||||||||EN|||N|Y|N|N|N||||INT_HAIR_GREY_COVERAGE,INT_HAIR_TRENDS||||||||||||||||||||||||||||||||||||||||||||||||||||
Created 03-29-2018 04:44 AM
input data
Created 03-27-2018 02:34 PM
The pipe is a special character in the regular expression that String.split expects, so please use single quotes (the Char overload) to split pipe-delimited strings:
val df1 = sc.textFile("testfile.txt").map(_.split('|')).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
Alternatively, you can use commas or another separator.
See the following StackOverflow post for more detail:
https://stackoverflow.com/questions/11284771/scala-string-split-does-not-work
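To see the difference quickly in the Scala REPL (the sample string is just an illustration):
"OAP|US|xxahggv".split("|")   // String overload: "|" is an empty regex alternation, so every character becomes a token
"OAP|US|xxahggv".split('|')   // Char overload: splits literally -> Array(OAP, US, xxahggv)
"OAP|US|xxahggv".split("\\|") // escaped regex: also splits literally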
Created 03-28-2018 09:47 AM
val rdd2 = rdd1.map(_.split("^"))
rdd2.collect
res16: Array[Array[String]] = Array(Array(OAP^US^xxahggv), Array(MNY^US^sfskdgsjkg), Array(ESS^US^fxjshgg))
The issue is that it is not splitting properly, and I am not getting why.
Can you show me the correct syntax? I am not able to find it.
Thanks in advance.