This article shows how to read CSV files that do not have header information in the first row. We read two such files into DataFrames, assign column names and types to each, and then join them together.

import org.apache.spark.sql.types._


val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

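// For Spark 2.x use -> val df = spark.read.option("header", "false").option("delimiter", "|").csv(path)
// Without a header, Spark 2.x names the columns _c0, _c1, ... (the spark-csv 1.x package on Spark 1.x uses C0, C1, ... instead)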

val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathA)


// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
    A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
    A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
    A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
    A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
    )


val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathB)


// Assign column names to the Store dataframe
val storeDF = B_df.select(
    B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
    B_df("_c1").cast(StringType).as("S_STORE_ID"),
    B_df("_c5").cast(StringType).as("S_STORE_NAME")
    )

val joinedDF = storeSalesDF.join(storeDF,
    storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)
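
For readers on Spark 2.x, the same steps can be sketched as a small self-contained program. This is only an illustration under stated assumptions: the sample rows, the temporary files, the simplified four-column and three-column layouts, and the names HeaderlessCsvJoin, readHeaderless and joinSample are all hypothetical and not part of the TPC-DS data or the code above. It uses the built-in CSV reader, which names headerless columns _c0, _c1, and so on.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType}

object HeaderlessCsvJoin {

  // Read a pipe-delimited file that has no header row.
  // Every column is read as a string and named _c0, _c1, ...
  def readHeaderless(spark: SparkSession, path: String): DataFrame =
    spark.read
      .option("header", "false")
      .option("inferSchema", "false")
      .option("delimiter", "|")
      .csv(path)

  def joinSample(spark: SparkSession): DataFrame = {
    // Hypothetical sample rows with a simplified column layout
    // (assumption: these stand in for the real TPC-DS files).
    val dir = Files.createTempDirectory("headerless-csv")
    val salesPath = dir.resolve("store_sales.dat")
    val storePath = dir.resolve("store.dat")
    Files.write(salesPath, "2451813|65495|100|7\n2451813|65495|200|8\n".getBytes("UTF-8"))
    Files.write(storePath, "7|AAAAAAAAHAAAAAAA|ought\n9|AAAAAAAAJAAAAAAA|able\n".getBytes("UTF-8"))

    val rawSales = readHeaderless(spark, salesPath.toUri.toString)
    val rawStore = readHeaderless(spark, storePath.toUri.toString)

    // Assign names and types by selecting and casting the generated columns.
    val sales = rawSales.select(
      rawSales("_c2").cast(IntegerType).as("SS_ITEM_SK"),
      rawSales("_c3").cast(IntegerType).as("SS_STORE_SK"))
    val store = rawStore.select(
      rawStore("_c0").cast(IntegerType).as("S_STORE_SK"),
      rawStore("_c2").cast(StringType).as("S_STORE_NAME"))

    // Inner join on the store key, as in the article.
    sales.join(store, sales("SS_STORE_SK") === store("S_STORE_SK"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("headerless-csv-join")
      .master("local[*]") // assumption: local run for illustration only
      .getOrCreate()
    try joinSample(spark).show()
    finally spark.stop()
  }
}
```

Because the join is an inner join, only sales rows whose store key also appears in the store file are kept. In this sample, the sale for store 8 has no matching store row and is dropped.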

 

Comments

hi Muji,

Great job 🙂

Just missing a ',' after B_df("_c1").cast(StringType).as("S_STORE_ID"):

// Assign column names to the Store dataframe
val storeDF = B_df.select(
    B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
    B_df("_c1").cast(StringType).as("S_STORE_ID"),
    B_df("_c5").cast(StringType).as("S_STORE_NAME")
    )