Explorer
Posts: 62
Registered: ‎01-22-2014

Using filter in joined dataset in spark ?


I am joining two datasets: the first comes from a stream and the second is in HDFS.

After joining the two datasets, I need to apply a filter on the joined dataset, but I am facing an issue here. Please assist.

I am using the code below:

val streamkv = streamrecs.map(_.split("~")).map(r => (r(0), (r(5), r(6))))
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r => (r(1), (r(0), r(3), r(4))))
val streamwindow = streamkv.window(Minutes(1))

val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines) })

 



I am getting the following error when I use the filter:

val tofilter = join1.filter {
  case (_, (_, _), (_, _, device)) =>
    device.contains("iPhone")
}.count()

error: constructor cannot be instantiated to expected type;
 found   : (T1, T2, T3)
 required: (String, ((String, String), (String, String, String)))
       case (_, (_, _), (_, _, device)) =>

How can I solve this error?

 

 

Cloudera Employee
Posts: 366
Registered: ‎07-29-2013

Re: Using filter in joined dataset in spark ?

Your signature is just a little bit off. The result of a join is not a triple, but a tuple whose second element is a tuple.

 

You have:

(_, (_, _), (_, _, device))

but I think you need:

(_, ((_, _), (_, _, device)))
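To see why the nesting matters, here is a minimal sketch of the corrected pattern. The join result has type (String, ((String, String), (String, String, String))), i.e. (key, (streamValue, hdfsValue)). The data here is made up for illustration, and the join output is simulated with a plain Scala List instead of a live Spark DStream, so the snippet runs without a cluster:

```scala
// Simulated output of streamwindow.join(HDFSlines):
// key -> ((stream fields), (hdfs fields, device last))
val joined: List[(String, ((String, String), (String, String, String)))] = List(
  ("u1", (("a", "b"), ("x", "y", "iPhone"))),
  ("u2", (("c", "d"), ("x", "y", "Android")))
)

// The two value tuples are nested inside ONE pair,
// so the pattern needs the extra parentheses.
val iphoneCount = joined.count {
  case (_, ((_, _), (_, _, device))) => device.contains("iPhone")
}
// iphoneCount == 1
```

The same pattern works unchanged in the DStream version: join1.filter { case (_, ((_, _), (_, _, device))) => device.contains("iPhone") }.count()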