I am joining two datasets , first one coming
from stream and second one which is in HDFS.
After joining the two datasets , I need to apply
filter on the joined datasets, but here I am facing as issue. Please assist
to resolve.
I am using the code below,
val streamkv = streamrecs.map(_.split("~")).map(r => ( r(0), (r(5), r(6))))
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r
=> ( r(1), (r(0) r(3),r(4),)))
val streamwindow = streamkv.window(Minutes(1))
val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} )
I am getting the following error, when I use the filter
val tofilter = join1.filter {
| case (_, (_, _),(_,_,device)) =>
| device.contains("iPhone")
| }.count()
error: constructor cannot be instantiated to expected type;
found : (T1, T2, T3)
required: (String, ((String, String), (String, String, String)))
case (_, (_, _),(_,_,device)) =>
How can I solve this error?.