Created on 07-14-2017 01:55 AM - edited on 02-11-2020 09:29 PM by VidyaSargur
This article shows how to read CSV files that do not have header information in the first row. We will then specify a schema for both DataFrames and join them together.
import org.apache.spark.sql.types._

val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)
val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathA)

// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
  A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
)

val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathB)

// Assign column names to the Store dataframe
val storeDF = B_df.select(
  B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  B_df("_c1").cast(StringType).as("S_STORE_ID"),
  B_df("_c5").cast(StringType).as("S_STORE_NAME")
)

// Join the two dataframes on the store surrogate key
val joinedDF = storeSalesDF.join(storeDF,
  storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)
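For anyone on Spark 2.x, the same flow works with the built-in CSV reader, so the external com.databricks.spark.csv package is not needed. The sketch below reuses the paths and column positions from the snippet above and assumes a SparkSession is available (spark is created automatically in spark-shell); it is an illustration, not a drop-in replacement.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// In spark-shell the `spark` SparkSession already exists; otherwise build one
val spark = SparkSession.builder().appName("csv-no-header-join").getOrCreate()

// Built-in CSV reader; columns still come back as _c0, _c1, ... (all strings)
val salesRaw = spark.read
  .option("header", "false")
  .option("delimiter", "|")
  .csv("hdfs:/tpc-ds/data/store_sales")

val storeRaw = spark.read
  .option("header", "false")
  .option("delimiter", "|")
  .csv("hdfs:/tpc-ds/data/store/")

// The cast/alias step is unchanged from the Spark 1.x version
val storeSalesDF = salesRaw.select(
  salesRaw("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  salesRaw("_c7").cast(IntegerType).as("SS_STORE_SK")
)

val storeDF = storeRaw.select(
  storeRaw("_c0").cast(IntegerType).as("S_STORE_SK"),
  storeRaw("_c5").cast(StringType).as("S_STORE_NAME")
)

storeSalesDF.join(storeDF, storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")).show(5)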
Created on 12-20-2018 01:36 PM
Hi Muji,
Great job 🙂
Just missing a ',' after: B_df("_c1").cast(StringType).as("S_STORE_ID")
// Assign column names to the Region dataframe
val storeDF = B_df.select(
B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
B_df("_c1").cast(StringType).as("S_STORE_ID"),
B_df("_c5").cast(StringType).as("S_STORE_NAME")
)
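With that comma in place the select compiles; a quick sanity check on the result (assuming the DataFrames defined in the article above) could be:

// Peek at a few joined rows to confirm the store columns resolve as expected
joinedDF.select("SS_STORE_SK", "S_STORE_ID", "S_STORE_NAME").show(5)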