SPARK SQL query to modify values

Expert Contributor

I have a txt file with the following data:

Michael, 29
Andy, 30
Justin, 19

These are the names of people, along with their ages.

I want to change the age of Justin from 19 to 21.

How can I change the value 19 in spark-shell using a Spark SQL query?

Which methods, such as map or reduce, need to be used to modify the value 19?

Thanks

Sridhar

1 ACCEPTED SOLUTION

Guru

This answer assumes that you have the SparkContext (sc) and SQLContext (sqlContext) already created as part of the shell, and that your file has one person per line and is located on HDFS at /user/spark/people.txt.

import org.apache.spark.sql._
import sqlContext.implicits._ // needed for toDF(); imported automatically in spark-shell

case class Person(name: String, age: Int)

// Load the file and map each line to a Person
val personRDD = sc.textFile("/user/spark/people.txt")
val personDF = personRDD.map(_.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Select the rows to change (renamed from personDF to avoid redeclaring the same variable)
val teenagersDF = sqlContext.sql("SELECT name, age FROM people WHERE age = 19")
teenagersDF.show()

// Build a new DataFrame with the age bumped from 19 to 21; note that re-registering
// "people" here replaces the temp table with only these updated rows
val agedPerson = teenagersDF.map(x => Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)).toDF()
agedPerson.registerTempTable("people")
val agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

If you are running your driver as a Spark client or through Zeppelin, you can bring up the YARN Resource Manager UI and see the job being distributed across the cluster.


5 REPLIES


Expert Contributor

@Vadim

It's working fine, but the value is not changed in the people.txt file; it still remains 19.

One more small doubt: will we be able to see how the job is distributed across the cluster using the Ambari UI?

Also, does the Spark client come by default with the Spark bundle? How can I see the Spark client in a web UI, e.g. at localhost:port?

Guru

@Sridhar Babu M

You can see the details of what Spark is doing by clicking on the application master link for the Spark job in the Resource Manager UI; it will take you to the Spark UI and show you the job in detail. You may just have to make sure that the Spark History Server is running in Ambari, or the page may come up blank.

If you actually need to change the value in the file, then you will need to export the resulting DataFrame to a file. The save function that is part of the DataFrame class creates a file for each partition. If you need a single file, convert back to an RDD and use coalesce(1) to get everything down to a single partition, so you get one file. Make sure that you add the spark-csv dependency, either in Zeppelin:

%dep
z.load("com.databricks:spark-csv_2.10:1.4.0")

or when launching the shell:

spark-shell --packages com.databricks:spark-csv_2.10:1.4.0

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._ // needed for toDF(); imported automatically in spark-shell

case class Person(name: String, age: Int)

val personRDD = sc.textFile("/user/spark/people.txt")
val personDF = personRDD.map(_.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Bump Justin's age by 2 and leave everyone else unchanged
val agedPerson = personDF.map(x =>
  if (x.getAs[String]("name") == "Justin")
    Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)
  else
    Person(x.getAs[String]("name"), x.getAs[Int]("age"))
).toDF()
agedPerson.registerTempTable("people")
val agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

// Write out as CSV (one file per partition)...
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

// ...or coalesce to a single partition first; both calls create a directory of part files,
// and coalesce(1) yields a single part-00000 file inside it
val agedPeopleRDD = agedPeopleDF.rdd
agedPeopleRDD.coalesce(1).saveAsTextFile("agedPeopleSingleFile")


@Sridhar Babu M you cannot modify a column as such; you can only operate on a column and return a new DataFrame reflecting that change. For that, you'd first create a UserDefinedFunction implementing the operation to apply, and then selectively apply that function to the targeted column only. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
# UDF that maps every value in the target column to 'new_value'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
# Rebuild the column list, applying the UDF only to the target column
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
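
For reference, a rough Scala equivalent of the same idea, applied to this thread's example, might look like the sketch below. This is illustrative rather than from the thread: it assumes the personDF DataFrame built in the earlier replies, and setAge is just a hypothetical UDF name.

import org.apache.spark.sql.functions.{col, udf}

// Illustrative UDF: set Justin's age to 21 and leave everyone else's age unchanged
val setAge = udf((name: String, age: Int) => if (name == "Justin") 21 else age)

// withColumn with an existing column name returns a new DataFrame with that column replaced
val updatedDF = personDF.withColumn("age", setAge(col("name"), col("age")))
updatedDF.show()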

Expert Contributor

@azeltov

Thank you sir!

I hope the above program can also be written and executed in Scala; I will try doing so.