Spark - Scala - Join RDDS (csv) files

New Contributor

Hello Guys!!!

 

I'm just getting started with Scala. Since I'm still in the initial steps, I need to know how to join two tables on a field, like in a relational database.

Example:

Table 1 (csv)

Columns: zip, type, primary_city, acceptable_cities, unacceptable_cities

 

Example:

Table 2 (csv)

Columns: GEO.id, GEO.id2, GEO.display-label, VD01

 

Question:

I want to join column 1 of Table 1 (zip) with column 2 of Table 2 (GEO.id2).
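
In SQL terms, the goal is an equi-join between the two tables on those columns, roughly like the sketch below (the table names table1/table2, and renaming GEO.id2 to id2, are illustrative assumptions, since dots in column names would need escaping):

// Illustrative only: assumes both CSVs are already registered as
// tables "table1" and "table2", with GEO.id2 exposed as id2
val joined = sqlContext.sql(
  "SELECT * FROM table1 t1 JOIN table2 t2 ON t1.zip = t2.id2")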

 

I am currently facing two problems:

 

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._

// Import variables and create the RDD

val sparkConf = new SparkConf().setAppName("JoinInScala")

scala> val sc = new SparkContext(sparkConf)
<console>:24: error: not found: type SparkContext

Note: I already tried importing the classes, but that did not solve it.

 

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

val testdata = sc.textFile("/user/training/zip_code_database.csv").map(_.split(","))

.map(p => Row(p(0), p(1), p(2)))

val fields = new Array[StructField](2)
fields(0) = StructField("zip", IntegerType, false);
fields(1) = StructField("type", StringType, false);
val schema = StructType(fields);

val data = sqlContext.applySchema(testdata, schema)

<console>:32: error: type mismatch;
found : org.apache.spark.rdd.RDD[Array[String]]
required: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
val data = sqlContext.applySchema(testdata, schema)

Note: I tried importing the specified classes, but that did not solve it.

 

 

Can someone give me some help?

 

Thanks!!!


Re: Spark - Scala - Join RDDS (csv) files

Master Collaborator
I think the compile errors do describe the problem directly. In the first case, you did not "import org.apache.spark.SparkContext". In the second, you need to pass an RDD of Row to the method, not an RDD of Array[String]. Have a look at the API docs.
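
A minimal sketch of both fixes, assuming the Spark 1.x SQLContext/applySchema API used in the post (in Spark 1.3+ the type classes moved to org.apache.spark.sql.types):

import org.apache.spark.{SparkConf, SparkContext}  // the class itself, not just SparkContext._
import org.apache.spark.sql._

val sparkConf = new SparkConf().setAppName("JoinInScala")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// Chain the Row mapping onto the same expression so the result is an
// RDD[Row] rather than an RDD[Array[String]]
val testdata = sc.textFile("/user/training/zip_code_database.csv")
  .map(_.split(","))
  .map(p => Row(p(0), p(1)))

// split(",") yields Strings, so declare StringType here (or convert
// with p(0).toInt if zip really should be an IntegerType)
val schema = StructType(Array(
  StructField("zip", StringType, false),
  StructField("type", StringType, false)))

val data = sqlContext.applySchema(testdata, schema)  // now an RDD[Row] plus a schema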


Re: Spark - Scala - Join RDDS (csv) files

New Contributor

Hello srowen!!!

 

Thank you very much for your help!!!

 

That error with the RDD turned out to be caused by line spacing: the .map(p => Row(...)) call was split onto its own line, so it never chained onto the RDD.

 

I'm almost there, except that now I'm facing a NullPointerException:

 

import org.apache.spark.SparkContext
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkConf

// Instantiate SparkConf and SQLContext to work with relational and tabular data

val sparkConf = new SparkConf().setAppName("JoinInScala")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

// Import the first table (csv) into an RDD as key/value pairs

val zcd = sc.textFile("/user/zeppelin/zip_code_database.csv").map(_.split(",")).map(p => Row(p(0), p(1)))

 

// Create the schema array and convert to tabular data

val fields = new Array[StructField](2)
fields(0) = StructField("zip", IntegerType, false);
fields(1) = StructField("type", StringType, false);
val schema = StructType(fields);

// Apply the schema to the RDD of rows

val data = sqlContext.applySchema(zcd, schema)
data.registerTempTable("datatable")
sqlContext.cacheTable("datatable")


// Import the second table (csv) into an RDD as key/value pairs

val mincome = sc.textFile("/user/zeppelin/medianincome.csv").map(_.split(",")).map(p => Row(p(0), p(1)))

// Create the schema array and convert to tabular data

val reffields = new Array[StructField](2)
fields(0) = StructField("id1", IntegerType, false);
fields(1) = StructField("id2", StringType, false);
val minschema = StructType(reffields);

// Apply the schema to the RDD of rows

val minschemardd = sqlContext.applySchema(mincome, minschema)

Note: this is where I get the error:

java.lang.NullPointerException
at org.apache.spark.sql.catalyst.types.StructType$$anonfun$toAttributes$1.apply(dataTypes.scala:549)
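
The NullPointerException is consistent with reffields never being populated: new Array[StructField](2) starts out filled with nulls, and the two assignments above write into fields (the first table's array) instead of reffields, so StructType(reffields) receives null entries and toAttributes throws. A corrected version of that block:

val reffields = new Array[StructField](2)
reffields(0) = StructField("id1", IntegerType, false)  // was fields(0)
reffields(1) = StructField("id2", StringType, false)   // was fields(1)
val minschema = StructType(reffields)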

 

minschemardd.registerTempTable("min")
sqlContext.cacheTable("min")

// Query

val results = sqlContext.sql("SELECT * FROM datatable as d join ref on d.id=ref.id")
results.foreach(T => Unit);
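
Even with the schema fixed, this final query will likely still fail: the second table was registered as "min", not "ref", and neither schema defines a column named "id" (the columns are zip/type and id1/id2). Under those assumptions, the join would look more like:

// Hypothetical corrected query, using the table and column names defined above
val results = sqlContext.sql("SELECT * FROM datatable d JOIN min m ON d.zip = m.id2")
results.collect().foreach(println)  // materialize and print the joined rows

Note also that results.foreach(T => Unit) computes nothing visible; collecting or counting the result is the usual way to check the output.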