
Spark SQL - Update Command

Expert Contributor

I am trying to update the value of a record using Spark SQL in the spark-shell.

I executed the command UPDATE tablename SET age=20 WHERE name=justin, and I am getting the following errors:

scala> val teenagers = sqlContext.sql("UPDATE people SET age=20 WHERE name=Justin")

java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier UPDATE found

UPDATE people SET age=20 WHERE name=Justin
^

at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)

......

Thanks

Sridhar

1 ACCEPTED SOLUTION


@Sridhar Babu M It depends on the data source you are updating; not all sources can be updated. What backend are you using for the DataFrame people?

For example, for Hive it is possible to update data stored in ORC format: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-hive-orc-example.html
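To illustrate, here is a minimal sketch of persisting a DataFrame as an ORC-backed Hive table from the shell (assuming a HiveContext is available; the table name people_orc and the sample data are purely illustrative):

import org.apache.spark.sql.hive.HiveContext

// Assumes sc is the SparkContext provided by spark-shell
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Illustrative data; in practice this would be your existing DataFrame
val people = sc.parallelize(Seq(("Justin", 19), ("Andy", 30))).toDF("name", "age")

// Persist it as an ORC-backed Hive table
people.write.format("orc").saveAsTable("people_orc")

// The UPDATE itself would then be issued through Hive (it requires a
// transactional ORC table), e.g.:
//   UPDATE people_orc SET age = 20 WHERE name = 'Justin'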


10 REPLIES


Expert Contributor

The backend is the default Spark SQL; I am executing the Spark SQL queries in the spark-shell.

I have a people.txt file, which contains names along with ages.

I want to change the age of a particular name to some value.

Is it possible to change the value in a txt file using a Spark SQL query?

Is it possible to modify the value during map and reduce commands in Spark?

Note: I do not have Hive installed.


@Sridhar Babu M You cannot update text files using Spark SQL. I would suggest saving/appending the results using an RDD or DataFrame instead.
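A minimal sketch of that idea, assuming the comma-separated layout used later in this thread (the paths and the replacement line are illustrative):

// Text files cannot be updated in place, so read, transform,
// and write the result to a NEW location
val updated = sc.textFile("/user/spark/people.txt")
  .map(line => if (line.startsWith("Justin")) "Justin, 20" else line)
updated.saveAsTextFile("/user/spark/people_updated")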

Guru

I don't think Spark SQL supports DML on a text file datasource just yet. You need to create a DataFrame from the source file, register a table using the DataFrame, select with a predicate to get the person whose age you want to update, apply a function to increment the age field, and then overwrite the old table with the new DataFrame. Here is the code:

import org.apache.spark.sql._

case class Person(name: String, age: Int)

// Build a DataFrame from the source file and register it as a table
var personRDD = sc.textFile("/user/spark/people.txt")
var personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Select the rows you want to update
var teenagersDF = sqlContext.sql("SELECT name, age FROM people WHERE age = 19")
teenagersDF.show()

// Apply a function to increment the age field, then re-register the table
var agedPerson = teenagersDF.map(x => Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)).toDF()
agedPerson.registerTempTable("people")
var agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

This assumes that you have the SparkContext and SQLContext available, that the file on HDFS at /user/spark/people.txt has one person per line, and that you are running the shell via spark-client or Zeppelin.
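For reference, the code expects comma-separated lines like Spark's bundled people.txt example (contents shown here for illustration):

Michael, 29
Andy, 30
Justin, 19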

Expert Contributor

Yup, this code is working fine, but after executing the above lines the contents of /user/spark/people.txt still show the age of Justin as 19:

justin, 19

The value is not modified.

Guru

@Sridhar Babu M

If you actually need to change the value in the file, then you will need to export the resulting DataFrame to a file. The save function that is part of the DataFrame class creates a file for each partition. If you need a single file, convert back to an RDD and use coalesce(1) to bring everything down to a single partition, so you get one file. Make sure that you add the dependency in

Zeppelin

%dep

z.load("com.databricks:spark-csv_2.10:1.4.0")

or

spark-shell --packages com.databricks:spark-csv_2.10:1.4.0

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

case class Person(name: String, age: Int)

// Build a DataFrame from the source file and register it as a table
var personRDD = sc.textFile("/user/spark/people.txt")
var personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Bump Justin's age by 2, leaving everyone else unchanged
var agedPerson = personDF.map(x =>
  if (x.getAs[String]("name") == "Justin")
    Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)
  else
    Person(x.getAs[String]("name"), x.getAs[Int]("age"))
).toDF()
agedPerson.registerTempTable("people")
var agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

// Write out as CSV (one file per partition)...
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

// ...or coalesce to a single partition to get a single output file
var agedPeopleRDD = agedPeopleDF.rdd
agedPeopleRDD.coalesce(1).saveAsTextFile("agedPeopleSingleFile")
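One caveat, in case you want the output to replace the original file: saveAsTextFile will fail if the target directory already exists, so write to a new directory as above and then move the single part file over the original, e.g. from the HDFS shell (paths illustrative):

hdfs dfs -rm /user/spark/people.txt
hdfs dfs -mv agedPeopleSingleFile/part-00000 /user/spark/people.txt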

Expert Contributor

@Vadim

I am getting the following error after executing the statement

agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

Output

java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$less$colon$less;
at com.databricks.spark.csv.util.CompressionCodecs$.<init>(CompressionCodecs.scala:29)
at com.databricks.spark.csv.util.CompressionCodecs$.<clinit>(CompressionCodecs.scala)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:189)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
..............

Guru
groupId: com.databricks
artifactId: spark-csv_2.10
version: 1.4.0

@Sridhar Babu M

Make sure that you add the dependency in

Zeppelin

%dep

z.load("com.databricks:spark-csv_2.10:1.4.0")

or

spark-shell --packages com.databricks:spark-csv_2.10:1.4.0

Expert Contributor

@Vadim

It works fine if I execute the command below

spark-shell --packages com.databricks:spark-csv_2.10:1.3.0

keeping the version at 1.3.0 and not 1.4.0.
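(For what it's worth, scala.Predef$.$conforms only exists in Scala 2.11, so a NoSuchMethodError on it typically means the spark-csv 1.4.0 artifact on the classpath was compiled against a different Scala version than the Spark build in use; pinning to 1.3.0, as above, sidesteps the mismatch.)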