Joining large data in Spark Streaming

We have a large customer table with 7 million records, and we are trying to process transaction data (500K messages per batch) coming from a Kafka stream.

During processing, we need to join the transaction data with the customer data. The join currently takes around 10s per batch, and the requirement is to bring it down to 5s. Since the customer table is too large, we cannot use a broadcast join. Is there any other optimization we can make? The query plan for the join is below.
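
For context, here is a simplified sketch of roughly what we are doing (Kafka ingestion and message parsing are omitted; Txn and runJoin are illustrative names, while the table and column names follow the plan below):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

case class Txn(key: String, custId: String,
               mktOptOutFlag: String, thirdPartyOptOutFlag: String)

def runJoin(txnStream: DStream[Txn], hc: HiveContext): Unit = {
  import hc.implicits._

  // Static side: the 7M-row customer table, filtered to the opted-in subset
  // (matches the Filter/Project over ext_sub_cust_profile in the plan).
  val custProfile = hc
    .table("db_localhost.ext_sub_cust_profile")
    .select("rowkey", "no_mkt_opto_flag", "thrd_party_ads_opto_flag")
    .filter("no_mkt_opto_flag = 'N' and thrd_party_ads_opto_flag = 'N'")

  // Streaming side: ~500K transactions per micro-batch from Kafka,
  // joined on custId = rowkey as in the plan.
  txnStream.foreachRDD { rdd =>
    val joined = rdd.toDF().join(custProfile, $"custId" === $"rowkey")
    println(joined.count())   // the count(1) aggregate shown in the plan
  }
}

As the physical plan shows, each batch ends up in a SortMergeJoin where both sides are shuffled (TungstenExchange) and sorted, and the customer table is scanned from Hive as well.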

== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Project
   +- Join Inner, Some((custId#110 = rowkey#0))
      :- Project [rowkey#0]
      :  +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
      :     +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
      +- Project [custId#110]
         +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#119L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#122L])
      +- Project
         +- SortMergeJoin [rowkey#0], [custId#110]
            :- Sort [rowkey#0 ASC], false, 0
            :  +- TungstenExchange hashpartitioning(rowkey#0,200), None
            :     +- Project [rowkey#0]
            :        +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
            :           +- HiveTableScan [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4], MetastoreRelation db_localhost, ext_sub_cust_profile, None
            +- Sort [custId#110 ASC], false, 0
               +- TungstenExchange hashpartitioning(custId#110,200), None
                  +- Project [custId#110]
                     +- Scan ExistingRDD[key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118]