09-28-2016 06:43 AM
Hi @hubbarja, I have partially achieved what I want. Here is a code sample:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    def functionToCreateContext():
        sc = SparkContext("local[*]", "dedup")
        ssc = StreamingContext(sc, 60)
        messagesample = ssc.textFileStream("input")
        ssc.checkpoint("dedup_data_checkpoint")
        # the split must yield (key, value) pairs for reduceByKey to work
        message_id_hash = messagesample.map(lambda line: line.split("^")) \
            .reduceByKey(lambda x, y: (x, y))
        RDD = message_id_hash.updateStateByKey(updateFunction)
        RDD.join(messagesample).filter(isDuplicate) \
            .map(deduplicateRecords) \
            .saveAsTextFiles('output.txt')
        return ssc

It is working fine for me. The only problem is that it creates a new set of output files for every batch timestamp, and I am still trying to fix that.
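On the per-timestamp files: `saveAsTextFiles` writes a new `prefix-<timestamp>` directory for every batch interval by design. One common workaround is to take control of the output yourself via `foreachRDD` and append each batch to a single file. A minimal sketch of such a writer (the `append_batch` helper and the output path are hypothetical names, and `collect()` is only reasonable for small batches):

    def append_batch(records, path):
        """Append one batch's records to a single output file,
        instead of letting Spark create a new directory per batch."""
        with open(path, "a") as f:
            for rec in records:
                f.write(str(rec) + "\n")

    # With the DStream from the sample above, you would wire it up roughly as:
    # RDD.foreachRDD(lambda time, rdd: append_batch(rdd.collect(), "output.txt"))

For larger batches you would keep the write distributed (e.g. `rdd.saveAsTextFile(...)` inside `foreachRDD` with a path you choose) rather than collecting to the driver.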