
Spark Streaming-based deduplication

Explorer

I want to do a hash-based comparison to find duplicate records. Each record I receive from the stream has a hashid and a recordid field in it.

1. I want to keep all the historic records (hashid, recordid --> key, value) in an in-memory RDD.
2. When a new record arrives in the Spark DStream RDD, I want to compare it against the historic records (hashid, recordid).
3. I also want to add the new records to the existing historic records (hashid, recordid --> key, value) in the in-memory RDD.

My thoughts:


1. Join the time-based RDDs and cache them in memory (the historic lookup).
2. When a new RDD comes in, compare each record against the historic lookup.

What I have done:


1. I have created a streaming pipeline and am able to consume the records.
2. But I am not sure how to store them in memory.

I have the following questions:


1. How can I achieve this, or is there a workaround?
2. Can I do this using MLlib, or does Spark Streaming fit my use case?

8 REPLIES

Expert Contributor

What is your ultimate goal? Are you trying to notify a process when a duplicate is detected, or are you trying to ensure the source system only has one record per id?


If you are trying to detect duplicates, have a look at updateStateByKey in Spark Streaming: you can define your key as the hashid and Spark will keep the historic state in memory for you.  http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation


If you are just trying to ensure only one copy exists in your source system, look into having the source handle this for you. HBase or an RDBMS can already ensure this with a primary key. With large datasets, holding everything in memory may not be possible, so having the source handle this may be a better alternative.
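To make the updateStateByKey approach concrete, here is a minimal PySpark sketch; the function and variable names, the input directory, and the "hashid^recordid" line format are assumptions for illustration, not code from this thread.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def update_seen(new_record_ids, seen_record_id):
    # Keep the first recordid ever seen for this hashid; any later record
    # arriving with the same hashid is a duplicate of the stored recordid.
    if seen_record_id is not None:
        return seen_record_id
    return new_record_ids[0] if new_record_ids else None

sc = SparkContext("local[*]", "dedup-sketch")
ssc = StreamingContext(sc, 30)
ssc.checkpoint("dedup_checkpoint")  # checkpointing is required for stateful operations

# each input line is assumed to look like "hashid^recordid"
lines = ssc.textFileStream("input")
pairs = lines.map(lambda line: (line.split("^")[0], line.split("^")[1]))

# state: hashid -> first recordid seen with that hash
history = pairs.updateStateByKey(update_seen)
history.pprint()

ssc.start()
ssc.awaitTermination()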

Explorer
Thank you @hubbarja. I think for my scenario I have to go with stateful streaming. What I want to achieve is, for example:

My historic RDD has:
(Hash1, recordid1)
(Hash2, recordid2)

And in the new stream I have the following:
(Hash3, recordid3)
(Hash1, recordid5)

In the above scenario:
1) For recordid5, I should get that recordid5 is a duplicate of recordid1.
2) The new pair (Hash3, recordid3) should be added to the historic RDD.

And I have one more question to ask:
If the program crashes at any point, is it possible to recover that historic RDD?

Expert Contributor

Sounds like you should be able to use updateStateByKey for your scenario. In newer versions of Spark there are other stateful operators as well, such as mapWithState, that may give you better performance.


When using stateful operations like updateStateByKey, you will define a checkpoint directory that saves the state for you. If a failure occurs, Spark will recover the state from the checkpoint directory and rerun any records needed to catch up.

Explorer

Hi @hubbarja, Yes and thank you for the fast response. 


What I have done is the following (this is just for the word count); still exploring 🙂


from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "WordCount")
    ssc = StreamingContext(sc, 30)
    lines = ssc.textFileStream("E:\\Work\\Python1\\work\\spark\\sampledata\\")
    ssc.checkpoint("checkpoint")  # checkpoint directory for the stateful operation
    initialStateRDD = sc.parallelize([(u'na', 0)])
    lines.foreachRDD(fileName)  # fileName, createdict and updateFunction are defined elsewhere in my program
    words = lines.map(lambda line: createdict(line.split("^")))
    wordCounts = words.updateStateByKey(updateFunction, initialRDD=initialStateRDD)
    wordCounts.pprint()
    return ssc

context = StreamingContext.getOrCreate("checkpoint", functionToCreateContext)
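The updateFunction used above is not shown here; for a word-count prototype it would typically just add the counts from the current batch to the running total, something like this hypothetical sketch:

# hypothetical sketch of updateFunction (not shown above): sum the counts
# arriving in this batch into the running total kept by Spark
def updateFunction(new_values, running_count):
    return sum(new_values) + (running_count or 0)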


I am using PySpark for easy prototyping.


I am using Spark 2.0, but when I tried "mapWithState" I got an error: "AttributeError: 'TransformedDStream' object has no attribute 'mapWithState'".


Expert Contributor

Unfortunately mapWithState is only available in Java and Scala right now; it hasn't been ported to Python yet. Please follow https://issues.apache.org/jira/browse/SPARK-16854 if you are interested in knowing when this will be worked on, and +1 the JIRA to show you are interested in using this within Python.

Explorer

Hi @hubbarja, I ran into a problem.


ssc.checkpoint("checkpoint")
Rdd = sc.parallelize([(u'na', '0')])  # initial state
lines.foreachRDD(fileName)
newRdd = lines.map(lambda line: createdict(line.split("^")))
# print(Rdd.lookup('na'))
Rdd = newRdd.updateStateByKey(updateFunction, initialRDD=Rdd)  # Rdd now holds the historic (hashid, recordid) state
Rdd.pprint()
return ssc

In my program, "Rdd" holds the historic lookup. I am able to see the historic values like below:

('h1','rid1')

('h2','rid2')


I am not sure how to do the lookup match. I tried using join and leftOuterJoin, but it did not work out properly for me.


Can you please help me figure this out?


Expert Contributor

Are you trying to join your stateful RDD to another RDD, or just trying to look up a specific record? Depending on how large the dataset is and how many lookups you need, you could call collect to bring the RDD to the driver and do quick lookups there, or call filter on the RDD to keep only the records you want before collecting to the driver. If you are joining to another dataset, you will first need to map over both the history RDD and the other RDD to create key-value pairs before joining, as in the sketch below.
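A minimal PySpark sketch of that key-value join approach for the duplicate-detection case; the DStream names here are assumptions for illustration, following the earlier examples, not code from this thread.

# history:  DStream of (hashid, first_recordid) produced by updateStateByKey
# incoming: DStream of (hashid, recordid) pairs for the current batch
# Both names are assumptions for illustration.

joined = incoming.join(history)          # (hashid, (recordid, first_recordid))

duplicates = joined.filter(
    lambda kv: kv[1][0] != kv[1][1]      # the incoming recordid differs from the first one seen
)

duplicates.map(
    lambda kv: "%s is a duplicate of %s" % (kv[1][0], kv[1][1])
).pprint()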

Explorer

Hi @hubbarja, I have partially achieved what I want. Following is a code sample:


from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "dedup")
    ssc = StreamingContext(sc, 60)
    messagesample = ssc.textFileStream("input")
    ssc.checkpoint("dedup_data_checkpoint")
    # updateFunction, isDuplicate and deuplicateRecords are defined elsewhere in my program
    message_id_hash = messagesample.map(lambda line: line.split("^")).reduceByKey(lambda x, y: (x, y))
    RDD = message_id_hash.updateStateByKey(updateFunction)
    # join the stateful history with the current batch of (hashid, recordid) pairs
    RDD.join(message_id_hash).filter(isDuplicate).map(deuplicateRecords)\
        .saveAsTextFiles('output.txt')
    return ssc


It is working fine for me. The only problem is that it creates output files for each and every timestamp. I am trying to fix that.