Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
Explorer

This Article will show how to read csv file which do not have header information as the first row. We will then specify the schema for both DataFrames and then join them together.

import org.apache.spark.sql.types._


val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)

val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathA)


// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
    A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
    A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
    A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
    A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
    )


val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header","false")
  .option("inferSchema","false")
  .option("delimiter","|")
  .load(pathB)


// Assign column names to the Region dataframe
val storeDF = B_df.select(
    B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
    B_df("_c1").cast(StringType).as("S_STORE_ID")
    B_df("_c5").cast(StringType).as("S_STORE_NAME")
    )

val joinedDF = storeSalesDF.join(storeDF,
    storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)

 

22,026 Views
0 Kudos
Comments

hi Muji,

Great job 🙂

just missing a ',' after : B_df("_c1").cast(StringType).as("S_STORE_ID")

// Assign column names to the Region dataframe val storeDF = B_df.select(

B_df("_c0").cast(IntegerType).as("S_STORE_SK"),

B_df("_c1").cast(StringType).as("S_STORE_ID"),

B_df("_c5").cast(StringType).as("S_STORE_NAME")

)

Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.
Version history
Last update:
‎02-11-2020 09:29 PM
Updated by:
Contributors
Top Kudoed Authors