Created 03-21-2016 04:16 AM
I am trying to update the value of a record using Spark SQL in the spark-shell.
I executed the command UPDATE tablename SET age=20 WHERE name=justin, and I am getting the following error:
scala> val teenagers = sqlContext.sql("UPDATE people SET age=20 WHERE name=Justin")
java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier UPDATE found
UPDATE people SET age=20 WHERE name=Justin
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
......
Thanks
Sridhar
Created 03-21-2016 05:21 PM
@Sridhar Babu M It depends on the data source you are updating; not all sources can be updated. What backend are you using for the DataFrame people?
For example, with Hive it is possible to update data using the ORC format: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-hive-orc-example.html
Created 03-25-2016 03:15 PM
The backend is the default Spark SQL; I run the Spark SQL queries in the spark-shell.
I have a people.txt file, which contains names along with ages.
I want to change the age for a particular name to some other value.
Is it possible to change the value in a text file using a Spark SQL query?
Is it possible to modify the value during map and reduce operations in Spark?
Note: I do not have Hive installed.
Created 03-28-2016 01:13 PM
@Sridhar Babu M You cannot update text files using Spark SQL. I would suggest saving or appending the results using an RDD or DataFrame instead.
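As a rough illustration of the RDD route, here is a minimal sketch of the per-line transformation in plain Scala. The helper name updateAge and the sample lines are hypothetical (they are not from this thread), and the sketch assumes one "name, age" pair per line.

```scala
// Hypothetical helper (not from the thread): rewrite one "name, age" line,
// replacing the age when the name matches and leaving other lines untouched.
def updateAge(line: String, name: String, newAge: Int): String = {
  val fields = line.split(",").map(_.trim)
  if (fields.length == 2 && fields(0) == name) s"${fields(0)}, $newAge"
  else line
}

// Local stand-in for the lines of people.txt (sample data is assumed).
val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
val updated = lines.map(updateAge(_, "Justin", 20))
updated.foreach(println)
```

In the spark-shell, the same function could presumably be applied with sc.textFile("/user/spark/people.txt").map(updateAge(_, "Justin", 20)) and written out with saveAsTextFile to a new directory. The result goes to a new location; the original file is not modified in place.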
Created 03-28-2016 04:03 AM
I don't think Spark SQL supports DML on a text file data source just yet. You need to create a DataFrame from the source file, register a table using the DataFrame, select with a predicate to get the person whose age you want to update, apply a function to increment the age field, and then overwrite the old table with the new DataFrame. Here is the code:
import org.apache.spark.sql._

case class Person(name: String, age: Int)

// Load the text file and parse each "name, age" line into a Person
val personRDD = sc.textFile("/user/spark/people.txt")
val personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Select the rows whose age should change
val selectedDF = sqlContext.sql("SELECT name, age FROM people WHERE age = 19")
selectedDF.show()

// Build new rows with the age incremented by 2
val agedPerson = selectedDF.map(x => Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)).toDF()

// Re-register under the same name so that "people" now refers to the new DataFrame
agedPerson.registerTempTable("people")
val agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()
This assumes that you have the SparkContext and SQLContext available, one person per line in the file, the file on HDFS at /user/spark/people.txt, and that you are running the shell as Spark-Client or in Zeppelin.
Created 03-30-2016 01:29 PM
Yes, this code works fine, but after executing the lines above,
the contents of /user/spark/people.txt still show the age of justin as 19:
justin, 19
The value is not modified.
Created 03-30-2016 04:04 PM
If you actually need to change the value in the file, then you will need to export the resulting DataFrame to a file. The save function for a DataFrame creates one file for each partition. If you need a single file, convert back to an RDD and use coalesce(1) to get everything down to a single partition so that you get one file. Make sure that you add the dependency, either in
Zeppelin
%dep
z.load("com.databricks:spark-csv_2.10:1.4.0")
or
spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

case class Person(name: String, age: Int)

val personRDD = sc.textFile("/user/spark/people.txt")
val personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")
val peopleDF = sqlContext.sql("SELECT * FROM people")

// Add 2 to Justin's age; copy every other row unchanged
val agedPerson = peopleDF.map { x =>
  val name = x.getAs[String]("name")
  val age = x.getAs[Int]("age")
  if (name == "Justin") Person(name, age + 2) else Person(name, age)
}.toDF()
agedPerson.registerTempTable("people")
val agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

// Write as CSV: one part file per partition
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

// Write a single file by collapsing to one partition first
val agedPeopleRDD = agedPeopleDF.rdd
agedPeopleRDD.coalesce(1).saveAsTextFile("agedPeopleSingleFile")
Created 03-31-2016 12:38 AM
I am getting the following errors after executing the statement
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")
Output
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$less$colon$less;
at com.databricks.spark.csv.util.CompressionCodecs$.<init>(CompressionCodecs.scala:29)
at com.databricks.spark.csv.util.CompressionCodecs$.<clinit>(CompressionCodecs.scala)
at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:189)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
..............
Created 03-31-2016 01:21 AM
groupId: com.databricks
artifactId: spark-csv_2.10
version: 1.4.0
Make sure that you add the dependency in
Zeppelin
%dep
z.load("com.databricks:spark-csv_2.10:1.4.0")
or
spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
Created 03-31-2016 08:27 AM
It works fine when I run the command below, keeping the version as 1.3.0 rather than 1.4.0:
spark-shell --packages com.databricks:spark-csv_2.10:1.3.0
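A NoSuchMethodError on scala.Predef$.$conforms usually points to a Scala binary version mismatch: as far as I know, that method exists in Scala 2.11 but not in 2.10. A quick way to check which Scala version the shell is running, and therefore which artifact suffix to expect, is sketched below. It assumes a Scala 2.x shell, and the variable names are only illustrative.

```scala
// Read the Scala version of the running REPL (standard library call).
val scalaVersion = scala.util.Properties.versionNumberString
println(s"Scala version: $scalaVersion")

// For Scala 2.x, artifact names such as spark-csv_2.10 carry the
// major.minor part of the version as their suffix.
val binaryVersion = scalaVersion.split('.').take(2).mkString(".")
println(s"Expected artifact suffix: _$binaryVersion")
```

If the suffix printed here differs from the one in the --packages coordinate, or the artifact itself was built against a different Scala version, errors like the one above are a likely result.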