

Hive Mapjoin in Spark HiveContext

New Contributor

I am using HiveContext with PySpark to run an HQL query with 6 joins, 4 of which are on really huge tables. Two of the six joins are against very small reference tables (e.g. Country and State), and when I look at the DAG it appears the data is being hash-partitioned on the reference table keys as well, since those are the join keys. Because of this re-partitioning the intermediate data set is skewed, and the job fails with a "Too large frame" error as a shuffle block goes over 2 GB. I also tried passing hints to turn the joins with these two reference tables into mapjoins, but that did not work. When I remove these two joins from the query, the job completes successfully. Is there anything I am missing here?
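For illustration, the hint attempt was along these lines (the table and column names here are placeholders, not the real query):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="mapjoin-test")
sqlContext = HiveContext(sc)

# Hive-style mapjoin hint on the two small reference tables; as the
# explain plan further down shows, Spark still chose shuffle-based
# sort-merge joins for them.
df = sqlContext.sql("""
    SELECT /*+ MAPJOIN(c, s) */ t.*, c.country_name, s.state_name
    FROM big_table t
    LEFT JOIN country c ON t.country_cd = c.country_cd
    LEFT JOIN state s ON t.state_cd = s.state_cd
""")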


Re: Hive Mapjoin in Spark HiveContext

@Gagandeep Singh

A Spark shuffle block cannot be larger than 2 GB (the maximum size of a ByteBuffer), and it appears you are hitting this limit.

By default, spark.sql.shuffle.partitions is set to 200. To increase the parallelism, please set spark.sql.shuffle.partitions=1000 and re-run the job. If you still face the issue, please share the explain plan for the query.
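For example, assuming the HiveContext is named sqlContext as in the question, the setting can be applied before running the query:

# Raise shuffle parallelism so each shuffle block stays well under 2 GB.
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

# Equivalent form, issued through SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=1000")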

Re: Hive Mapjoin in Spark HiveContext

New Contributor

@Sindhu

Thanks for your response. We tried running with 1000 and also with 2000 partitions, but we still hit the same issue. Below is the explain plan for the query; the hash partitioning on the reference-table join keys (the TungstenExchange hashpartitioning steps on state_of_ssss_type and xxxx_of_llll_type) is the root cause of the problem. We tried broadcast, and also tried making the joins with these tables mapjoins using hints, but nothing worked. If we remove these two joins, the application completes successfully.

Re: Hive Mapjoin in Spark HiveContext

New Contributor

== Physical Plan ==
Project [xxxx_id#62 AS xxxx_id#2,tin#1 AS ssss#3,yyyy_id#0 AS yyyy_id#4,mmmm_zzzz_one#53 AS mmmm_zzzz_one#5,mmmm_zzzz_two#54 AS mmmm_zzzz_two#6,mmmm_zzzz_three#55 AS mmmm_zzzz_three#7,mmmm_zzzz_four#56 AS mmmm_zzzz_four#8,last_zzzz#57 AS last_zzzz#9,aaaa_zzzz_id#49 AS aaaa_zzzz_id#10,suffix_desc#59 AS suffix_desc#11,zzzz_usage_tp_cd#52 AS zzzz_usage_tp_cd#12,pppp_dt#31 AS pppp_dt#13,lst_xxxx#64 AS lst_xxxx#14,zzzzhip_type#99 AS zzzzhip_tp_cd#15,end_dt#61 AS aaaa_zzzz_end_dt#16,xxxx_state_tp_cd#107 AS xxxx_state_tp_cd#17,zzzz#108 AS state_of_ssss_type#18,xxxx_tp_cd#114 AS xxxx_tp_cd#19,zzzz#115 AS xxxx_of_llll_type#20,end_dt#101 AS xzzzzhip_end_dt#21,id_tp_cd#79 AS id_tp_cd#22]
+- SortMergeOuterJoin [xxxx_of_llll_type#42], [xxxx_tp_cd#114], LeftOuter, None
:- Sort [xxxx_of_llll_type#42 ASC], false, 0
: +- TungstenExchange hashpartitioning(xxxx_of_llll_type#42,200), None
: +- Project [end_dt#101,zzzz#108,mmmm_zzzz_one#53,tin#1,xxxx_state_tp_cd#107,mmmm_zzzz_two#54,xxxx_id#62,suffix_desc#59,xxxx_of_llll_type#42,zzzz_usage_tp_cd#52,yyyy_id#0,aaaa_zzzz_id#49,mmmm_zzzz_four#56,end_dt#61,lst_xxxx#64,pppp_dt#31,last_zzzz#57,mmmm_zzzz_three#55,id_tp_cd#79,zzzzhip_type#99]
: +- SortMergeOuterJoin [state_of_ssss_type#43], [xxxx_state_tp_cd#107], LeftOuter, None
: :- Sort [state_of_ssss_type#43 ASC], false, 0
: : +- TungstenExchange hashpartitioning(state_of_ssss_type#43,200), None
: : +- Project [end_dt#101,mmmm_zzzz_one#53,tin#1,mmmm_zzzz_two#54,xxxx_id#62,suffix_desc#59,xxxx_of_llll_type#42,zzzz_usage_tp_cd#52,yyyy_id#0,aaaa_zzzz_id#49,mmmm_zzzz_four#56,state_of_ssss_type#43,end_dt#61,lst_xxxx#64,pppp_dt#31,last_zzzz#57,mmmm_zzzz_three#55,id_tp_cd#79,zzzzhip_type#99]
: : +- SortMergeOuterJoin [xxxx_id#62], [xxxx_id#98], LeftOuter, None
: : :- Sort [xxxx_id#62 ASC], false, 0
: : : +- Project [mmmm_zzzz_one#53,tin#1,mmmm_zzzz_two#54,xxxx_id#62,suffix_desc#59,xxxx_of_llll_type#42,zzzz_usage_tp_cd#52,yyyy_id#0,aaaa_zzzz_id#49,mmmm_zzzz_four#56,state_of_ssss_type#43,end_dt#61,lst_xxxx#64,pppp_dt#31,last_zzzz#57,mmmm_zzzz_three#55,id_tp_cd#79]
: : : +- SortMergeJoin [xxxx_id#24], [xxxx_id#78]
: : : :- Project [mmmm_zzzz_one#53,mmmm_zzzz_two#54,xxxx_id#62,suffix_desc#59,xxxx_id#24,xxxx_of_llll_type#42,zzzz_usage_tp_cd#52,aaaa_zzzz_id#49,mmmm_zzzz_four#56,state_of_ssss_type#43,end_dt#61,lst_xxxx#64,pppp_dt#31,last_zzzz#57,mmmm_zzzz_three#55]
: : : : +- SortMergeJoin [xxxx_id#24], [xxxx_id#62]
: : : : :- Project [state_of_ssss_type#43,xxxx_of_llll_type#42,pppp_dt#31,xxxx_id#24]
: : : : : +- SortMergeOuterJoin [xxxx_id#24], [xxxx_id#41], LeftOuter, None
: : : : : :- Sort [xxxx_id#24 ASC], false, 0
: : : : : : +- TungstenExchange hashpartitioning(xxxx_id#24,200), None
: : : : : : +- ConvertToUnsafe
: : : : : : +- HiveTableScan [xxxx_id#24,pppp_dt#31], MetastoreRelation db, vw_aaaa, None
: : : : : +- Sort [xxxx_id#41 ASC], false, 0
: : : : : +- TungstenExchange hashpartitioning(xxxx_id#41,200), None
: : : : : +- ConvertToUnsafe
: : : : : +- HiveTableScan [xxxx_id#41,state_of_ssss_type#43,xxxx_of_llll_type#42], MetastoreRelation db, vw_xaaaa, None
: : : : +- Sort [xxxx_id#62 ASC], false, 0
: : : : +- TungstenExchange hashpartitioning(xxxx_id#62,200), None
: : : : +- ConvertToUnsafe
: : : : +- HiveTableScan [xxxx_id#62,mmmm_zzzz_one#53,mmmm_zzzz_two#54,mmmm_zzzz_three#55,mmmm_zzzz_four#56,last_zzzz#57,aaaa_zzzz_id#49,suffix_desc#59,zzzz_usage_tp_cd#52,lst_xxxx#64,end_dt#61], MetastoreRelation db, vw_aaaazzzz, None
: : : +- Sort [xxxx_id#78 ASC], false, 0
: : : +- TungstenExchange hashpartitioning(xxxx_id#78,200), None
: : : +- Project [CASE WHEN (cast(id_tp_cd#79 as double) = 2.0) THEN ref_num#80 ELSE null AS tin#1,id_tp_cd#79,xxxx_id#78,xx_id#0]
: : : +- Filter (id_tp_cd#79 = 100000)
: : : +- HiveTableScan [id_tp_cd#79,ref_num#80,xxxx_id#78], MetastoreRelation db, vw_identifier, None
: : +- Sort [xxxx_id#98 ASC], false, 0
: : +- TungstenExchange hashpartitioning(xxxx_id#98,200), None
: : +- ConvertToUnsafe
: : +- Filter (length(end_dt#101) = 0)
: : +- HiveTableScan [xxxx_id#98,zzzzhip_type#99,end_dt#101], MetastoreRelation db, vw_xzzzzhip, None
: +- Sort [xxxx_state_tp_cd#107 ASC], false, 0
: +- TungstenExchange hashpartitioning(xxxx_state_tp_cd#107,200), None
: +- ConvertToUnsafe
: +- HiveTableScan [xxxx_state_tp_cd#107,zzzz#108], MetastoreRelation db, vw_cdprovstatetp, None
+- Sort [xxxx_tp_cd#114 ASC], false, 0
+- TungstenExchange hashpartitioning(xxxx_tp_cd#114,200), None
+- ConvertToUnsafe
+- HiveTableScan [xxxx_tp_cd#114,zzzz#115], MetastoreRelation db, vw_cdxxxxtp, None
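The TungstenExchange and SortMergeOuterJoin operators identify this as a Spark 1.x plan, where the /*+ MAPJOIN */ comment hint in SQL is not honored, which is consistent with the hint having no effect. A broadcast can instead be requested on the DataFrame side. A minimal sketch, assuming the two reference tables from the plan and a hypothetical big_join_hql string holding the remaining large-table joins:

from pyspark.sql.functions import broadcast  # available since Spark 1.5

big_df = sqlContext.sql(big_join_hql)               # the four large-table joins (hypothetical HQL string)
state_df = sqlContext.table("db.vw_cdprovstatetp")  # small reference table
type_df = sqlContext.table("db.vw_cdxxxxtp")        # small reference table

# broadcast() marks the small side so the planner can use a broadcast
# (map-side) outer join instead of a SortMergeOuterJoin, removing the
# hashpartitioning exchanges on the reference keys.
joined = (big_df
          .join(broadcast(state_df),
                big_df.state_of_ssss_type == state_df.xxxx_state_tp_cd,
                "left_outer")
          .join(broadcast(type_df),
                big_df.xxxx_of_llll_type == type_df.xxxx_tp_cd,
                "left_outer"))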

