Created 12-21-2016 01:21 PM
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val df = sqlContext.sql("select * from v_main_test")
df.show()
|vbeln|line_number|parent_line_number|flag|      dt|
|  123|           |                  |   0|20161219|
|  124|           |                  |   1|20161219|
df.map(row => {
  val row1 = row.getAs[String]("vbeln")
  val make = if (row1.toLowerCase == "125") "S" else "its 123"
  Row(row(0), make, row(0))
}).collect().foreach(println)
Why doesn't Row work here?
I have to use a map function because I need to process each line and update its line number and parent line number.
The output should be: the line number should become 001 and no new line is needed, but for flag = 1 I have to add the same line twice, like this:
|vbeln|line_number|parent_line_number|flag|      dt|
|  123|        001|                  |   0|20161219|
|  124|        002|                  |   1|20161219|
|  124|        003|               002|   1|20161219|
How do I combine Spark DataFrames with Spark core functions like map in Scala?
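To make the requirement above concrete, here is one rough sketch (an illustration only, not code from the thread): it assumes all five columns are strings and that every duplicated row should directly follow its original. It duplicates flag = 1 rows with flatMap first, then numbers the result with zipWithIndex so each copy can reference the previous line:

// Rough sketch; assumes five string columns as in the view above,
// and that each copy directly follows the row it was duplicated from.
val expanded = df.rdd.flatMap { row =>
  if (row.getAs[String]("flag") == "1") Seq((row, false), (row, true)) // original + copy
  else Seq((row, false))
}
val numbered = expanded.zipWithIndex().map { case ((row, isCopy), idx) =>
  val lineNo   = f"${idx + 1}%03d"
  // the copy follows its original, so its parent is the previous line number
  val parentNo = if (isCopy) f"$idx%03d" else ""
  (row.getAs[String]("vbeln"), lineNo, parentNo,
   row.getAs[String]("flag"), row.getAs[String]("dt"))
}
numbered.collect().foreach(println)

Note that zipWithIndex numbers rows in partition order, so apply an explicit sort first if the row order matters.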
Created 12-21-2016 04:46 PM
Unfortunately the error itself is not included in the question, but most probably you don't need to map to the Row type. Map the Row to a case class or a tuple instead. Tested with Spark 2.0.2:
import org.apache.spark.sql.{DataFrame, SparkSession}

// session is a SparkSession (in spark-shell, the built-in `spark` works too)
val session = SparkSession.builder().getOrCreate()
import session.sqlContext.implicits._

val df: DataFrame = session.sqlContext.read.format("csv").load("file:///tmp/ls.txt")
df.printSchema()
df.show()

val df2 = df.map(r => (r.getString(0), r.getString(0).length))
df2.printSchema()
From the output:
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

+---+---+
|_c0|_c1|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = true)
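If you need a DataFrame with named columns again rather than a Dataset of tuples, toDF can rename the result (the column names here are made up for illustration):

// Rename the tuple columns; "value" and "length" are illustrative names.
val df3 = df2.toDF("value", "length")
df3.printSchema()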
Created 12-22-2016 07:26 AM
|vbeln|line_number|parent_line_number|flag|      dt|
|  123|        001|                  |   0|20161219|
|  124|        002|                  |   1|20161219|
|  124|        003|               002|   1|20161219|

How do I combine Spark DataFrames with Spark core functions like map in Scala?
How do I put a variable's value in each row of a DataFrame? Is that even possible, given that a DataFrame is immutable?
And if we convert the DataFrame to an RDD, how do we set each line's third column to a counter that starts at 1 and increments for every line?
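You don't mutate the DataFrame; you build new records from the old ones. A minimal sketch, assuming the five string columns shown earlier: zipWithIndex pairs every row with a 0-based index, which can act as the incrementing counter (again, the index follows partition order, so sort first if order matters):

// Sketch: rebuild each record, replacing the third column with a counter.
// Assumes all five columns are strings, as in the view above.
val withCounter = df.rdd.zipWithIndex().map { case (row, idx) =>
  (row.getAs[String]("vbeln"),
   row.getAs[String]("line_number"),
   (idx + 1).toString,                 // counter starts at 1, +1 per line
   row.getAs[String]("flag"),
   row.getAs[String]("dt"))
}
withCounter.collect().foreach(println)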
Created 12-22-2016 11:19 AM
Use
df.map(row => {
  val row1 = row.getAs[String]("vbeln")
  val make = if (row1.toLowerCase == "125") "S" else "its 123"
  (row(0), make, row(0))
}).collect().foreach(println)
Instead of:
df.map(row => {
  val row1 = row.getAs[String]("vbeln")
  val make = if (row1.toLowerCase == "125") "S" else "its 123"
  Row(row(0), make, row(0))
}).collect().foreach(println)
You can map each line to a new one, but you don't need to return a new Row. Map each record to a new tuple (for simple types) or to a new case class instance.
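For example, the case-class variant of the same map could look like this (a sketch: the case class and its field names are invented here, and the implicits import from the earlier snippet is assumed to be in scope for the encoder):

// Hypothetical case class; the name and fields are illustrative only.
case class VbelnMake(vbeln: String, make: String, vbelnAgain: String)

df.map { row =>
  val vbeln = row.getAs[String]("vbeln")
  val make  = if (vbeln.toLowerCase == "125") "S" else "its 123"
  VbelnMake(vbeln, make, vbeln)
}.collect().foreach(println)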