Created on 07-14-2017 01:55 AM - edited on 02-11-2020 09:29 PM by VidyaSargur
This article shows how to read CSV files that do not have header information as the first row. We will then specify the schema for both DataFrames and join them together.
import org.apache.spark.sql.types._
// Input locations on HDFS (TPC-DS generated data).
val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"
// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)
// Read the raw store_sales file: pipe-delimited with no header row, so
// Spark assigns positional column names _c0, _c1, ... and leaves every
// value as a string (schema inference is disabled).
val A_df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathA)
// Assign column names to the Store Sales dataframe: keep only the columns
// needed downstream, casting each positional column to its proper type.
val storeSalesDF = A_df.select(
  A_df.col("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  A_df.col("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  A_df.col("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  A_df.col("_c7").cast(IntegerType).as("SS_STORE_SK")
)
// Read the raw store file: same pipe-delimited, headerless layout as above.
val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathB)
// Assign column names to the Store dataframe.
// FIX: the original was missing the comma after .as("S_STORE_ID"), which is
// a compile error in Scala — arguments to select must be comma-separated.
val storeDF = B_df.select(
  B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  B_df("_c1").cast(StringType).as("S_STORE_ID"),
  B_df("_c5").cast(StringType).as("S_STORE_NAME")
)
// Join sales to stores on the store surrogate key. No join type is given,
// so Spark performs its default inner join.
val joinCondition = storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
val joinedDF = storeSalesDF.join(storeDF, joinCondition)
// Pull the first five joined rows back to the driver as a quick sanity check.
joinedDF.take(5)