Created on 09-21-2016 09:00 AM - edited 09-16-2022 03:40 AM
I want to do a hash-based comparison to find duplicate records. Each record I receive from the stream has a hashid and a recordid field in it.
1. I want to keep all the historic records (hashid, recordid as key, value) in an in-memory RDD
2. When a new record arrives in the Spark DStream RDD, I want to compare it against the historic records (hashid, recordid)
3. I also want to add the new records to the existing historic records (hashid, recordid as key, value) in the in-memory RDD
My thoughts:
1. join the time-based RDDs and cache them in memory (historic lookup)
2. when a new RDD comes in, compare each record against the historic lookup
What I have done:
1. I have created a streaming pipeline and am able to consume the records.
2. But I am not sure how to store them in memory.
I have the following questions:
1. How can I achieve this, or is there a workaround?
2. Can I do this using MLlib, or does Spark Streaming fit my use case?
Created 09-23-2016 08:04 AM
What is your ultimate goal? Are you trying to notify a process when a duplicate is detected, or are you trying to ensure the source system only has one record per id?
If you are trying to detect duplicates, have a look at updateStateByKey in Spark Streaming. You can define your key as the hashid and Spark will handle keeping the historic state in memory for you. http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation
If you are just trying to ensure only one copy exists in your source system, look into having the source handle this for you. HBase or an RDBMS can already ensure this with a primary key. With large datasets, holding everything in memory may not be possible, so having the source handle this may be a better alternative. A minimal sketch of the updateStateByKey idea is below (assuming records arrive as hashid^recordid text lines; the paths and function names are only illustrative):
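from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "dedup-sketch")
ssc = StreamingContext(sc, 30)
ssc.checkpoint("dedup_checkpoint")  # required for stateful operations

# each line is expected to look like "hashid^recordid"
pairs = ssc.textFileStream("input").map(lambda line: tuple(line.split("^")[:2]))

def keepFirst(newValues, current):
    # keep the first recordid ever seen for a given hashid
    if current is not None:
        return current
    return newValues[0] if newValues else None

state = pairs.updateStateByKey(keepFirst)
state.pprint()

ssc.start()
ssc.awaitTermination()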
Created 09-26-2016 07:06 AM
Sounds like you should be able to use updateStateByKey for your scenario. In newer versions of Spark there are other stateful operators that may give you better performance as well (mapWithState).
When using stateful operations like updateStateByKey, you will define a checkpoint directory that saves the state for you. If a failure occurs, Spark will recover the state from the checkpoint directory and rerun any records needed to catch up. The recovery pattern looks roughly like this (the directory and function names are just examples):
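from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def createContext():
    sc = SparkContext("local[*]", "stateful-app")
    ssc = StreamingContext(sc, 30)
    ssc.checkpoint("stateful_checkpoint")  # state and metadata are written here
    # ... define the DStream and the updateStateByKey logic here ...
    return ssc

# on restart, the context (and the saved state) is rebuilt from the checkpoint
ssc = StreamingContext.getOrCreate("stateful_checkpoint", createContext)
ssc.start()
ssc.awaitTermination()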
Created 09-26-2016 07:13 AM
Hi @hubbarja, yes, and thank you for the fast response.
Here is what I have done (this is just for the word count); still exploring 🙂
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "WordCount")
    ssc = StreamingContext(sc, 30)
    lines = ssc.textFileStream("E:\\Work\\Python1\\work\\spark\\sampledata\\")
    ssc.checkpoint("checkpoint")  # checkpoint directory for the stateful operation
    initialStateRDD = sc.parallelize([(u'na', 0)])
    lines.foreachRDD(fileName)  # fileName, createdict, updateFunction are my helpers (defined elsewhere)
    words = lines.map(lambda line: createdict(line.split("^")))
    wordCounts = words.updateStateByKey(updateFunction, initialRDD=initialStateRDD)
    wordCounts.pprint()
    return ssc

context = StreamingContext.getOrCreate("checkpoint", functionToCreateContext)
I am using PySpark for easy prototyping.
I am using Spark 2.0, but when I tried "mapWithState" I got an error: "AttributeError: 'TransformedDStream' object has no attribute 'mapWithState'"
Created 09-26-2016 09:08 AM
Unfortunately mapWithState is only available in Java and Scala right now; it hasn't been ported to Python yet. Please follow https://issues.apache.org/jira/browse/SPARK-16854 if you are interested in knowing when this will be worked on, and +1 the JIRA to indicate you are interested in using this from Python.
Created 09-27-2016 04:53 AM
Hi @hubbarja, I have run into a problem.
ssc.checkpoint("checkpoint")
Rdd = sc.parallelize([(u'na', '0')])  # initial/seed state
lines.foreachRDD(fileName)  # fileName, createdict, updateFunction are helpers defined elsewhere
newRdd = lines.map(lambda line: createdict(line.split("^")))
# print(Rdd.lookup('na'))
Rdd = newRdd.updateStateByKey(updateFunction, initialRDD=Rdd)  # Rdd now holds the historic state DStream
Rdd.pprint()
return ssc
In my program, "Rdd" holds the historic lookup. I am able to see the historic values like below:
('h1','rid1')
('h2','rid2')
I am not sure how to do the lookup match. I tried using join and leftOuterJoin, but it did not work out properly for me.
Can you please help me figure this out?
Created 09-28-2016 06:35 AM
Are you trying to join your stateful RDD to another RDD, or just trying to look up a specific record? Depending on how large the dataset is and how many lookups you need, you could call collect to bring the RDD to the driver and do quick lookups there, or call filter on the RDD to keep only the records you want before collecting to the driver. If you are joining to another dataset, you will first need to map over both the history RDD and the other RDD to create key-value pairs. For example, something along these lines (only a sketch; state would be your stateful DStream and newRecords your incoming keyed DStream, the names are illustrative):
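# join the stateful DStream with the incoming keyed DStream;
# each matching hashid yields (hashid, (historicRecordid, newRecordid))
matched = state.join(newRecords)
matched.pprint()

def checkBatch(rdd):
    # bring one (small) batch of state to the driver for quick dictionary lookups
    lookup = dict(rdd.collect())
    print(lookup.get('h1'))

state.foreachRDD(checkBatch)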
Created 09-28-2016 06:43 AM
Hi @hubbarja, I have partially achieved what I want. Here is a code sample:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "dedup")
    ssc = StreamingContext(sc, 60)
    messagesample = ssc.textFileStream("input")
    ssc.checkpoint("dedup_data_checkpoint")
    # key each record by hashid: (hashid, recordid)
    message_id_hash = messagesample.map(lambda line: tuple(line.split("^")[:2]))\
        .reduceByKey(lambda x, y: (x, y))
    RDD = message_id_hash.updateStateByKey(updateFunction)  # updateFunction, isDuplicate, deduplicateRecords defined elsewhere
    RDD.join(message_id_hash).filter(isDuplicate).map(deduplicateRecords)\
        .saveAsTextFiles('output.txt')
    return ssc
It is working fine for me. The only problem is that it creates output files for each and every batch timestamp. I am trying to fix that. One workaround I am looking at (just a sketch, assuming it is acceptable to write only non-empty batches with foreachRDD instead of saveAsTextFiles):
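def saveNonEmpty(time, rdd):
    # write a batch only if it actually contains records,
    # so empty batch intervals do not create output directories
    if not rdd.isEmpty():
        rdd.saveAsTextFile("output-" + time.strftime("%Y%m%d%H%M%S"))

RDD.join(message_id_hash).filter(isDuplicate).map(deduplicateRecords)\
    .foreachRDD(saveNonEmpty)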