<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question spark stream based deduplication in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45378#M41313</link>
    <description>&lt;P&gt;&lt;SPAN&gt;I want to use a hash-based comparison to find duplicate records. Each record I receive from the stream has hashid and recordid fields.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;1. I want to keep all the historic records (hashid, recordid --&amp;gt; key, value) in an in-memory RDD.&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. When a new record arrives in a Spark DStream RDD, I want to compare it against the historic records (hashid, recordid).&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;3. I also want to add the new records to the existing historic records (hashid, recordid --&amp;gt; key, value) in the in-memory RDD.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;My thoughts:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. Join the time-based RDDs and cache them in memory (historic lookup).&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. When a new RDD arrives, compare each of its records against the historic lookup.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;What I have done:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. I have created a streaming pipeline and am able to consume the records.&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. However, I am not sure how to store them in memory.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;I have the following questions:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. How can I achieve this, or is there a workaround?&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. Can I do this using MLlib, or is Spark Streaming a good fit for my use case?&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Fri, 16 Sep 2022 10:40:23 GMT</pubDate>
    <dc:creator>backtrack5</dc:creator>
    <dc:date>2022-09-16T10:40:23Z</dc:date>
    <item>
      <title>spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45378#M41313</link>
      <description>&lt;P&gt;&lt;SPAN&gt;I want to use a hash-based comparison to find duplicate records. Each record I receive from the stream has hashid and recordid fields.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;1. I want to keep all the historic records (hashid, recordid --&amp;gt; key, value) in an in-memory RDD.&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. When a new record arrives in a Spark DStream RDD, I want to compare it against the historic records (hashid, recordid).&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;3. I also want to add the new records to the existing historic records (hashid, recordid --&amp;gt; key, value) in the in-memory RDD.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;My thoughts:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. Join the time-based RDDs and cache them in memory (historic lookup).&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. When a new RDD arrives, compare each of its records against the historic lookup.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;What I have done:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. I have created a streaming pipeline and am able to consume the records.&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. However, I am not sure how to store them in memory.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;I have the following questions:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. How can I achieve this, or is there a workaround?&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;2. Can I do this using MLlib, or is Spark Streaming a good fit for my use case?&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 16 Sep 2022 10:40:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45378#M41313</guid>
      <dc:creator>backtrack5</dc:creator>
      <dc:date>2022-09-16T10:40:23Z</dc:date>
    </item>
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45480#M41314</link>
      <description>&lt;P&gt;What is your ultimate goal? Are you trying to notify a process when a duplicate is detected, or are you trying to ensure the source system holds only one record per id?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;If you are trying to detect duplicates, have a look at updateStateByKey in Spark Streaming: you can use the hashid as the key, and Spark will keep the historic state in memory for you. &lt;A href="http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation" target="_blank"&gt;http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;If you only need to ensure that a single copy exists in your source system, consider letting the source handle it. HBase or an RDBMS can already enforce this with a primary key. With large datasets, holding everything in memory may not be possible, so having the source handle deduplication may be the better alternative.&lt;/P&gt;</description>
      <pubDate>Fri, 23 Sep 2016 15:04:38 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45480#M41314</guid>
      <dc:creator>hubbarja</dc:creator>
      <dc:date>2016-09-23T15:04:38Z</dc:date>
    </item>
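The "let the source handle it" option can be sketched with Python's built-in sqlite3 module. The reply names HBase or an RDBMS; SQLite is used here only as a stand-in RDBMS because it needs no server. With hashid declared as the PRIMARY KEY, SQLite's INSERT OR IGNORE skips any row whose hash already exists, so the store itself rejects duplicates. The table name seen and the function insert_records are hypothetical.

```python
import sqlite3


def insert_records(conn, records):
    """Insert (hashid, recordid) pairs and return the pairs rejected as duplicates.

    hashid is the PRIMARY KEY, so the database itself enforces one row per hash.
    """
    duplicates = []
    for hashid, recordid in records:
        # INSERT OR IGNORE is SQLite syntax: a primary-key conflict skips the row.
        cur = conn.execute(
            "INSERT OR IGNORE INTO seen (hashid, recordid) VALUES (?, ?)",
            (hashid, recordid),
        )
        if cur.rowcount == 0:  # nothing inserted, so this hash was already stored
            duplicates.append((hashid, recordid))
    conn.commit()
    return duplicates


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE seen (hashid TEXT PRIMARY KEY, recordid TEXT NOT NULL)")
insert_records(conn, [("Hash1", "recordid1"), ("Hash2", "recordid2")])
dups = insert_records(conn, [("Hash3", "recordid3"), ("Hash1", "recordid5")])
print(dups)  # [('Hash1', 'recordid5')]
```

The first recordid for each hash is the one that is kept. Later arrivals are reported, and the table is not changed.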
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45540#M41315</link>
      <description>Thank you &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/16433"&gt;@hubbarja&lt;/a&gt;. I think for my scenario I have to go with a stateful stream. Here is an example of what I want to achieve.&lt;BR /&gt;&lt;BR /&gt;My historic RDD has:&lt;BR /&gt;(Hash1, recordid1)&lt;BR /&gt;(Hash2, recordid2)&lt;BR /&gt;&lt;BR /&gt;And the new stream has:&lt;BR /&gt;(Hash3, recordid3)&lt;BR /&gt;(Hash1, recordid5)&lt;BR /&gt;&lt;BR /&gt;In this scenario:&lt;BR /&gt;1) For recordid5, I should get "recordid5 is a duplicate of recordid1".&lt;BR /&gt;2) The new value (Hash3, recordid3) should be added to the historic RDD.&lt;BR /&gt;&lt;BR /&gt;I have one more question: if the program crashes at any point, is it possible to recover the historic RDD?</description>
      <pubDate>Sun, 25 Sep 2016 04:31:12 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45540#M41315</guid>
      <dc:creator>backtrack5</dc:creator>
      <dc:date>2016-09-25T04:31:12Z</dc:date>
    </item>
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45583#M41316</link>
      <description>&lt;P&gt;It sounds like you should be able to use updateStateByKey for your scenario. Newer versions of Spark also offer other stateful operators, such as mapWithState, that may give you better performance.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Stateful operations like updateStateByKey require you to set a checkpoint directory, where Spark saves the state for you. If a failure occurs, Spark will recover the state from the checkpoint directory and rerun any records needed to catch up.&lt;/P&gt;</description>
      <pubDate>Mon, 26 Sep 2016 14:06:56 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45583#M41316</guid>
      <dc:creator>hubbarja</dc:creator>
      <dc:date>2016-09-26T14:06:56Z</dc:date>
    </item>
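For the scenario above, the state kept per hashid could be two things: the first recordid seen for that hash, and the recordids that collided with it in the latest batch. The sketch below defines a hypothetical update function, update_dedup_state. It takes (new_values, last_state), which is what updateStateByKey passes in. A small loop, simulate_update_state_by_key (also hypothetical), mimics how Spark calls the function once per key per batch, so the logic can be tried without a cluster. The sketch assumes that the order of values within a batch is arrival order, which Spark does not guarantee.

```python
def update_dedup_state(new_record_ids, state):
    """Hypothetical update function for DStream.updateStateByKey.

    new_record_ids: recordids that arrived for this hashid in the current batch.
    state: None for an unseen hashid, else (original_recordid, duplicates_in_last_batch).
    Returns (original_recordid, duplicates_in_this_batch), or None to keep no state.
    """
    if state is None:
        if not new_record_ids:
            return None  # nothing seen for this hash yet
        # First sighting: the first recordid becomes the original; any others
        # in the same batch are already duplicates (assumes arrival order).
        return (new_record_ids[0], list(new_record_ids[1:]))
    # Known hash: every recordid in this batch duplicates the stored original.
    # The list is replaced, not appended, so it only covers the current batch.
    return (state[0], list(new_record_ids))


def simulate_update_state_by_key(batches, update_fn):
    """Local stand-in for updateStateByKey: call update_fn for every key that has
    state or new values in a batch, and drop keys for which it returns None."""
    state = {}
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        next_state = {}
        for key in set(state) | set(grouped):
            result = update_fn(grouped.get(key, []), state.get(key))
            if result is not None:
                next_state[key] = result
        state = next_state
    return state


state = simulate_update_state_by_key(
    [
        [("Hash1", "recordid1"), ("Hash2", "recordid2")],  # historic records
        [("Hash3", "recordid3"), ("Hash1", "recordid5")],  # new batch
    ],
    update_dedup_state,
)
print(state["Hash1"])  # ('recordid1', ['recordid5'])
print(state["Hash3"])  # ('recordid3', [])
```

In the streaming job this would be wired up as pairs.updateStateByKey(update_dedup_state). Here pairs is a hypothetical DStream of (hashid, recordid) tuples, and a checkpoint directory must already be set with ssc.checkpoint(...).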
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45585#M41317</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/16433"&gt;@hubbarja&lt;/a&gt;, yes, and thank you for the fast response.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is what I have done so far (this is just for the word count); I am still exploring 🙂&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;from pyspark import SparkContext&lt;BR /&gt;from pyspark.streaming import StreamingContext&lt;BR /&gt;&lt;BR /&gt;def functionToCreateContext():&lt;BR /&gt;    sc = SparkContext("local[*]", "WordCount")&lt;BR /&gt;    ssc = StreamingContext(sc, 30)  # 30-second batches&lt;BR /&gt;    lines = ssc.textFileStream("E:\\Work\\Python1\\work\\spark\\sampledata\\")&lt;BR /&gt;    ssc.checkpoint("checkpoint")  # updateStateByKey requires a checkpoint directory&lt;BR /&gt;    initialStateRDD = sc.parallelize([(u'na', 0)])&lt;BR /&gt;    # fileName, createdict and updateFunction are my own helpers (not shown here)&lt;BR /&gt;    lines.foreachRDD(fileName)  # foreachRDD returns None, so there is nothing to assign&lt;BR /&gt;    words = lines.map(lambda line: createdict(line.split("^")))&lt;BR /&gt;    wordCounts = words.updateStateByKey(updateFunction, initialRDD=initialStateRDD)&lt;BR /&gt;    wordCounts.pprint()&lt;BR /&gt;    return ssc&lt;BR /&gt;&lt;BR /&gt;# Recover from the checkpoint if one exists, otherwise build a new context&lt;BR /&gt;context = StreamingContext.getOrCreate("checkpoint", functionToCreateContext)&lt;BR /&gt;context.start()&lt;BR /&gt;context.awaitTermination()&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I am using PySpark for easy prototyping.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I am using Spark 2.0, but when I tried mapWithState I got an error: "AttributeError: 'TransformedDStream' object has no attribute 'mapWithState'"&lt;/P&gt;</description>
      <pubDate>Mon, 26 Sep 2016 14:13:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45585#M41317</guid>
      <dc:creator>backtrack5</dc:creator>
      <dc:date>2016-09-26T14:13:13Z</dc:date>
    </item>
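The post does not show createdict or updateFunction. For a word count, one plausible shape is shown below; it is hypothetical and not the poster's code. createdict turns the split fields of a line into a (key, 1) pair, assuming the key is the first field. updateFunction adds the batch's counts to the running total, in the same way as the running word-count example in the Spark Streaming programming guide.

```python
def createdict(fields):
    """Hypothetical: turn the '^'-separated fields of one line into a (key, 1) pair.

    Assumes the value to count is the first field.
    """
    return (fields[0], 1)


def updateFunction(new_values, running_count):
    """updateStateByKey callback: add this batch's counts to the running total.

    running_count is None the first time a key is seen.
    """
    return sum(new_values) + (running_count or 0)


print(createdict("Hash1^recordid1".split("^")))  # ('Hash1', 1)
print(updateFunction([1, 1], None))  # 2
print(updateFunction([1], 2))  # 3
```

Despite its name, createdict returns a tuple here, because updateStateByKey works on a DStream of (key, value) pairs.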
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45599#M41318</link>
      <description>&lt;P&gt;Unfortunately, mapWithState is currently only available in Java and Scala; it hasn't been ported to Python yet. Please follow &lt;A href="https://issues.apache.org/jira/browse/SPARK-16854" target="_blank"&gt;https://issues.apache.org/jira/browse/SPARK-16854&lt;/A&gt; if you want to know when this will be worked on, and +1 the JIRA to show that you are interested in using it from Python.&lt;/P&gt;</description>
      <pubDate>Mon, 26 Sep 2016 16:08:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45599#M41318</guid>
      <dc:creator>hubbarja</dc:creator>
      <dc:date>2016-09-26T16:08:49Z</dc:date>
    </item>
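Until mapWithState reaches Python, one possible workaround is to keep enough per-batch information in the updateStateByKey state to rebuild the per-record output that mapWithState would emit. Assume a hypothetical state of (original_recordid, duplicates_in_this_batch) per hashid. A flatMap over the stateful stream can then turn that state into duplicate messages. The callback below, duplicate_messages, is hypothetical plain Python.

```python
def duplicate_messages(hash_and_state):
    """flatMap callback: (hashid, (original_id, duplicates_in_this_batch)) -> messages.

    updateStateByKey re-emits every key's state on every batch, so only the
    per-batch duplicates list is turned into output. Keys without new
    duplicates produce nothing.
    """
    hashid, (original_id, duplicates) = hash_and_state
    return [f"{dup} is a duplicate of {original_id} (hash {hashid})" for dup in duplicates]


# A snapshot of what the stateful stream might hold after the example batch
state_snapshot = [
    ("Hash1", ("recordid1", ["recordid5"])),
    ("Hash2", ("recordid2", [])),
    ("Hash3", ("recordid3", [])),
]
messages = [m for item in state_snapshot for m in duplicate_messages(item)]
print(messages)  # ['recordid5 is a duplicate of recordid1 (hash Hash1)']
```

In PySpark this would be used as state_stream.flatMap(duplicate_messages), where state_stream is the hypothetical output of updateStateByKey. Unlike mapWithState, updateStateByKey still touches every key on every batch, so this only recovers the output shape, not the performance benefit.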
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45636#M41319</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/16433"&gt;@hubbarja&lt;/a&gt;, I have run into a problem.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;# inside functionToCreateContext()&lt;BR /&gt;ssc.checkpoint("checkpoint")&lt;BR /&gt;initialRdd = sc.parallelize([(u'na', '0')])&lt;BR /&gt;lines.foreachRDD(fileName)&lt;BR /&gt;newRdd = lines.map(lambda line: createdict(line.split("^")))&lt;BR /&gt;#print(Rdd.lookup('na'))&lt;BR /&gt;# pass the initial RDD created above as the starting state&lt;BR /&gt;Rdd = newRdd.updateStateByKey(updateFunction, initialRDD=initialRdd)&lt;BR /&gt;Rdd.pprint()&lt;BR /&gt;return ssc&lt;/PRE&gt;&lt;P&gt;In my program, "Rdd" holds the historic lookup. I am able to see the historic values, like below:&lt;/P&gt;&lt;P&gt;('h1','rid1')&lt;/P&gt;&lt;P&gt;('h2','rid2')&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I am not sure how to do the lookup match. I tried using join and leftOuterJoin, but they did not work properly for me.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Could you please help me figure it out?&lt;/P&gt;</description>
      <pubDate>Tue, 27 Sep 2016 11:53:09 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45636#M41319</guid>
      <dc:creator>backtrack5</dc:creator>
      <dc:date>2016-09-27T11:53:09Z</dc:date>
    </item>
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45686#M41320</link>
      <description>&lt;P&gt;Are you trying to join your stateful RDD to another RDD, or just trying to look up a specific record? Depending on how large the dataset is and how many lookups you need, you could call collect to bring the RDD to the driver and do quick lookups there. Alternatively, call filter on the RDD to keep only the records you want before collecting them to the driver. If you are joining to another dataset, you will first need to map over both the history RDD and the other RDD you are joining to, so that each becomes an RDD of key-value pairs.&lt;/P&gt;</description>
      <pubDate>Wed, 28 Sep 2016 13:35:44 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45686#M41320</guid>
      <dc:creator>hubbarja</dc:creator>
      <dc:date>2016-09-28T13:35:44Z</dc:date>
    </item>
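To make the join suggestion concrete: once both sides are (key, value) pairs keyed on the hash, join keeps only the hashes present on both sides, which are the duplicates. leftOuterJoin from the new batch instead pairs every unmatched hash with None, which marks the new records. The sketch below emulates the output shapes of PySpark's pair-RDD join and leftOuterJoin with plain lists, so it runs without Spark. The helper names are hypothetical. In PySpark the real calls would be new_pairs.join(history_pairs) and new_pairs.leftOuterJoin(history_pairs), and DStream has methods with the same names. For small histories, history_rdd.collectAsMap() gives a dict on the driver for direct lookups.

```python
def join_pairs(left, right):
    """Emulate pair-RDD join: (k, (v, w)) for every key present on both sides."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(k, (v, w)) for k, v in left for w in index.get(k, [])]


def left_outer_join_pairs(left, right):
    """Emulate pair-RDD leftOuterJoin: unmatched left keys are paired with None."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(k, (v, w)) for k, v in left for w in index.get(k, [None])]


history = [("Hash1", "recordid1"), ("Hash2", "recordid2")]
new_batch = [("Hash3", "recordid3"), ("Hash1", "recordid5")]

# Spark does not preserve this ordering; the lists here are in input order.
print(join_pairs(new_batch, history))
# [('Hash1', ('recordid5', 'recordid1'))]
print(left_outer_join_pairs(new_batch, history))
# [('Hash3', ('recordid3', None)), ('Hash1', ('recordid5', 'recordid1'))]
```

If the history side already includes the current batch, every new record also matches itself. This is the case when it is the output of updateStateByKey. In that situation, compare the two recordids instead of only testing whether a match exists.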
    <item>
      <title>Re: spark stream based deduplication</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45687#M41321</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/16433"&gt;@hubbarja&lt;/a&gt;, I have partially achieved what I want. Here is a code sample:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;from pyspark import SparkContext&lt;BR /&gt;from pyspark.streaming import StreamingContext&lt;BR /&gt;&lt;BR /&gt;def functionToCreateContext():&lt;BR /&gt;    sc = SparkContext("local[*]", "dedup")&lt;BR /&gt;    ssc = StreamingContext(sc, 60)&lt;BR /&gt;    messagesample = ssc.textFileStream("input")&lt;BR /&gt;    ssc.checkpoint("dedup_data_checkpoint")&lt;BR /&gt;    # each line looks like hashid^recordid&lt;BR /&gt;    message_id_hash = messagesample.map(lambda line: line.split("^")).reduceByKey(lambda x, y: (x, y))&lt;BR /&gt;    # updateFunction, isDuplicate and deuplicateRecords are my own helpers (not shown here)&lt;BR /&gt;    RDD = message_id_hash.updateStateByKey(updateFunction)&lt;BR /&gt;    # join needs key-value pairs on both sides, so join against the (hashid, recordid) stream&lt;BR /&gt;    RDD.join(message_id_hash).filter(isDuplicate).map(deuplicateRecords)\&lt;BR /&gt;        .saveAsTextFiles('output.txt')&lt;BR /&gt;    return ssc&lt;BR /&gt;&lt;BR /&gt;context = StreamingContext.getOrCreate("dedup_data_checkpoint", functionToCreateContext)&lt;BR /&gt;context.start()&lt;BR /&gt;context.awaitTermination()&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;It is working fine for me. The only problem is that it creates a new set of output files for every batch timestamp. I am trying to fix that.&lt;/P&gt;</description>
      <pubDate>Wed, 28 Sep 2016 13:43:14 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/spark-stream-based-deduplication/m-p/45687#M41321</guid>
      <dc:creator>backtrack5</dc:creator>
      <dc:date>2016-09-28T13:43:14Z</dc:date>
    </item>
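The post does not show isDuplicate or deuplicateRecords. The stand-ins below, is_duplicate and duplicate_record, rest on two hypothetical assumptions. First, updateFunction keeps the first recordid seen for each hash. Second, the join pairs that state with the current batch's (hashid, recordid) pairs, with one recordid per hash per batch. Under those assumptions each joined element looks like (hashid, (first_recordid, new_recordid)). The state already includes the current batch, so a first occurrence joins with itself. A record is therefore treated as a duplicate only when the two recordids differ.

```python
def is_duplicate(joined):
    """Stand-in for isDuplicate: (hashid, (first_recordid, new_recordid)) -> bool.

    A first occurrence joins with itself (same recordid) and is not reported.
    """
    _hashid, (first_id, new_id) = joined
    return first_id != new_id


def duplicate_record(joined):
    """Stand-in for deuplicateRecords: format one duplicate as a line of text."""
    hashid, (first_id, new_id) = joined
    return f"{new_id} is a duplicate of {first_id} (hash {hashid})"


joined = [
    ("Hash1", ("recordid1", "recordid5")),  # hash seen in an earlier batch
    ("Hash3", ("recordid3", "recordid3")),  # first occurrence joined with itself
]
output_lines = [duplicate_record(j) for j in joined if is_duplicate(j)]
print(output_lines)  # ['recordid5 is a duplicate of recordid1 (hash Hash1)']
```

In the stream these would be chained as .filter(is_duplicate).map(duplicate_record). The per-timestamp output comes from saveAsTextFiles itself. It writes each batch's RDD to its own location, named from the given prefix (and optional suffix) plus the batch time. So 'output.txt' acts as a prefix, not a single file.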
  </channel>
</rss>

