Spark Certification: When loading from HDFS to spark shell, Can we load to dataframe?

When taking the certification exam and loading data from HDFS into Spark, can we load the data into a DataFrame?

1 ACCEPTED SOLUTION

@archana v

You can read data from HDFS directly into a DataFrame, provided it is in a format that carries its own schema (or, as with JSON, one that Spark can infer). For example:

Reading Avro in Spark - refer to this link

Reading Parquet in Spark

val parquetFileDF = spark.read.parquet("people.parquet")

Reading ORC in Spark - refer to this link

Reading JSON in spark

val peopleDF = spark.read.json("examples/src/main/resources/people.json")
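To see which schema Spark ends up with, here is a minimal sketch that reads a JSON-lines file and prints the inferred schema. A local temp file stands in for a file on HDFS; on a cluster you would pass an HDFS path instead. The object name ReadJsonSketch, the helper readPeople, and the sample records are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadJsonSketch {
  // Read a JSON-lines file into a DataFrame; Spark infers column names and types.
  def readPeople(spark: SparkSession, path: String): DataFrame =
    spark.read.json(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("ReadJsonSketch").getOrCreate()

    // Stand-in for an HDFS file: a small local temp file, one JSON object per line.
    val tmp = Files.createTempFile("people", ".json")
    val lines = Seq("""{"name":"Andy","age":30}""", """{"name":"Justin","age":19}""")
    Files.write(tmp, lines.mkString("\n").getBytes("UTF-8"))

    val df = readPeople(spark, tmp.toUri.toString)
    df.printSchema() // shows the columns and types Spark inferred
    df.show()
    spark.stop()
  }
}
```

The same readPeople call should work unchanged with an HDFS location, since only the path string differs.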

If your data is not in one of these "enriched" formats, you can always read the files as an RDD and convert it to a DataFrame.

Some examples follow.

Import necessary classes

import org.apache.spark.rdd.RDD // needed for the RDD[Row] annotation in Method 3
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Create a SparkSession object, here named spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext // Just used to create test RDDs

Let's create an RDD to convert into a DataFrame

val rdd = sc.parallelize(Seq(("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5))))

Method 1

Using SparkSession.createDataFrame(RDD obj).

val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Method 2

Using SparkSession.createDataFrame(RDD obj) and specifying column names.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id","vals")
dfWithSchema.show()

+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
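As a shorter variant of Method 2, an RDD of tuples can be converted with toDF directly once spark.implicits._ is imported. The sketch below assumes a local SparkSession; the object name ToDFShortcut and the helper build are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ToDFShortcut {
  // Build a DataFrame from an RDD of tuples, naming the columns in one step.
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._ // brings the RDD-to-DataFrame conversion into scope

    val rdd = spark.sparkContext.parallelize(Seq(
      ("first", Array(2.0, 1.0, 2.1, 5.4)),
      ("test", Array(1.5, 0.5, 0.9, 3.7))
    ))
    rdd.toDF("id", "vals")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("ToDFShortcut").getOrCreate()
    val df = build(spark)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

In spark-shell the implicits are normally imported already, so rdd.toDF("id", "vals") can be typed as is.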

Method 3

This approach requires the input RDD to be of type RDD[Row].

val rowsRdd: RDD[Row] = sc.parallelize(Seq(Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9)))

Create the schema

val schema = new StructType().add(StructField("id", StringType, true)).add(StructField("val1", DoubleType, true)).add(StructField("val2", DoubleType, true))

Now apply both rowsRdd and schema to createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)
df.show()

+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
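To tie Method 3 back to the original question, here is a minimal sketch that applies it to a plain text file rather than a test RDD. It assumes a comma-separated file with three fields per line (id, val1, val2) and no header row; the object name SchemaFromText, the helpers toRow and load, and any path you pass in are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object SchemaFromText {
  // Explicit schema matching the assumed three-field layout.
  val schema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("val1", DoubleType, nullable = true),
    StructField("val2", DoubleType, nullable = true)
  ))

  // Parse one comma-separated line into a Row; no validation of malformed lines.
  def toRow(line: String): Row = {
    val fields = line.split(",").map(_.trim)
    Row(fields(0), fields(1).toDouble, fields(2).toDouble)
  }

  // Read the file as an RDD of lines, map each line to a Row, then apply the schema.
  def load(spark: SparkSession, path: String): DataFrame = {
    val rowsRdd = spark.sparkContext.textFile(path).map(toRow)
    spark.createDataFrame(rowsRdd, schema)
  }
}
```

Calling SchemaFromText.load(spark, path) with an HDFS path should then give a DataFrame with the columns id, val1, and val2. Real data would likely need handling for bad or missing fields, which this sketch leaves out.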
