Community Articles
Find and share helpful community-sourced technical articles
Labels (1)
Explorer

This Article will show how to read csv file which do not have header information as the first row. We will then specify the schema for both DataFrames and then join them together.

import org.apache.spark.sql.types._


val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)

val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathA)


// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
    A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
    A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
    A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
    A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
    )


val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathB)


// Assign column names to the Region dataframe
val storeDF = B_df.select(
    B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
    B_df("_c1").cast(StringType).as("S_STORE_ID")
    B_df("_c5").cast(StringType).as("S_STORE_NAME")
    )

val joinedDF = storeSalesDF.join(storeDF,
    storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)

 

15,988 Views
0 Kudos
Comments
Contributor

hi Muji,

Great job :)

just missing a ',' after : B_df("_c1").cast(StringType).as("S_STORE_ID")

// Assign column names to the Region dataframe val storeDF = B_df.select(

B_df("_c0").cast(IntegerType).as("S_STORE_SK"),

B_df("_c1").cast(StringType).as("S_STORE_ID"),

B_df("_c5").cast(StringType).as("S_STORE_NAME")

)

Don't have an account?
Version history
Revision #:
3 of 3
Last update:
‎02-11-2020 09:29 PM
Updated by:
 
Contributors
Top Kudoed Authors