Created 03-22-2018 06:06 AM
When writing the certification exam and loading data from HDFS into Spark, can we load the data into a DataFrame?
Created 03-22-2018 07:12 PM
You can read data from HDFS directly into a DataFrame, provided the file is in a format with an embedded schema. For example:
Reading Avro in Spark - refer to this link
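Avro support is not part of the core distribution, so as a minimal sketch, assuming the spark-avro package is on the classpath (format name "avro" in Spark 2.4+, "com.databricks.spark.avro" in earlier releases) and a hypothetical path users.avro:

// Sketch only: requires the spark-avro package on the classpath.
// On Spark versions before 2.4, use format("com.databricks.spark.avro").
val avroFileDF = spark.read.format("avro").load("users.avro") // hypothetical path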
Reading Parquet in Spark
val parquetFileDF = spark.read.parquet("people.parquet")
Reading ORC in Spark - refer to this link
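ORC support is built into the DataFrameReader, so the read looks just like the Parquet example; the path people.orc below is a hypothetical illustration:

// ORC reader is built in; no extra package needed
val orcFileDF = spark.read.orc("people.orc") // hypothetical path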
Reading JSON in Spark
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
But if your data is not in one of these "enriched" (self-describing) formats, you can always read the files as an RDD and convert them to a DataFrame; an end-to-end sketch starting from a plain text file on HDFS follows the examples below.
Here are some examples.
Import the necessary classes (org.apache.spark.rdd.RDD is also needed for the RDD[Row] type annotation used later):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
Create a SparkSession object; here it's called spark:

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext // used only to create the test RDDs
Let's create an RDD to turn into a DataFrame:

val rdd = sc.parallelize(Seq(
  ("first", Array(2.0, 1.0, 2.1, 5.4)),
  ("test", Array(1.5, 0.5, 0.9, 3.7)),
  ("choose", Array(8.0, 2.9, 9.1, 2.5))
))
Using SparkSession.createDataFrame(RDD obj):

val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|   [2.0,1.0,2.1,5.4]|
|  test|   [1.5,0.5,0.9,3.7]|
|choose|   [8.0,2.9,9.1,2.5]|
+------+--------------------+
Using SparkSession.createDataFrame(RDD obj) and specifying column names:

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()

+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|   [2.0,1.0,2.1,5.4]|
|  test|   [1.5,0.5,0.9,3.7]|
|choose|   [8.0,2.9,9.1,2.5]|
+------+--------------------+
This approach requires the input RDD to be of type RDD[Row]:

val rowsRdd: RDD[Row] = sc.parallelize(Seq(
  Row("first", 2.0, 7.0),
  Row("second", 3.5, 2.5),
  Row("third", 7.0, 5.9)
))
Create the schema:

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))
Now apply both rowsRdd and schema to createDataFrame():

val df = spark.createDataFrame(rowsRdd, schema)
df.show()

+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
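And to tie this back to the original question, here is a minimal end-to-end sketch of the RDD route for a plain text file on HDFS. The path hdfs:///data/people.txt, the comma-separated "name,age" layout, and the Person case class are all hypothetical; it uses the rdd.toDF() convenience enabled by spark.implicits._ as an alternative to the explicit schema above.

// Hypothetical layout: one "name,age" record per line on HDFS
case class Person(name: String, age: Int)

import spark.implicits._ // enables rdd.toDF()

val peopleRdd = sc.textFile("hdfs:///data/people.txt") // hypothetical path
  .map(_.split(","))
  .map(fields => Person(fields(0), fields(1).trim.toInt))

val peopleTextDF = peopleRdd.toDF()
peopleTextDF.show()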