09-28-2016
06:43 AM
Hi @hubbarja, I have partially achieved what I want. Here is a code sample:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "dedup")
    ssc = StreamingContext(sc, 60)
    messagesamples = ssc.textFileStream("input")
    ssc.checkpoint("dedup_data_checkpoint")
    message_id_hash = messagesamples.map(lambda line: line.split("^")).reduceByKey(lambda x, y: (x, y))
    RDD = message_id_hash.updateStateByKey(updateFunction)
    RDD.join(messagesamples).filter(isDuplicate).map(deduplicateRecords) \
        .saveAsTextFiles('output.txt')
    return ssc

It is working fine for me. The only problem is that it creates output files for each and every timestamp. I am trying to fix that.
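Note that saveAsTextFiles writes one output directory per batch interval, which is why a file appears for every timestamp. A common workaround is to write the output yourself through foreachRDD and skip empty batches. The sketch below is only an illustration, not code from the post: "deduped" stands in for the joined/filtered DStream built above, and the output path is made up.

# Illustrative sketch only: write each micro-batch via foreachRDD and skip
# batches that contain no data, so empty intervals produce no output.
# "deduped" is a placeholder for the DStream built in the snippet above.
def save_nonempty(time, rdd):
    if not rdd.isEmpty():
        rdd.coalesce(1).saveAsTextFile("output/dedup-%s" % str(time))

deduped.foreachRDD(save_nonempty)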
09-27-2016
04:53 AM
Hi @hubbarja, I have run into a problem.

ssc.checkpoint("checkpoint")
Rdd = sc.parallelize([(u'na', '0')])
lines.foreachRDD(fileName)
newRdd = lines.map(lambda line: createdict(line.split("^")))
# print(Rdd.lookup('na'))
Rdd = newRdd.updateStateByKey(updateFunction, initialRDD=Rdd)
Rdd.pprint()
return ssc

In my program "Rdd" holds the historic lookup. I am able to see the historic values like below:

('h1', 'rid1')
('h2', 'rid2')

I am not sure how to do the lookup match. I tried join and leftOuterJoin, but they did not work out properly for me. Can you please help me figure this out?
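One way to do the lookup match is to key both the incoming batch and the state by hash and join them per batch. The sketch below is only an outline and reuses the names from the snippet above, assuming newRdd holds (hash, recordid) pairs and Rdd is the state DStream produced by updateStateByKey:

# Sketch only, not a tested solution: pair each new record with the
# historic record id stored for the same hash.
matches = newRdd.leftOuterJoin(Rdd)
# each element looks like (hash, (new_recordid, historic_recordid_or_None))
duplicates = matches.filter(lambda kv: kv[1][1] is not None and kv[1][1] != kv[1][0])
duplicates.pprint()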
09-26-2016
07:13 AM
Hi @hubbarja, Yes, and thank you for the fast response. What I have done is (this is just for the word count; still exploring 🙂):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    sc = SparkContext("local[*]", "WordCount")
    ssc = StreamingContext(sc, 30)
    lines = ssc.textFileStream("E:\\Work\\Python1\\work\\spark\\sampledata\\")
    ssc.checkpoint("checkpoint")  # checkpoint
    initialStateRDD = sc.parallelize([(u'na', 0)])
    files = lines.foreachRDD(fileName)
    words = lines.map(lambda line: createdict(line.split("^")))
    wordCounts = words.updateStateByKey(updateFunction, initialRDD=initialStateRDD)
    wordCounts.pprint()
    return ssc

context = StreamingContext.getOrCreate("checkpoint", functionToCreateContext)

I am using PySpark for easy prototyping. I am on Spark 2.0, but when I tried "mapWithState" I got an error: "AttributeError: 'TransformedDStream' object has no attribute 'mapWithState'"
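For reference, mapWithState was only added to the Scala/Java API and is not exposed in PySpark, which explains the AttributeError. The updateFunction referenced above is not shown in the post; a minimal sketch of what it typically looks like for this kind of running count is:

# My assumption of what updateFunction could look like (it is not shown in
# the original post): add the counts from the current batch to the running
# total kept as state for each key.
def updateFunction(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values) + running_count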
09-24-2016
09:31 PM
Thank you @hubbarja. I think for my scenario I have to go with a stateful stream. What I want to achieve is, for example:

My historic RDD has:
(Hash1, recordid1)
(Hash2, recordid2)

And in the new stream I have:
(Hash3, recordid3)
(Hash1, recordid5)

In this scenario:
1) For recordid5, I should get that recordid5 is a duplicate of recordid1.
2) The new value (Hash3, recordid3) should be added to the historic RDD.

And I have one more question: if the program crashes at any point, is it possible to recover that historic RDD?
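To make the expected behaviour concrete, here is a small batch-RDD sketch (my illustration, not code from the thread) of the two steps described above, using plain pair-RDD operations:

from pyspark import SparkContext

sc = SparkContext("local[*]", "dedup-example")

historic = sc.parallelize([("Hash1", "recordid1"), ("Hash2", "recordid2")])
new_batch = sc.parallelize([("Hash3", "recordid3"), ("Hash1", "recordid5")])

# 1) duplicates: a join on the hash key pairs each new record with the
#    historic record id that carries the same hash
dupes = new_batch.join(historic)
print(dupes.collect())   # [('Hash1', ('recordid5', 'recordid1'))]

# 2) grow the historic state with the hashes that were not seen before
historic = historic.union(new_batch.subtractByKey(historic))
print(historic.collect())

As for recovering after a crash, checkpointing the StreamingContext (the ssc.checkpoint / getOrCreate pattern used in the later posts) is the usual way to restore the state held by updateStateByKey.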
09-21-2016
09:00 AM
I want to do a hash-based comparison to find duplicate records. Each record I receive from the stream has hashid and recordid fields in it.

1. I want to keep all the historic records (hashid, recordid --> key, value) in an in-memory RDD.
2. When a new record is received in the Spark DStream RDD, I want to compare it against the historic records (hashid, recordid).
3. I also want to add the new records to the existing historic records (hashid, recordid --> key, value) in the in-memory RDD.

My thoughts:
1. Join the time-based RDDs and cache them in memory (historic lookup).
2. When a new RDD comes in, compare each record against the historic lookup.

What I have done:
1. I have created a streaming pipeline and am able to consume the records.
2. But I am not sure how to store the records in memory.

I have the following questions:
1. How can I achieve this, or is there a workaround?
2. Can I do this using MLlib, or does Spark Streaming fit my use case?
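One common way to keep the (hashid, recordid) lookup across batches is updateStateByKey on a DStream keyed by hashid. The sketch below is only an outline under the assumptions of the question (records arrive as "hashid^recordid" lines in a DStream called lines); it is not a complete solution:

# Sketch under the assumptions stated above: keep the first record id
# seen for each hash as the streaming state.
def keep_first(new_recordids, stored_recordid):
    if stored_recordid is not None:
        return stored_recordid            # hash already known, keep original id
    return new_recordids[0] if new_recordids else None

pairs = lines.map(lambda line: tuple(line.split("^")))   # (hashid, recordid)
historic = pairs.updateStateByKey(keep_first)            # historic lookup state
historic.pprint()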
Labels: Apache Spark