Support Questions


Spark Java OOM error

@KameshMaruvada (New Contributor)
import argparse
import logging
import time

from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.window import Window

from group_dedupe.fuzzy_matching import register_udf
from group_dedupe.reason_code import (
    household_level_deduplication_1,
    household_level_deduplication_2,
    household_level_deduplication_3,
    household_level_deduplication_4,
    household_level_deduplication_6,
    household_level_deduplication_7,
    household_level_deduplication_9,
)


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s',
                    handlers=[logging.FileHandler(r"C:\Users\C20061E\OneDrive - EXPERIAN SERVICES CORP\Desktop\All\Experian\tasks\f35\dev\group_dedupe\src\emr\f35\group_dedupe\logfile.log")]
)
logger = logging.getLogger(__name__)

logger.info("Starting GroupDedupe task")
PERSON_KEY = "cb_key_db_person" 
HOUSEHOLD_KEY = "cb_key_household"
CB_SOURCE_CD = "cb_source_cd"
DATE_OF_BIRTH = "date_of_birth"
CB_NAME_GENDER = "cb_name_gender"
CB_NAME_FORENAME = "cb_name_forename"
CB_NAME_TITLE = "cb_name_title"
CB_NAME_SURNAME = "cb_name_surname"
CB_NAME_SURNAME_SOUNDEX = "cb_name_surname_soundex"
CB_NAME_FIRST_INITIAL = "cb_name_first_initial"
CB_NAME_OTHER_NAMES = "cb_name_other_names"
SURNAME_CONTRIBUTORS = "surname_contributors"
FORENAME_CONTRIBUTORS = "forename_contributors"
P_PROSPECTABLE_FLAG = "p_prospectable_flag"
CB_DATA_DATE = "cb_data_date"
CB_ADDRESS_POSTCODE_SECTOR = "cb_address_postcode_sector"
POSTCODE_8BYTE_STANDARD = "postcode_8byte_standard"
CB_ADDRESS_DPS = "cb_address_dps"
CB_ADDRESS_STATUS_QAS = "cb_address_status_qas"
IS_SUPPRESSION = "is_suppression"
CB_KEY_FAMILY = "cb_key_family"
CB_ADDRESS_SUBBUILDINGNO = "cb_address_subbuildingno"
CB_ADDRESS_BUILDINGNAME = "cb_address_buildingname"
CB_ADDRESS_LINE_1 = "cb_address_line_1"

translation_table = str.maketrans('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', '9999999999AAAAAAAAAAAAAAAAAAAAAAAAAA')

def create_spark_session(app_name="P25_Group_Cleaning"):
    """
    Create a Spark session.

    Parameters:
    app_name (str): The name of the Spark application.

    Returns:
    SparkSession: A Spark session object.
    """
    return SparkSession.builder \
       .appName(app_name) \
       .config("spark.executor.memory", "32g") \
       .config("spark.driver.memory", "16g") \
       .config("spark.memory.offHeap.enabled", "true") \
       .config("spark.memory.offHeap.size", "2g") \
       .config("spark.sql.shuffle.partitions", "100") \
       .config("spark.default.parallelism", "100") \
       .config("spark.driver.maxResultSize", "2g") \
       .config("spark.memory.fraction", "0.6") \
       .config("spark.memory.storageFraction", "0.5") \
       .config("spark.executor.cores", "2") \
       .config("spark.sql.autoBroadcastJoinThreshold", "10m") \
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
       .getOrCreate()

def read_input_from_s3(spark, input_s3_bucket, input_s3_key):
    """
    Read input data from an S3 bucket.

    Parameters:
    spark (SparkSession): The Spark session object.
    input_s3_bucket (str): The name of the input S3 bucket.
    input_s3_key (str): The key of the input file in the S3 bucket.

    Returns:
    DataFrame: A Spark DataFrame containing the input data.
    """

    # Read input file from S3
    input_path = rf"s3://{input_s3_bucket}/{input_s3_key}"
    df = spark.read.csv(input_path, sep="|", header=True, inferSchema=False)
    return df

def export_data_to_s3(df, output_s3_bucket, output_s3_key):
    """
    Export data to an S3 bucket.

    Parameters:
    df (DataFrame): The Spark DataFrame to be exported.
    output_s3_bucket (str): The name of the output S3 bucket.
    output_s3_key (str): The key of the output file in the S3 bucket.
    """
    # Coalesce the DataFrame to a single partition
    output_path = rf"s3://{output_s3_bucket}/{output_s3_key}"
    df_coalesced = df.coalesce(1)
    # Save the coalesced DataFrame as a single CSV file
    df_coalesced.write.csv(output_path, header=True, sep="|", mode='overwrite', emptyValue = '')
    

def parse_arguments():
    """
    Parse command-line arguments.

    Returns:
    Namespace: The parsed command-line arguments.
    """
    parser = argparse.ArgumentParser(description= "Process input files and remove duplicates")
    parser.add_argument('--task_id', type=str, required=True, help= "Name of the task")
    parser.add_argument('--output_bucket', type=str, required=True, help= "Output file S3 Bucket")
    parser.add_argument('--p25_output_key', type=str, required=True, help= "Path to output p25 file")
    parser.add_argument('--dropRecsList_output_key', type=str, required=True, help= "Path to output dropRecsList file")
    parser.add_argument('--sector_output_key', type=str, required=True, help= "Path to output sector file")
    
    return parser.parse_args()

def fuzzy_matched(df, supression_call=False):
    """
    Group records by household and run the fuzzy-matching UDFs over forenames
    and surnames. Relies on the module-level `spark` session.

    Returns:
    (DataFrame, DataFrame): Forename duplicate groups and surname duplicate groups.
    """
    if supression_call:
        find_duplicates_udf_forenames = register_udf(spark, forename=True, compare_with_surname=False, surname_hyphen=False, supression_call=True)

        df_grouped = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH, CB_NAME_SURNAME, IS_SUPPRESSION)).alias("forenames"))
    else:
        find_duplicates_udf_forenames = register_udf(spark, forename=True, compare_with_surname=False, surname_hyphen=False, supression_call=False)

        df_grouped = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY, CB_NAME_FORENAME, CB_NAME_SURNAME, DATE_OF_BIRTH, CB_NAME_GENDER)).alias("forenames"))
    
    df_result_forenames = df_grouped.withColumn("duplicates", F.explode(find_duplicates_udf_forenames(F.col("forenames"))))

    df_final_forenames = df_result_forenames.select(
        F.col(HOUSEHOLD_KEY),
        F.col("duplicates.person_id").alias("cb_key_db_person"),
        F.col("duplicates.group_id").alias("group_id")
    ).distinct() 

    
    if supression_call:
        find_duplicates_udf_surnames = register_udf(spark, forename=False, compare_with_surname=False, surname_hyphen=False,supression_call = True)

        df_grouped_surnames = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY,CB_NAME_SURNAME,CB_NAME_FORENAME,DATE_OF_BIRTH,IS_SUPPRESSION )).alias("surnames"))

    else:
        find_duplicates_udf_surnames = register_udf(spark, forename=False, compare_with_surname=False, surname_hyphen=False,supression_call = False)


        df_grouped_surnames = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY,CB_NAME_SURNAME,CB_NAME_FORENAME,DATE_OF_BIRTH,CB_NAME_GENDER )).alias("surnames"))

    df_result_surnames = df_grouped_surnames.withColumn("duplicates", F.explode(find_duplicates_udf_surnames(F.col("surnames"))))


    df_final_surnames = df_result_surnames.select(
        F.col(HOUSEHOLD_KEY),
        F.col("duplicates.person_id").alias("cb_key_db_person"),
        F.col("duplicates.group_id").alias("group_id")
    ).distinct()  # Ensure unique rows

    return df_final_forenames, df_final_surnames

def maiden_married_test(df_all, match_level='Household'):
    """
    Identifies and handles cases where a woman's surname might have changed due to marriage.

    Args:
        df_all (DataFrame): Input DataFrame containing name and household information.

    Returns:
        DataFrame: DataFrame with false matches marked for dropping and reason for maiden-married name change.
    """
    columns_to_concat = df_all.columns

    df_all = df_all.withColumn(CB_NAME_TITLE, F.initcap(F.col(CB_NAME_TITLE))) \
        .withColumn(CB_NAME_FORENAME, F.initcap(F.col(CB_NAME_FORENAME))) \
        .withColumn(CB_NAME_OTHER_NAMES, F.initcap(F.col(CB_NAME_OTHER_NAMES))) \
        .withColumn(CB_NAME_SURNAME, F.initcap(F.col(CB_NAME_SURNAME))) \
        .withColumn(CB_NAME_GENDER, F.lower(F.col(CB_NAME_GENDER))) 
        
    df_male = df_all.filter(F.col(CB_NAME_GENDER) == "m").groupby(HOUSEHOLD_KEY).agg(F.collect_set(CB_NAME_SURNAME).alias("male_surnames"))

    
    df_female = df_all.filter(F.col(CB_NAME_GENDER) != "m").join(df_male, HOUSEHOLD_KEY, "left")

    
    df_female = df_female.withColumn("surname_match", F.array_contains(F.col("male_surnames"), F.col(CB_NAME_SURNAME)))

   
   

    true_matches = df_female.filter(F.col("surname_match") == True)

    

    df_female_false = df_female.filter(F.col("surname_match") == False)
    
    
    true_matches = true_matches.select(
        HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH,
        F.col(PERSON_KEY).alias("cb_key_db_person_true"),
        *[F.col(col).alias(f"{col}_true") for col in true_matches.columns if col not in [HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH, PERSON_KEY]]
    )
    df_female_false_matching = df_female_false.join(
        true_matches,
        (df_female_false[HOUSEHOLD_KEY] == true_matches[HOUSEHOLD_KEY]) &
        (df_female_false[CB_NAME_FORENAME] == true_matches[CB_NAME_FORENAME]) &
        (df_female_false[DATE_OF_BIRTH] == true_matches[DATE_OF_BIRTH]),
        "inner"
    )
    df_female_false_matching = df_female_false_matching.select(
        
        df_female_false[P_PROSPECTABLE_FLAG].alias("p_code"),
        df_female_false[PERSON_KEY].alias("drop_key"),
        true_matches["cb_key_db_person_true"].alias("keep_key"),
        F.lit("105").alias("drop_code"),
        F.concat_ws("~", *[df_female_false[col] for col in columns_to_concat]).alias("drop_rec"),
        F.concat_ws("~", *[true_matches[f"{col}_true"] for col in columns_to_concat if col not in [HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH]]).alias("keep_rec"),
        F.lit("maiden married").alias("description"),
        df_female_false[CB_SOURCE_CD].alias("cb_source_cd"),
        true_matches["cb_source_cd_true"].alias("match_in"),
        F.lit("duplicate of kept").alias("match_type"),
        F.lit(match_level).alias("match_level"),
        F.lit("").alias("match_indirect")
    )
    return df_female_false_matching
def is_an_NC_winner(df: DataFrame, group_column: str, group_id: str):
    """
    Split each group into kept and dropped records, preferring A31 records,
    then prospectable records; if neither is present, keep everything.
    Returns (kept_df, dropped_df).
    """
    df = df.repartition(group_column)
    print(f"Time before agg_df: {time.time() - start_time} seconds")
    agg_df = df.groupBy(group_column, group_id).agg(
        F.sum(F.when(F.col(CB_SOURCE_CD) == 'A31', 1).otherwise(0)).alias("a31_count"),
        F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == 'Y', 1).otherwise(0)).alias("prospectable_count")
    )
    print(f"Time after agg_df: {time.time() - start_time} seconds")
    print(f"Time before join with agg_df: {time.time() - start_time} seconds")

    df = df.join(F.broadcast(agg_df), on=[group_column, group_id], how="left")
    print(f"Time after join with agg_df: {time.time() - start_time} seconds")
    
    print(f"Time  before  df withcolumn: {time.time() - start_time} seconds")
    # Add subreason column based on the counts
    df = df.withColumn(
        "subreason",
        F.when((F.col("a31_count") > 0) , 'A31 record kept')
        .when((F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) , 'prospectable record kept')
        .when((F.col("a31_count") == 0) & (F.col("prospectable_count") == 0), 'no A31 or prospectable record, all kept')
    )
    print(f"Time  after  df withcolumn: {time.time() - start_time} seconds")
    
    print(f"Time  before  kept_df withcolumn: {time.time() - start_time} seconds")
    # Filter records based on the counts and subreason
    kept_df = df.filter(
        (F.col("a31_count") > 0) & (F.col(CB_SOURCE_CD) == 'A31') |
        (F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) & (F.col(P_PROSPECTABLE_FLAG) == 'Y') |
        (F.col("a31_count") == 0) & (F.col("prospectable_count") == 0)
    )
    print(f"Time  after  kept_df withcolumn: {time.time() - start_time} seconds")
    print(f"Time  before  dropped_df withcolumn: {time.time() - start_time} seconds")
     # Identify dropped records
    dropped_df = df.join(F.broadcast(kept_df), on=[HOUSEHOLD_KEY, group_id], how="left_anti")
    # Identify dropped records
    # dropped_df = df.subtract(kept_df)

    print(f"Time  after  dropped_df withcolumn: {time.time() - start_time} seconds")
    print(f"Time  before  first_person_key_df withcolumn: {time.time() - start_time} seconds")
    # Add keep_key column to dropped_df
    first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
    print(f"Time  after  first_person_key_df withcolumn: {time.time() - start_time} seconds")
    print(f"Time  before  dropped_df : {time.time() - start_time} seconds")
    dropped_df = dropped_df.join(
        F.broadcast(first_person_key_df).select(group_column, group_id, "first_person_key"),
        on=[group_column, group_id],
        how="left"
    ).withColumnRenamed("first_person_key", "keep_key")
    dropped_df.repartition(HOUSEHOLD_KEY)
    print(f"Time  after  dropped_df : {time.time() - start_time} seconds")
    partition_sizes = dropped_df.rdd.getNumPartitions()
    total_size = partition_sizes
    print(f"Total DataFrame size: {total_size} bytes")

    print(f"Time  befire  kept_dfa31 : {time.time() - start_time} seconds")
    # Drop the temporary count columns
    kept_df = kept_df.drop("a31_count", "prospectable_count")
    dropped_df = dropped_df.drop("a31_count", "prospectable_count")
    print(f"Time  after  kept_dfa31 : {time.time() - start_time} seconds")
    return kept_df, dropped_df


def is_an_NC_winner_7(df: DataFrame, group_column: str, group_id: str) :
    """
    Identify and filter records based on specific conditions and assign subreasons.

    This function partitions the DataFrame by the specified group column and group ID, counts the number of records
    with specific source codes and prospectable flags, assigns subreasons based on these counts, and filters the records
    accordingly.

    Args:
        df (DataFrame): The input DataFrame.
        group_column (str): The column name to group by.
        group_id (str): The group ID column name.

    Returns:
        (DataFrame, DataFrame): A tuple containing the kept DataFrame and the dropped DataFrame.
    """
    agg_df = df.groupBy(group_column, group_id).agg(
        F.sum(F.when(F.col(CB_SOURCE_CD) == 'A31', 1).otherwise(0)).alias("a31_count"),
        F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == 'Y', 1).otherwise(0)).alias("prospectable_count")
    )

    # Join the aggregated counts back to the original DataFrame
    df = df.join(agg_df, on=[group_column, group_id], how="left")
    
    df = df.withColumn(
        "subreason",
        F.when((F.col("a31_count") > 0) , 'A31 record kept')
        .when((F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) , 'prospectable record kept')
        .when((F.col("a31_count") == 0) & (F.col("prospectable_count") == 0), 'no A31 or prospectable record, all kept')
    )
    
    
    # Filter records based on the counts and subreason
    kept_df = df.filter(
        (F.col("a31_count") > 0) & (F.col(CB_SOURCE_CD) == 'A31') |
        (F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) & (F.col(P_PROSPECTABLE_FLAG) == 'Y') |
        (F.col("a31_count") == 0) & (F.col("prospectable_count") == 0)
    )

    # Identify dropped records (the anti join includes the person key so that
    # only the individual dropped records are returned, not whole groups)
    dropped_df = df.join(kept_df.select(group_column, group_id, PERSON_KEY), on=[group_column, group_id, PERSON_KEY], how="left_anti")

    first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
    
    dropped_df = dropped_df.join(
        first_person_key_df.select(group_column, group_id, "first_person_key").distinct(),
        on=[group_column, group_id],
        how="left"
    ).withColumnRenamed("first_person_key", "keep_key")

    # Drop the temporary count columns
    kept_df = kept_df.drop("a31_count", "prospectable_count")
    dropped_df = dropped_df.drop("a31_count", "prospectable_count")
    
    return kept_df, dropped_df



def score_records(df: DataFrame, group_column: str, group_id: str):
    """
    Score records based on specific conditions.

    This function assigns a score to each record based on various conditions such as prospectable flag,
    address status, name title, other names, gender, and date of birth, then keeps only the
    highest-scoring record(s) in each group.

    Args:
        df (DataFrame): The input DataFrame.
        group_column (str): The column name to group by.
        group_id (str): The group ID column name.

    Returns:
        (DataFrame, DataFrame): A tuple containing the kept DataFrame and the dropped DataFrame.
    """
    df = df.withColumn(
        "score",
        F.when(F.col(P_PROSPECTABLE_FLAG) == 'Y', 15).otherwise(0) +
        F.when(F.col(CB_ADDRESS_STATUS_QAS).startswith('R9'), 5).otherwise(0) +
        F.when(F.lower(F.col(CB_NAME_TITLE)).isin(['mr', 'mrs', 'ms', 'miss']), 1)
        .when(F.lower(F.col(CB_NAME_TITLE)) != '', 2).otherwise(0) +
        F.when(F.col(CB_NAME_OTHER_NAMES) != '', 1).otherwise(0) +
        F.when((F.col(CB_NAME_GENDER) != '') & (F.col(CB_NAME_GENDER) != 'U'), 1).otherwise(0) +
        F.when(F.col(DATE_OF_BIRTH) != '', 1).otherwise(0)
    )

    agg_df = df.groupBy(group_column, group_id).agg(F.max("score").alias("max_score"))

    # Join the aggregated max score back to the original DataFrame
    df = df.join(agg_df, on=[group_column, group_id], how="left")
    
     # Filter to keep only the highest score record in each group
    kept_df = df.filter(F.col("score") == F.col("max_score")).drop("max_score")

     # Identify the final dropped records
    dropped_df = df.filter(F.col("score") != F.col("max_score")).drop("max_score")

    # Add keep_key column to dropped_df (note: F.first(PERSON_KEY) uses the
    # constant, i.e. the real column name, not the literal string "PERSON_KEY")
    first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))

    dropped_df = dropped_df.join(
        first_person_key_df,
        on=[group_column, group_id],
        how="left"
    ).withColumnRenamed("first_person_key", "keep_key")
    
    # Add subreason column to dropped_df
    dropped_df = dropped_df.withColumn(
        "subreason",
        F.concat(
            F.lit("records were scored - scores were : "),
            F.col(PERSON_KEY), F.lit(" "), F.col("score"), F.lit(" "),
            F.col("keep_key")
        )
    )

    return kept_df, dropped_df


def get_sector_data(df):
    """
    Aggregates data by postcode sector and counts prospectable and topup records.

    This function groups the input DataFrame by the first six characters of the 
    'postcode_8byte_standard' column, which represents the postcode sector. It then 
    counts the number of records where 'p_prospectable_flag' is 'Y' (prospectable) 
    and 'N' (topup), and returns a DataFrame with these counts.

    Parameters:
    df (DataFrame): The input DataFrame containing the data to be aggregated.

    Returns:
    DataFrame: A DataFrame with columns 'sector', 'nc_count' (number of prospectable 
    records), and 'topup_count' (number of topup records).
    """
    
    # Group by sector and aggregate counts
    df = df.groupBy(F.expr(f"SUBSTRING({POSTCODE_8BYTE_STANDARD}, 1, 6)").alias("sector")).agg(
        F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == "Y", 1).otherwise(0)).alias("nc_count"),
        F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == "N", 1).otherwise(0)).alias("topup_count")
    )
    
    sectors_df = df.select("sector", "nc_count", "topup_count")
    
    return sectors_df

def filter_a30_records_by_dps(p20_df, dps50_df):
    """
    Filters A30 records from the main DataFrame based on DPS criteria and generates a report.

    This function creates a key column in the main DataFrame by concatenating the 
    'postcode_8byte_standard' and 'cb_address_dps' columns. It then joins and filters 
    the main DataFrame with the `dps50_df` DataFrame to identify A30 records. The 
    function performs a left anti join to filter out these A30 records from the main 
    DataFrame and generates a report for the records to be removed.

    Parameters:
    p20_df (DataFrame): The main DataFrame containing the data to be filtered.
    dps50_df (DataFrame): The DataFrame containing DPS criteria for filtering.

    Returns:
    tuple: A tuple containing two DataFrames:
        - filtered_df: The DataFrame after filtering out A30 records.
        - dropped50A30recs_report_df: The DataFrame containing the report of dropped A30 records.
    """
    
    columns_to_concat = p20_df.columns
    # Create a key column in the main DataFrame
    p20_df = p20_df.withColumn("key", F.concat_ws("", F.regexp_replace(F.col(POSTCODE_8BYTE_STANDARD), " ", ""), F.col(CB_ADDRESS_DPS)))
    
    # Join and filter the main DataFrame with dps50_df
    a30_recs_df = p20_df.join(dps50_df, on="key", how="inner").filter(F.col(CB_SOURCE_CD) == "A30")
    
    # Create the DataFrame by performing a left anti join
    filtered_df = p20_df.join(a30_recs_df, on=["key", CB_SOURCE_CD], how="left_anti")
    
    filtered_df = filtered_df.drop("key")

    # Generate the report for the records to be removed
    dropped50A30recs_report_df = a30_recs_df.withColumn("description", F.lit("dropped A30s when >50 in DPS")) \
                            .withColumn("match_type", F.lit(">50 in DPS")) \
                            .withColumn("drop_key", F.col(PERSON_KEY))\
                            .withColumn("keep_key", F.lit("")) \
                            .withColumn("drop_code", F.lit(600)) \
                            .withColumn("drop_rec", F.concat_ws("~", *[F.col(col) for col in columns_to_concat])) \
                            .withColumn("keep_rec", F.lit("")) \
                            .withColumn("match_in", F.lit("")) \
                            .withColumn("match_level", F.lit("")) \
                            .withColumn("match_indirect",F.lit("")) \
                            .select(F.col(P_PROSPECTABLE_FLAG).alias("p_code"), "drop_key", "keep_key","drop_code","drop_rec","keep_rec","description"
                                    ,F.col(CB_SOURCE_CD).alias("cb_source_cd"),"match_in","match_type","match_level","match_indirect")
    

    
    return filtered_df, dropped50A30recs_report_df

def count_surnames_and_filter(df: DataFrame,group_id: str) :
    """
    Count surnames and filter records based on the highest total contributors.

    This function counts the number of occurrences of each surname within each group, adds the count to the surname contributors,
    and filters the records to keep only the highest total contributors record in each group.

    Args:
        df (DataFrame): The input DataFrame.

    Returns:
        (DataFrame, DataFrame): A tuple containing the kept DataFrame and the dropped DataFrame.
    """
    # Window specification to partition by household key, group ID, and surname
    # window_spec = Window.partitionBy(HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME)
    # df = df.withColumn("surname_count", F.count(CB_NAME_SURNAME).over(window_spec))

    # df.persist(StorageLevel.MEMORY_AND_DISK)
    #Aggregate surname counts
    surname_counts = df.groupBy(HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME).agg(F.count(CB_NAME_SURNAME).alias("surname_count"))

     # Join the aggregated counts back to the original DataFrame
    df = df.join(F.broadcast(surname_counts), on=[HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME], how="left")
    
    # Add the surname count to the surname contributors
    # df = df.withColumn("total_contributors", F.col("surname_count") + F.col(SURNAME_CONTRIBUTORS))
    df = df.select(
        "*",  # Select all existing columns
        (F.col("surname_count") + F.col(SURNAME_CONTRIBUTORS)).alias("total_contributors")  # Add the new column
    )
    
    # # Window specification to partition by household key and group ID and order by total contributors descending
    # window_spec_surname = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(F.col("total_contributors").desc())

      
    # Aggregate to get the highest total contributors in each group
    max_contributors = df.groupBy(HOUSEHOLD_KEY, group_id).agg(F.max("total_contributors").alias("max_contributors"))

    df = df.join(F.broadcast(max_contributors), on=[HOUSEHOLD_KEY, group_id], how="left")
    
    final_kept_df = df.filter(F.col("total_contributors") == F.col("max_contributors")).drop("max_contributors")

    # # Add keep_key column to dropped_df
    # first_person_key_window = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(F.col(PERSON_KEY))
    # first_person_key_df = final_kept_df.withColumn("first_person_key", F.first(PERSON_KEY).over(first_person_key_window))

    # Identify the final dropped records
    final_dropped_df = df.filter(F.col("total_contributors") != F.col("max_contributors")).drop("max_contributors")
     # Add keep_key column to dropped_df
    first_person_key_df = final_kept_df.groupBy(HOUSEHOLD_KEY, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
    
    final_dropped_df = final_dropped_df.join(
        F.broadcast(first_person_key_df),
        on=[HOUSEHOLD_KEY, group_id],
        how="left"
    ).withColumnRenamed("first_person_key", "keep_key")

    # Add subreason column to final_dropped_df
    final_dropped_df = final_dropped_df.withColumn(
        "subreason",
        F.concat(F.lit("- surname "), F.col(CB_NAME_SURNAME), F.lit(" dropped"))
    )
    
    return final_kept_df, final_dropped_df

def identify_postcode_duplicates(df: DataFrame) -> DataFrame:
    """
    Identify duplicate records at the postcode level based on name keys.

    This function generates name keys, assigns group IDs using dense rank, adds a reason column,
    counts the number of records in each group, and filters groups with more than one record.

    Args:
        df (DataFrame): The input DataFrame.

    Returns:
        DataFrame: The DataFrame containing duplicate records.
    """
    # Generate keys
    df = df.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
    df = df.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))

    # Define window specification for dense rank
    windowSpecDenseRank = Window.partitionBy(POSTCODE_8BYTE_STANDARD).orderBy("namekey_alt")

    # Assign group IDs using dense_rank
    df = df.withColumn("group_id", F.dense_rank().over(windowSpecDenseRank))
    # df.repartition(POSTCODE_8BYTE_STANDARD)
    # Add reason column
    df = df.withColumn("reason", F.expr("CASE WHEN cb_name_gender = '' THEN 'gender lowered to unknown to make match' ELSE 'gender also matched (or u matched to u)' END"))

    # Count the number of records in each group
    group_counts = df.groupBy(POSTCODE_8BYTE_STANDARD, "group_id").agg(F.count("*").alias("group_count"))

    # Join the original DataFrame with the group counts
    df_with_counts = df.join(group_counts, on=[POSTCODE_8BYTE_STANDARD, "group_id"])

    # Filter groups with more than one record
    duplicates = df_with_counts.filter(F.col("group_count") > 1)

    # Drop the namekey, namekey_alt, and group_count columns
    duplicates = duplicates.drop("namekey", "namekey_alt", "group_count")


    return duplicates


def identify_dropping_records_postcode(df1: DataFrame, df2: DataFrame):
    """
    Identify records to be dropped based on matching keys.

    This function generates name keys for both DataFrames, adds a reason column, and joins the DataFrames
    on postcode and namekey or namekey_alt to identify records to be dropped.

    Args:
        df1 (DataFrame): The first input DataFrame.
        df2 (DataFrame): The second input DataFrame.

    Returns:
        (DataFrame, DataFrame): The records to keep and the records to drop.
    """
    # Generate keys for df1
    df1 = df1.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
    df1 = df1.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))
    # Add reason column
    df1 = df1.withColumn("reason", F.expr("CASE WHEN cb_name_gender = '' THEN 'gender lowered to unknown to make match' ELSE 'gender also matched (or u matched to u)' END"))

    # Generate keys for df2
    df2 = df2.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
    df2 = df2.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))

    # Join df1 with df2 on postcode and namekey or namekey_alt
    drop = df1.join(df2, (df1[POSTCODE_8BYTE_STANDARD] == df2[POSTCODE_8BYTE_STANDARD]) & (df1["namekey_alt"] == df2["namekey_alt"]), "inner") \
            .select(df1["*"])

    # Anti join df1 with the drop records to get the records to keep
    keep = df1.join(drop, (df1[POSTCODE_8BYTE_STANDARD] == drop[POSTCODE_8BYTE_STANDARD]) & (df1["namekey_alt"] == drop["namekey_alt"]), "leftanti")


    return keep, drop


def identify_dropping_key_postcode(df1: DataFrame, df2: DataFrame):
    """
    Identify records to be dropped based on matching keys.

    This function generates name keys for both DataFrames, adds a reason column, and joins the DataFrames
    on postcode and namekey or namekey_alt to identify records to be dropped.

    Args:
        df1 (DataFrame): The first input DataFrame.
        df2 (DataFrame): The second input DataFrame.

    Returns:
        DataFrame: The DataFrame containing records to be dropped.
    """
    
    # Join df1 with df2 on postcode and personkey
    dropped_df = df1.join(df2, (df1[POSTCODE_8BYTE_STANDARD] == df2[POSTCODE_8BYTE_STANDARD]) & (df1[PERSON_KEY] == df2[PERSON_KEY]), "inner")\
     .select(df1["*"])

    # Anti join df1 with the dropped records to get the records to keep
    keep_df = df1.join(dropped_df, (df1[POSTCODE_8BYTE_STANDARD] == dropped_df[POSTCODE_8BYTE_STANDARD]) & (df1[PERSON_KEY] == dropped_df[PERSON_KEY]), "left_anti")


    return keep_df,dropped_df

def filter_and_group_supression(reason_code_supression, union_df, group_id):
    """
    For each (household, group) that contains at least one suppression record,
    pick a keep_key from the suppressed records and return the remaining
    records in those groups as candidates to drop.
    """

    union_df_alias = union_df.select(PERSON_KEY, F.col(IS_SUPPRESSION).alias("union_is_suppression"))
    
    # Filter the reason_code_4_supression DataFrame to show only entries where supression_call is 1
    reason_code_supression_filtered = reason_code_supression.join(
        union_df_alias,
        on=PERSON_KEY,
        how="inner"
    ).filter(F.col("union_is_suppression") == 1)

    window_spec = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(PERSON_KEY)

     # Use window function to get the first PERSON_KEY as keep_key
    reason_code_supression_filtered = reason_code_supression_filtered.withColumn(
        "keep_key", F.first(PERSON_KEY).over(window_spec)
    ).dropDuplicates([HOUSEHOLD_KEY, group_id])

    # Join back to reason_code_supression
    reason_code_to_be_drop = reason_code_supression.join(
        (reason_code_supression_filtered.select(HOUSEHOLD_KEY, group_id, "keep_key")),
        on=[HOUSEHOLD_KEY, group_id],
        how="inner"
    )
    
    reason_code_to_be_drop = reason_code_to_be_drop.join(
        reason_code_supression_filtered.select(PERSON_KEY),
        on=PERSON_KEY,
        how="leftanti"
    )


    
    return reason_code_to_be_drop

def add_mrdps_column(df):
    """
    Adds an 'mrdps' column to the given DataFrame based on the logic of grouping by household,
    filtering households with more than 9 family keys, and assigning sequential numbers to family keys.
    
    Parameters:
    df (DataFrame): The input DataFrame containing household and family key information.
    
    Returns:
    DataFrame: The original DataFrame with an additional 'mrdps' column.
    """
    # Group by household and aggregate family keys
    grouped_df_mrdps = df.groupBy(HOUSEHOLD_KEY).agg(
        F.countDistinct(CB_KEY_FAMILY).alias("family_count"),
        F.collect_list(CB_KEY_FAMILY).alias("family_keys")
    )

    # Filter households with more than 9 family keys
    filtered_df_mrdps = grouped_df_mrdps.filter(F.col("family_count") > 9)

    # Explode family keys and assign sequential numbers
    exploded_df = filtered_df_mrdps.select(
        HOUSEHOLD_KEY,
        F.explode("family_keys").alias(CB_KEY_FAMILY)
    )

    window_spec = Window.partitionBy(HOUSEHOLD_KEY).orderBy(CB_KEY_FAMILY)
    mrdps_df = exploded_df.withColumn("mrdps", F.row_number().over(window_spec))

    # Join with original DataFrame to add the mrdps column
    result_df_mrdps = df.join(mrdps_df, on=[HOUSEHOLD_KEY, CB_KEY_FAMILY], how="left")

    # Fill null values in the mrdps column with 0 (an int, so fillna applies to the numeric column)
    result_df_mrdps = result_df_mrdps.fillna({'mrdps': 0})


    return result_df_mrdps


def check_for_flat(buildingname, addr1):
    """
    Return the flat number (four digits, optionally followed by a letter) that
    appears immediately before the building name in addr1, e.g. "1234A" in
    "1234A Building A Street"; otherwise return ''.
    """
    if buildingname == '':
        return ''
    
    upperaddr1 = addr1.strip().upper()
    posname = upperaddr1.find(buildingname.upper())
    
    if posname > 0:
        lostbit = upperaddr1[:posname].strip()
        if lostbit == '':
            return ''
        if ' ' in lostbit:
            return ''
        
        mask = lostbit.translate(translation_table)
        if mask[0] != '9':
            return ''
        if mask not in ['9999', '9999A']:
            return ''
        
        return upperaddr1[:posname].strip()
    else:
        return ''

def drop_report(df1: DataFrame, df2: DataFrame, match_type :str, match_level :str) -> DataFrame:
        """
        Join two DataFrames on the specified column and return the joined DataFrame.

        Parameters:
        df1 (DataFrame): The first DataFrame.
        df2 (DataFrame): The second DataFrame.
        join_column (str): The column name to join on.

        Returns:
        DataFrame: The joined DataFrame.
        """
        columns_to_concat = df1.columns

     
        
        # Perform the join operation for dropkey
        joined_df = df1.join(df2, df1[PERSON_KEY] == df2[PERSON_KEY], "inner")

        joined_df_keep = df1.join(df2.select("keep_key"), df1[PERSON_KEY] == df2["keep_key"], "inner")

        
        
        # Concatenate specified columns from df1 into a new column
        concatenated_df = joined_df.withColumn("drop_rec", F.concat_ws("~", *[df1[col] for col in columns_to_concat]))

        concatenated_df_keep = joined_df_keep.withColumn("keep_rec", F.concat_ws("~", *[df1[col] for col in columns_to_concat]))

      
            # Select the key column, the new concatenated column, and specific columns from df2
        result_df = concatenated_df.select(
            df1[PERSON_KEY].alias ("drop_key"),
            df1[P_PROSPECTABLE_FLAG],
            df1[CB_SOURCE_CD],
            "drop_rec",
            df2["reasoncode"],
            df2["reason"],
            df2["subreason"],
            df2["keep_key"]
        ).join( concatenated_df_keep.select(
                df1[PERSON_KEY],
                df1[CB_SOURCE_CD].alias("match_in"),
                "keep_rec"
            ),
            df2["keep_key"] == concatenated_df_keep[PERSON_KEY],
            "left"
        )

            # Modify the result_df while returning the data
        final_df = result_df.select(
            F.col(P_PROSPECTABLE_FLAG).alias("p_code"),
            "drop_key",
            "keep_key",
            F.col("reasoncode").alias("drop_code"),
            "drop_rec",
            "keep_rec",
            F.concat_ws("|", F.col("reason"), F.col("subreason")).alias("description"),
            F.col(CB_SOURCE_CD).alias("cb_source_cd"),
            "match_in",
            F.lit(match_type).alias("match_type"),
            F.lit(match_level).alias("match_level"),
            F.lit("").alias("match_indirect")
        )
       
        return final_df
start_time = time.time()
def run(p20_df, dps50_df, all_supps_df):

    
    sectors_df = get_sector_data(p20_df)

    # filtered_df holds the data after dropping A30 records over the DPS-50 threshold
    print(f"Time before filter_a30_records_by_dps: {time.time() - start_time} seconds")
    filtered_df, dropped50A30recs_report_df = filter_a30_records_by_dps(p20_df, dps50_df)
    print(f"Time after filter_a30_records_by_dps: {time.time() - start_time} seconds")
    
    drop_record_report= dropped50A30recs_report_df
    
    # filtered_df.persist(StorageLevel.MEMORY_AND_DISK)
    # Maiden/married test (reason code 5)
    print(f"Time before maiden_married_test: {time.time() - start_time} seconds")
    df_female_false_matching = maiden_married_test(filtered_df)
    print(f"Time after maiden_married_test: {time.time() - start_time} seconds")
    # Filter the maiden/married drop records out of filtered_df
    print(f"Time before maiden_married filter: {time.time() - start_time} seconds")
    # Keep only rows whose PERSON_KEY is not a drop_key in df_female_false_matching
    filtered_df = filtered_df.join(df_female_false_matching, filtered_df[PERSON_KEY] == df_female_false_matching['drop_key'], how='left_anti')
    print(f"Time after maiden_married filter: {time.time() - start_time} seconds")

    print(f"Time after before  fuzzy: {time.time() - start_time} seconds")
    duplicate_forename_df, duplicate_surname_df = fuzzy_matched(filtered_df)
    print(f"Time after after  fuzzy: {time.time() - start_time} seconds")
    
    print(f"Time after before  duplicate_forename_df join: {time.time() - start_time} seconds")
    duplicate_forename_df = duplicate_forename_df.join(filtered_df.select(PERSON_KEY, DATE_OF_BIRTH,CB_NAME_SURNAME,CB_SOURCE_CD,P_PROSPECTABLE_FLAG,SURNAME_CONTRIBUTORS,CB_NAME_TITLE,CB_NAME_OTHER_NAMES,CB_NAME_GENDER,CB_ADDRESS_STATUS_QAS), on= PERSON_KEY, how="left")
    print(f"Time after after  duplicate_forename_df join: {time.time() - start_time} seconds")
    print(f"Time after before  duplicate_surname_df join: {time.time() - start_time} seconds")
    duplicate_surname_df = duplicate_surname_df.join(filtered_df.select(PERSON_KEY, DATE_OF_BIRTH), on=PERSON_KEY, how="left")
    print(f"Time after after  duplicate_surname_df join: {time.time() - start_time} seconds")
    cleanse_columns = [
    CB_SOURCE_CD,
    PERSON_KEY,
    HOUSEHOLD_KEY,
    POSTCODE_8BYTE_STANDARD,
    CB_ADDRESS_DPS,
    CB_NAME_TITLE, 
    CB_NAME_FORENAME, 
    CB_NAME_OTHER_NAMES, 
    CB_NAME_SURNAME, 
    CB_NAME_SURNAME_SOUNDEX, 
    CB_NAME_GENDER, 
    DATE_OF_BIRTH,
    CB_NAME_FIRST_INITIAL
    ]
    # Household level deduplication
    # START reason code 4
    print(f"Time before caching duplicate dataframes: {time.time() - start_time} seconds")
    # Cache and materialize both duplicate DataFrames before they are reused
    duplicate_forename_df.cache()
    duplicate_forename_df.count()
    duplicate_surname_df.cache()
    duplicate_surname_df.count()
    print(f"Time after caching duplicate dataframes: {time.time() - start_time} seconds")
    # duplicate_forename_df.repartition(10)
    # duplicate_surname_df.repartition(10)
    
    # print(f"Time after before  reasoson code 4 join: {time.time() - start_time} seconds")
    reason_code_4 = household_level_deduplication_4(duplicate_forename_df, duplicate_surname_df)
    # print(f"Time after after  reasoson code 4 join: {time.time() - start_time} seconds")
    # reason_code_4.repartition(HOUSEHOLD_KEY)
    print(f"Time after before  is_an_NC_winner: {time.time() - start_time} seconds")
    # # # Apply the function to get kept and dropped records
    kept_df_4_is_an_winner, dropped_df_4_is_an_winner = is_an_NC_winner(reason_code_4,HOUSEHOLD_KEY,"group_id_for")
    print(f"Time after after  is_an_NC_winner: {time.time() - start_time} seconds")

    # # drop_report_4_is_an_winner = drop_report(filtered_df,dropped_df_4_is_an_winner,"duplicate of kept","Household")
    
    # # drop_record_report = dropped50A30recs_report_df.union(drop_report_4_is_an_winner) 

    # # # # # Filter for is_an_winner ranking
    print(f"Time Before   dropped_df_4_is_an_winner: {time.time() - start_time} seconds")
    filtered_df = filtered_df.join(F.broadcast(dropped_df_4_is_an_winner), filtered_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
    print(f"Time after   dropped_df_4_is_an_winner: {time.time() - start_time} seconds")
    # # # partition_sizes = filtered_df.rdd.glom().map(len).collect()
    # # # total_size = sum(partition_sizes)
    # # # print(f"Total DataFrame size: {total_size} bytes")
    # # # # # #running for surname winner
    # print(f"Time Before   dropped_df_4_count_surname: {time.time() - start_time} seconds")
    # kept_df_4_count_surname, dropped_df_4_count_surname =count_surnames_and_filter(kept_df_4_is_an_winner,"group_id_for")
    # print(f"Time After   dropped_df_4_count_surname: {time.time() - start_time} seconds")
    # # # # # # Filter for surname winner ranking
    # print(f"Time Before   filter: {time.time() - start_time} seconds")
    # filtered_df = filtered_df.join(F.broadcast(dropped_df_4_count_surname), filtered_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')
    # print(f"Time Before   after: {time.time() - start_time} seconds")
    # # #END reason_code 4

    
    # Drop records that belong to the resolved duplicate groups
    # print(f"Time Before   duplicate_forename_df second: {time.time() - start_time} seconds")
    duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_4_is_an_winner), duplicate_forename_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
    # duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_4_count_surname), duplicate_forename_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')

    # duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_4_is_an_winner), duplicate_surname_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
    # duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_4_count_surname), duplicate_surname_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')
    # print(f"Time after   duplicate_forename_df second: {time.time() - start_time} seconds")
    # # Filter for is_an_winner ranking
    #START reason_code 6

    # Check the number of partitions
    # num_partitions = filtered_df.rdd.getNumPartitions()
    # print(f"Number of partitions: {num_partitions}")
    print(f"Time Before   reasoncode6 second: {time.time() - start_time} seconds")

    # duplicate_forename_df.repartition(HOUSEHOLD_KEY)
    reason_code_6 = household_level_deduplication_6(duplicate_forename_df)
    print(f"Time after   reasoncode second: {time.time() - start_time} seconds")
    
    print(f"Time before   caching: {time.time() - start_time} seconds")
    reason_code_6.cache()
    print(f"Time after   caching: {time.time() - start_time} seconds")
    reason_code_6.count()
    print(f"Time Before   is_an_winner second: {time.time() - start_time} seconds")
    # Apply the function to get kept and dropped records
    kept_df_6_is_an_winner, dropped_df_6_is_an_winner = is_an_NC_winner(reason_code_6,HOUSEHOLD_KEY,"group_id")
    print(f"Time after   is_an_winner second: {time.time() - start_time} seconds")

    print(f"Time Before   filter 1: {time.time() - start_time} seconds")
    # # Filter for is_an_winner ranking
    filtered_df = filtered_df.join(F.broadcast(dropped_df_6_is_an_winner), filtered_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
    print(f"Time Before   filter 1: {time.time() - start_time} seconds")
    #running for surname winner
    kept_df_6_count_surname, dropped_df_6_count_surname =count_surnames_and_filter(kept_df_6_is_an_winner,"group_id")

    # Filter for surname winner ranking
    filtered_df = filtered_df.join(F.broadcast((dropped_df_6_count_surname)), filtered_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')
    
    # Drop records from the resolved duplicate groups
    
    duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_6_is_an_winner), duplicate_forename_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
    duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_6_count_surname), duplicate_forename_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')

    duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_6_is_an_winner), duplicate_surname_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
    duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_6_count_surname), duplicate_surname_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')

   
    #END reason_code 6
    

    return filtered_df

def main(args: argparse.Namespace, spark):
    
    # Load parameters
    # input_bucket = args.input_bucket
    output_bucket = args.output_bucket
    p25_output_sorted_output_key = args.p25_output_key
    sectors_output_key = args.sector_output_key
    dropRecsList_output_key = args.dropRecsList_output_key
    
    cleanse_columns = [
    CB_SOURCE_CD,
    PERSON_KEY,
    HOUSEHOLD_KEY,
    POSTCODE_8BYTE_STANDARD,
    CB_ADDRESS_DPS,
    CB_NAME_TITLE, 
    CB_NAME_FORENAME, 
    CB_NAME_OTHER_NAMES, 
    CB_NAME_SURNAME, 
    CB_NAME_SURNAME_SOUNDEX, 
    CB_NAME_GENDER, 
    DATE_OF_BIRTH,
    CB_NAME_FIRST_INITIAL
    ]

    # Define schema
    schema = StructType([
        StructField("cb_key_db_person", StringType(), True),
        StructField("cb_key_household", StringType(), True),
        StructField("cb_source_cd", StringType(), True),
        StructField("date_of_birth", StringType(), True),
        StructField("cb_name_gender", StringType(), True),
        StructField("cb_name_forename", StringType(), True),
        StructField("cb_name_title", StringType(), True),
        StructField("cb_name_surname", StringType(), True),
        StructField("cb_name_surname_soundex", StringType(), True),
        StructField("cb_name_first_initial", StringType(), True),
        StructField("cb_name_other_names", StringType(), True),
        StructField("surname_contributors", StringType(), True),
        StructField("forename_contributors", StringType(), True),
        StructField("p_prospectable_flag", StringType(), True),
        StructField("cb_data_date", StringType(), True),
        StructField("cb_address_postcode_sector", StringType(), True),
        StructField("postcode_8byte_standard", StringType(), True),
        StructField("cb_address_dps", StringType(), True),
        StructField("cb_address_status_qas", StringType(), True),
        StructField("cb_key_family", StringType(), True),
        StructField("cb_address_buildingname", StringType(), True),
        StructField("cb_address_line_1", StringType(), True),
        
        ])

    data = [
        ("123", "456", "A", "19900101", "F", "Karrena", "Mr", "Doe-Anee", "D000", "J", "Johnny", 2, "John", "Y", "20250316", "NG1", "NG2 1AA", "01","R9","100","Building A","1234A Building A Street"),
        ("100", "456", "B", "19900101", "F", "Karrena", "Ms", "", "S000", "J", "Janey", 2, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
        ("01", "456", "B", "19900101", "M", "Doe", "Mr", "Doe", "S000", "J", "Janey", 3, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
        ("02", "456", "B", "19900101", "M", "Doe", "Mr", "kkaree", "S000", "J", "Janey", 4, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
        ("03", "456", "B", "19900102", "M", "Anee", "Mr", "Karrena", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R8","100","",""),
        ("04", "456", "B", "19900101", "M", "Anee", "Mr", "kare", "S000", "J", "Janey", 10, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
        ("06", "456", "B", "19900102", "M", "Anee", "Mr", "kare", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R8","100","",""),
        
        ("05", "457", "B", "19900101", "M", "Anees", "Mr", "karee", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
        ("91", "457", "A30", "19900101", "M", "Doe", "Mr", "Doe", "S000", "J", "Janey", 6, "Jane", "N", "20250316", "NG2", "NG11 AA", "AA","R9","100","","")
    ]
    # Create DataFrame
    p20_df = spark.createDataFrame(data, schema)
        # Create sample data
    data = [
        ("321", "654", "C", "19920202", "M", "Michael", "Dr", "Brown", "B000", "M", "Mike", "Johnson", "Michael", "Y", "20250316", "NG3", "NG3 3CC", "03","R6","100","",""),
        ("987", "210", "D", "19780825", "F", "Emily", "Mrs", "Davis", "D000", "E", "Em", "Brown", "Emily", "N", "20250316", "NG4", "NG4 4DD", "05","R6","100","","")
    ]

    # Create DataFrame
    a01_cleanse_df = spark.createDataFrame(data, schema)

        # Create sample data
    data = [
        ("456", "456", "E", "19900101", "M", "Anee", "Mr", "kare", "W000", "D", "Dave", "Taylor", "David", "Y", "20250316", "NG5", "NG5 5EE", "05","R02","100","",""),
        ("654", "456", "F", "19801212", "F", "Sophia", "Ms", "Martinez", "M000", "S", "Sophie", "Wilson", "Sophia", "N", "20250316", "NG6", "NG6 6FF", "06","R06","100","","")
    ]

    # Create DataFrame
    a08_cleanse_df = spark.createDataFrame(data, schema)

    # Create sample data
    data = [
        ("789", "123", "G", "19980404", "M", "James", "Mr", "Anderson", "A000", "J", "Jimmy", "Clark", "James", "Y", "20250316", "NG7", "NG7 7GG", "07","R06","100","",""),
        ("321", "654", "H", "19831111", "F", "Olivia", "Mrs", "Garcia", "G000", "O", "Liv", "Martinez", "Olivia", "N", "20250316", "NG8", "NG8 8HH", "08","R07","100","","")
    ]

    # Create DataFrame
    a20_cleanse_df = spark.createDataFrame(data, schema)

    a01_cleanse_df = a01_cleanse_df.select(*cleanse_columns)
    a08_cleanse_df = a08_cleanse_df.select(*cleanse_columns)
    a20_cleanse_df = a20_cleanse_df.select(*cleanse_columns)
    #a44_cleanse_df = a44_cleanse_df.select(*cleanse_columns)
    
    all_supps_df = a01_cleanse_df.union(a08_cleanse_df).union(a20_cleanse_df)#.union(a44_cleanse_df)

    
    
    # Define schema
    schema1 = StructType([
        StructField("key", StringType(), True)
    ])

    # Create sample data
    data1 = [
        ("NG11AAAA",)
    ]

    # Create DataFrame
    dps50_df = spark.createDataFrame(data1, schema1)


    
    # sectors_df, drop_report_df,p25_output = run(p20_df, dps50_df, all_supps_df)
    p25_output = run(p20_df, dps50_df, all_supps_df)
    # drop_report_df.show()
    # sectors_df.show()
    p25_output.show()

    spark.stop()


if __name__ == "__main__":
    args = parse_arguments()
    spark = create_spark_session()
    main(args, spark)

 

File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 947, in show
  File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 965, in _show_string
  File "C:\Users\C20061E\AppData\Local\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py", line 179, in deco
  File "C:\Users\C20061E\AppData\Local\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o950.showString.
: java.lang.OutOfMemoryError: Java heap space

1 REPLY

Community Manager

Welcome to the community @KameshMaruvada. While you wait for a more knowledgeable member to respond, I thought I would drop a documentation link here in case it helps get you any closer to a resolution. 

Spark jobs failing with memory issues
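
In the meantime, one general observation from the trace: the heap error is raised inside showString, so it is the driver JVM that runs out of memory, and the job also uses several explicit F.broadcast() calls and coalesce(1), both of which concentrate data in a single JVM. Below is a minimal, illustrative sketch of the kinds of adjustments people often try for this pattern; the memory sizes and the s3 path are placeholders to adapt to your own environment, not recommended values.

# Illustrative sketch only - placeholder values, adjust for your cluster
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("P25_Group_Cleaning")
    # The driver heap is what show()/collect() exhaust; raise it here (when the
    # script launches its own JVM) or pass --driver-memory to spark-submit
    .config("spark.driver.memory", "16g")
    # Disable automatic broadcast joins while debugging; explicit F.broadcast()
    # calls on large DataFrames are also worth removing first
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# Limit what show() pulls back to the driver
# p25_output.show(20, truncate=True)

# For full data volumes, write the result out rather than coalescing to one partition
# p25_output.write.csv("s3://<bucket>/<prefix>", header=True, sep="|", mode="overwrite")

If the error persists, p25_output.explain() will show how many broadcast exchanges end up in the final plan, which is usually the quickest way to spot a join that should be left as a shuffle join.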

 


Cy Jervis, Manager, Community Program