Accepted Solution

Using filter on a joined dataset in Spark?

I am joining two datasets: the first comes from a stream and the second is in HDFS.

After joining the two datasets, I need to apply a filter to the joined dataset, but I am running into an issue. Please help me resolve it.

I am using the code below:

val streamkv ="~")).map(r => ( r(0), (r(5), r(6)))) 
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r => (r(1), (r(0), r(3), r(4))))
val streamwindow = streamkv.window(Minutes(1)) 



val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} ) 


I get the following error when I apply the filter:

val tofilter = join1.filter { 
 | case (_, (_, _),(_,_,device)) => 
 | device.contains("iPhone") 
 | }.count() 


 error: constructor cannot be instantiated to expected type; 
 found   : (T1, T2, T3) 
 required: (String, ((String, String), (String, String, String))) 
       case (_, (_, _),(_,_,device)) => 



How can I solve this error?



Re: Using filter on a joined dataset in Spark?

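Your pattern is just a little bit off. The result of a join is not a triple, but a pair (key, value) whose second element is itself a pair holding the values from the two sides of the join. That is what the "required" type in the error message shows: (String, ((String, String), (String, String, String))).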


You have:


(_, (_, _),(_,_,device))


but I think you need:


(_, ((_, _),(_,_,device)))
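
To check the shape without a running stream, here is a minimal sketch in plain Scala collections. It only mimics the element type that the join produces; the object name, the type aliases, and the sample values are made up for illustration and are not from your job.

```scala
// Plain Scala sketch: no Spark needed to see the tuple nesting.
// All names and sample values here are hypothetical.
object JoinShapeSketch {
  type StreamValue = (String, String)          // (r(5), r(6)) from the stream side
  type HdfsValue   = (String, String, String)  // (r(0), r(3), r(4)) from the HDFS side

  // A join on key K yields (K, (V, W)): a pair whose second element is a pair.
  val joined: Seq[(String, (StreamValue, HdfsValue))] = Seq(
    ("k1", (("a", "b"), ("x", "y", "iPhone 5"))),
    ("k2", (("c", "d"), ("x", "y", "Android")))
  )

  // The pattern mirrors the nesting: key, then (stream value, HDFS value).
  val iphones: Seq[(String, (StreamValue, HdfsValue))] = joined.filter {
    case (_, ((_, _), (_, _, device))) => device.contains("iPhone")
  }

  def main(args: Array[String]): Unit =
    println(iphones.map(_._1)) // keys of the records that passed the filter
}
```

The same case clause should work inside join1.filter { ... }. Since the stream-side fields are not used, the pattern can also be shortened to (_, (_, (_, _, device))).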