Created 03-25-2016 03:20 PM
I have a txt file with the following data:
Michael, 29
Andy, 30
Justin, 19
These are the names of people, along with their ages.
I want to change Justin's age from 19 to 21.
How can I change the value 19 in the spark-shell using a Spark SQL query?
Which methods (such as map or reduce) need to be used to modify the value 19?
Thanks
Sridhar
Created 03-28-2016 03:53 AM
This answer assumes that you have the SparkContext and SQLContext already created as part of the shell, and that your file has one person per line and is located on HDFS at /user/spark/people.txt.
import org.apache.spark.sql._

case class Person(name: String, age: Int)

// Load the file and convert each line into a Person
var personRDD = sc.textFile("/user/spark/people.txt")
var personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Select only the row(s) with age 19
var filteredDF = sqlContext.sql("SELECT name, age FROM people WHERE age = 19")
filteredDF.show()

// Add 2 to the age and re-register the result as the "people" temp table
var agedPerson = filteredDF.map(x => Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)).toDF()
agedPerson.registerTempTable("people")

var agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()
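For the sample file above, the two show() calls should print something along these lines (sketched from the data in the question, not captured from a live run):

+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+

+------+---+
|  name|age|
+------+---+
|Justin| 21|
+------+---+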
If you are running your driver as a Spark client or through Zeppelin, you can bring up the YARN Resource Manager UI and see the job being distributed across the cluster.
Created 03-30-2016 01:08 PM
@Vadim
It is working fine, but the value is not changed in the people.txt file; it still remains 19.
One more small doubt: will we be able to see how the job is distributed across the cluster using the Ambari UI?
Also, does the Spark client come by default with the Spark bundle? How can I see the Spark client in a web UI, like localhost:port?
Created 03-30-2016 01:14 PM
You can see the details of what Spark is doing by clicking on the application master link for the Spark job in the Resource Manager UI; it will take you to the Spark UI and show you the job in detail. You may just have to make sure that the Spark History Server is running in Ambari, or the page may come up blank.
If you actually need to change the value in the file, then you will need to export the resulting DataFrame to a file. The save function that is part of the DataFrame class creates a file for each partition. If you need a single file, convert back to an RDD and use coalesce(1) to bring everything down to a single partition, so you get one file. Make sure that you add the spark-csv dependency in
Zeppelin
%dep
z.load("com.databricks:spark-csv_2.10:1.4.0")
or
spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

case class Person(name: String, age: Int)

// Load the file and register it as a temp table
var personRDD = sc.textFile("/user/spark/people.txt")
var personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")

// Bump Justin's age by 2; leave everyone else unchanged
var agedPerson = personDF.map(x =>
  if (x.getAs[String]("name") == "Justin")
    Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)
  else
    Person(x.getAs[String]("name"), x.getAs[Int]("age"))
).toDF()
agedPerson.registerTempTable("people")

var agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show()

// Write one CSV file per partition
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

// Or coalesce down to a single partition to get a single output file
var agedPeopleRDD = agedPeopleDF.rdd
agedPeopleRDD.coalesce(1).saveAsTextFile("agedPeopleSingleFile")
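To sanity-check the single-file output, you can cat it from the shell (this assumes the default saveAsTextFile layout, where the coalesced output lands in a single part-00000 file under the output directory):

hdfs dfs -cat agedPeopleSingleFile/part-00000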
Created 03-29-2016 04:38 PM
@Sridhar Babu M you cannot modify a column as such; you can only operate on a column and return a new DataFrame reflecting that change. For that, you'd first create a UserDefinedFunction implementing the operation to apply, and then selectively apply that function to the targeted column only. In Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])
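A roughly equivalent Scala sketch, assuming the Spark 1.x DataFrame API used elsewhere in this thread; target_column, new_value, and oldDF are placeholders mirroring the Python example, not names from your code:

import org.apache.spark.sql.functions.udf

// Placeholder column name and replacement value
val name = "target_column"
val replace = udf((x: String) => "new_value")

// Apply the UDF only to the targeted column; pass the other columns through unchanged
val newDF = oldDF.select(oldDF.columns.map { c =>
  if (c == name) replace(oldDF(c)).alias(name) else oldDF(c)
}: _*)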
Created 03-30-2016 01:09 PM
Thank you sir!
I hope the exact same program can be written and executed in Scala; I will try doing so.