- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Spark Java OOM error
- Labels:
-
Apache Spark
Created ‎04-02-2025 06:40 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
import argparse
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
import logging
import json
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType,StructType, StructField, DateType
import pdb
from pyspark import StorageLevel
from group_dedupe.fuzzy_matching import register_udf
from group_dedupe.reason_code import household_level_deduplication_3,household_level_deduplication_4,household_level_deduplication_6,household_level_deduplication_7,household_level_deduplication_1,household_level_deduplication_9,household_level_deduplication_2
import time
import logging
from pyspark.sql import SparkSession
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
handlers=[logging.FileHandler(r"C:\Users\C20061E\OneDrive - EXPERIAN SERVICES CORP\Desktop\All\Experian\tasks\f35\dev\group_dedupe\src\emr\f35\group_dedupe\logfile.log")]
)
logger = logging.getLogger(__name__)
logger.info("Starting GroupDedupe task")
PERSON_KEY = "cb_key_db_person"
HOUSEHOLD_KEY = "cb_key_household"
CB_SOURCE_CD = "cb_source_cd"
DATE_OF_BIRTH = "date_of_birth"
CB_NAME_GENDER = "cb_name_gender"
CB_NAME_FORENAME = "cb_name_forename"
CB_NAME_TITLE = "cb_name_title"
CB_NAME_SURNAME = "cb_name_surname"
CB_NAME_SURNAME_SOUNDEX = "cb_name_surname_soundex"
CB_NAME_FIRST_INITIAL = "cb_name_first_initial"
CB_NAME_OTHER_NAMES = "cb_name_other_names"
SURNAME_CONTRIBUTORS= "surname_contributors"
FORENAME_CONTRIBUTORS = "forename_contributors"
P_PROSPECTABLE_FLAG = "p_prospectable_flag"
CB_DATA_DATE = "cb_data_date"
CB_ADDRESS_POSTCODE_SECTOR = "cb_address_postcode_sector"
POSTCODE_8BYTE_STANDARD = "postcode_8byte_standard"
CB_ADDRESS_DPS = "cb_address_dps"
CB_ADDRESS_STATUS_QAS = "cb_address_status_qas"
IS_SUPPRESSION = "is_suppression"
CB_KEY_FAMILY = "cb_key_family"
CB_ADDRESS_SUBBUILDINGNO = "cb_address_subbuildingno"
CB_ADDRESS_BUILDINGNAME = "cb_address_buildingname"
CB_ADDRESS_LINE_1 = "cb_address_line_1"
translation_table = str.maketrans('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', '9999999999AAAAAAAAAAAAAAAAAAAAAAAAAA')
def create_spark_session(app_name="P25_Group_Cleaning"):
"""
Create a Spark session.
Parameters:
app_name (str): The name of the Spark application.
Returns:
SparkSession: A Spark session object.
"""
return SparkSession.builder \
.appName(app_name) \
.config("spark.executor.memory", "32g") \
.config("spark.driver.memory", "16g") \
.config("spark.memory.offHeap.enabled", "true") \
.config("spark.memory.offHeap.size", "2g") \
.config("spark.sql.shuffle.partitions", "100") \
.config("spark.default.parallelism", "100") \
.config("spark.driver.maxResultSize", "2g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.config("spark.executor.cores", "2") \
.config("spark.sql.autoBroadcastJoinThreshold", "10m") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
def read_input_from_s3(spark, input_s3_bucket, input_s3_key):
"""
Read input data from an S3 bucket.
Parameters:
spark (SparkSession): The Spark session object.
input_s3_bucket (str): The name of the input S3 bucket.
input_s3_key (str): The key of the input file in the S3 bucket.
Returns:
DataFrame: A Spark DataFrame containing the input data.
"""
# Read input file from S3
input_path = rf"s3://{input_s3_bucket}/{input_s3_key}"
df = spark.read.csv(input_path, sep="|", header=True, inferSchema=False)
return df
def export_data_to_s3(df, output_s3_bucket, output_s3_key):
"""
Export data to an S3 bucket.
Parameters:
df (DataFrame): The Spark DataFrame to be exported.
output_s3_bucket (str): The name of the output S3 bucket.
output_s3_key (str): The key of the output file in the S3 bucket.
"""
# Coalesce the DataFrame to a single partition
output_path = rf"s3://{output_s3_bucket}/{output_s3_key}"
df_coalesced = df.coalesce(1)
# Save the coalesced DataFrame as a single CSV file
df_coalesced.write.csv(output_path, header=True, sep="|", mode='overwrite', emptyValue = '')
def parse_arguments():
"""
Parse command-line arguments.
Returns:
Namespace: The parsed command-line arguments.
"""
parser = argparse.ArgumentParser(description= "Process input files and remove duplicates")
parser.add_argument('--task_id', type=str, required=True, help= "Name of the task")
parser.add_argument('--output_bucket', type=str, required=True, help= "Output file S3 Bucket")
parser.add_argument('--p25_output_key', type=str, required=True, help= "Path to output p25 file")
parser.add_argument('--dropRecsList_output_key', type=str, required=True, help= "Path to output dropRecsList file")
parser.add_argument('--sector_output_key', type=str, required=True, help= "Path to output sector file")
return parser.parse_args()
def fuzzy_matched(df,supression_call = False):
if supression_call:
find_duplicates_udf_forenames = register_udf(spark, forename=True, compare_with_surname=False, surname_hyphen=False, supression_call = True)
df_grouped = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY, CB_NAME_FORENAME,DATE_OF_BIRTH,CB_NAME_SURNAME,IS_SUPPRESSION)).alias("forenames"))
else :
find_duplicates_udf_forenames = register_udf(spark, forename=True, compare_with_surname=False, surname_hyphen=False, supression_call = False )
df_grouped = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY, CB_NAME_FORENAME,CB_NAME_SURNAME,DATE_OF_BIRTH,CB_NAME_GENDER)).alias("forenames"))
df_result_forenames = df_grouped.withColumn("duplicates", F.explode(find_duplicates_udf_forenames(F.col("forenames"))))
df_final_forenames = df_result_forenames.select(
F.col(HOUSEHOLD_KEY),
F.col("duplicates.person_id").alias("cb_key_db_person"),
F.col("duplicates.group_id").alias("group_id")
).distinct()
if supression_call:
find_duplicates_udf_surnames = register_udf(spark, forename=False, compare_with_surname=False, surname_hyphen=False,supression_call = True)
df_grouped_surnames = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY,CB_NAME_SURNAME,CB_NAME_FORENAME,DATE_OF_BIRTH,IS_SUPPRESSION )).alias("surnames"))
else:
find_duplicates_udf_surnames = register_udf(spark, forename=False, compare_with_surname=False, surname_hyphen=False,supression_call = False)
df_grouped_surnames = df.groupBy(HOUSEHOLD_KEY).agg(F.collect_list(F.struct(PERSON_KEY,CB_NAME_SURNAME,CB_NAME_FORENAME,DATE_OF_BIRTH,CB_NAME_GENDER )).alias("surnames"))
df_result_surnames = df_grouped_surnames.withColumn("duplicates", F.explode(find_duplicates_udf_surnames(F.col("surnames"))))
df_final_surnames = df_result_surnames.select(
F.col(HOUSEHOLD_KEY),
F.col("duplicates.person_id").alias("cb_key_db_person"),
F.col("duplicates.group_id").alias("group_id")
).distinct() # Ensure unique rows
return df_final_forenames, df_final_surnames
def maiden_married_test(df_all, match_level='Household'):
"""
Identifies and handles cases where a woman's surname might have changed due to marriage.
Args:
df_all (DataFrame): Input DataFrame containing name and household information.
Returns:
DataFrame: DataFrame with false matches marked for dropping and reason for maiden-married name change.
"""
columns_to_concat = df_all.columns
df_all = df_all.withColumn(CB_NAME_TITLE, F.initcap(F.col(CB_NAME_TITLE))) \
.withColumn(CB_NAME_FORENAME, F.initcap(F.col(CB_NAME_FORENAME))) \
.withColumn(CB_NAME_OTHER_NAMES, F.initcap(F.col(CB_NAME_OTHER_NAMES))) \
.withColumn(CB_NAME_SURNAME, F.initcap(F.col(CB_NAME_SURNAME))) \
.withColumn(CB_NAME_GENDER, F.lower(F.col(CB_NAME_GENDER)))
df_male = df_all.filter(F.col(CB_NAME_GENDER) == "m").groupby(HOUSEHOLD_KEY).agg(F.collect_set(CB_NAME_SURNAME).alias("male_surnames"))
df_female = df_all.filter(F.col(CB_NAME_GENDER) != "m").join(df_male, HOUSEHOLD_KEY, "left")
df_female = df_female.withColumn("surname_match", F.array_contains(F.col("male_surnames"), F.col(CB_NAME_SURNAME)))
true_matches = df_female.filter(F.col("surname_match") == True)
df_female_false = df_female.filter(F.col("surname_match") == False)
true_matches = true_matches.select(
HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH,
F.col(PERSON_KEY).alias("cb_key_db_person_true"),
*[F.col(col).alias(f"{col}_true") for col in true_matches.columns if col not in [HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH, PERSON_KEY]]
)
df_female_false_matching = df_female_false.join(
true_matches,
(df_female_false[HOUSEHOLD_KEY] == true_matches[HOUSEHOLD_KEY]) &
(df_female_false[CB_NAME_FORENAME] == true_matches[CB_NAME_FORENAME]) &
(df_female_false[DATE_OF_BIRTH] == true_matches[DATE_OF_BIRTH]),
"inner"
)
df_female_false_matching = df_female_false_matching.select(
df_female_false[P_PROSPECTABLE_FLAG].alias("p_code"),
df_female_false[PERSON_KEY].alias("drop_key"),
true_matches["cb_key_db_person_true"].alias("keep_key"),
F.lit("105").alias("drop_code"),
F.concat_ws("~", *[df_female_false[col] for col in columns_to_concat]).alias("drop_rec"),
F.concat_ws("~", *[true_matches[f"{col}_true"] for col in columns_to_concat if col not in [HOUSEHOLD_KEY, CB_NAME_FORENAME, DATE_OF_BIRTH]]).alias("keep_rec"),
F.lit("maiden married").alias("description"),
df_female_false[CB_SOURCE_CD].alias("cb_source_cd"),
true_matches["cb_source_cd_true"].alias("match_in"),
F.lit("duplicate of kept").alias("match_type"),
F.lit(match_level).alias("match_level"),
F.lit("").alias("match_indirect")
)
return df_female_false_matching
def is_an_NC_winner(df: DataFrame, group_column: str, group_id: str) :
df.repartition(HOUSEHOLD_KEY)
print(f"Time before agg_df: {time.time() - start_time} seconds")
agg_df = df.groupBy(group_column, group_id).agg(
F.sum(F.when(F.col("CB_SOURCE_CD") == 'A31', 1).otherwise(0)).alias("a31_count"),
F.sum(F.when(F.col("P_PROSPECTABLE_FLAG") == 'Y', 1).otherwise(0)).alias("prospectable_count")
)
print(f"Time after agg_df: {time.time() - start_time} seconds")
print(f"Time before df: {time.time() - start_time} seconds")
df = df.join(F.broadcast(agg_df), on=[group_column, group_id], how="left")
print(f"Time after df: {time.time() - start_time} seconds")
print(f"Time before df withcolumn: {time.time() - start_time} seconds")
# Add subreason column based on the counts
df = df.withColumn(
"subreason",
F.when((F.col("a31_count") > 0) , 'A31 record kept')
.when((F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) , 'prospectable record kept')
.when((F.col("a31_count") == 0) & (F.col("prospectable_count") == 0), 'no A31 or prospectable record, all kept')
)
print(f"Time after df withcolumn: {time.time() - start_time} seconds")
print(f"Time before kept_df withcolumn: {time.time() - start_time} seconds")
# Filter records based on the counts and subreason
kept_df = df.filter(
(F.col("a31_count") > 0) & (F.col(CB_SOURCE_CD) == 'A31') |
(F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) & (F.col(P_PROSPECTABLE_FLAG) == 'Y') |
(F.col("a31_count") == 0) & (F.col("prospectable_count") == 0)
)
print(f"Time after kept_df withcolumn: {time.time() - start_time} seconds")
print(f"Time before dropped_df withcolumn: {time.time() - start_time} seconds")
# Identify dropped records
dropped_df = df.join(F.broadcast(kept_df), on=[HOUSEHOLD_KEY, group_id], how="left_anti")
# Identify dropped records
# dropped_df = df.subtract(kept_df)
print(f"Time after dropped_df withcolumn: {time.time() - start_time} seconds")
print(f"Time before first_person_key_df withcolumn: {time.time() - start_time} seconds")
# Add keep_key column to dropped_df
first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
print(f"Time after first_person_key_df withcolumn: {time.time() - start_time} seconds")
print(f"Time before dropped_df : {time.time() - start_time} seconds")
dropped_df = dropped_df.join(
F.broadcast(first_person_key_df).select(group_column, group_id, "first_person_key"),
on=[group_column, group_id],
how="left"
).withColumnRenamed("first_person_key", "keep_key")
dropped_df.repartition(HOUSEHOLD_KEY)
print(f"Time after dropped_df : {time.time() - start_time} seconds")
partition_sizes = dropped_df.rdd.getNumPartitions()
total_size = partition_sizes
print(f"Total DataFrame size: {total_size} bytes")
print(f"Time befire kept_dfa31 : {time.time() - start_time} seconds")
# Drop the temporary count columns
kept_df = kept_df.drop("a31_count", "prospectable_count")
dropped_df = dropped_df.drop("a31_count", "prospectable_count")
print(f"Time after kept_dfa31 : {time.time() - start_time} seconds")
return kept_df, dropped_df
def is_an_NC_winner_7(df: DataFrame, group_column: str, group_id: str) :
"""
Identify and filter records based on specific conditions and assign subreasons.
This function partitions the DataFrame by the specified group column and group ID, counts the number of records
with specific source codes and prospectable flags, assigns subreasons based on these counts, and filters the records
accordingly.
Args:
df (DataFrame): The input DataFrame.
group_column (str): The column name to group by.
group_id (str): The group ID column name.
Returns:
(DataFrame, DataFrame): A tuple containing the kept DataFrame and the dropped DataFrame.
"""
agg_df = df.groupBy(group_column, group_id).agg(
F.sum(F.when(F.col("CB_SOURCE_CD") == 'A31', 1).otherwise(0)).alias("a31_count"),
F.sum(F.when(F.col("P_PROSPECTABLE_FLAG") == 'Y', 1).otherwise(0)).alias("prospectable_count")
)
# Join the aggregated counts back to the original DataFrame
df = df.join(agg_df, on=[group_column, group_id], how="left")
df = df.withColumn(
"subreason",
F.when((F.col("a31_count") > 0) , 'A31 record kept')
.when((F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) , 'prospectable record kept')
.when((F.col("a31_count") == 0) & (F.col("prospectable_count") == 0), 'no A31 or prospectable record, all kept')
)
# Filter records based on the counts and subreason
kept_df = df.filter(
(F.col("a31_count") > 0) & (F.col(CB_SOURCE_CD) == 'A31') |
(F.col("a31_count") == 0) & (F.col("prospectable_count") > 0) & (F.col(P_PROSPECTABLE_FLAG) == 'Y') |
(F.col("a31_count") == 0) & (F.col("prospectable_count") == 0)
)
# Identify dropped records
dropped_df = df.join(kept_df, on=[HOUSEHOLD_KEY, group_id], how="left_anti")
first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
dropped_df = dropped_df.join(
first_person_key_df.select(group_column, group_id, "first_person_key").distinct(),
on=[group_column, group_id],
how="left"
).withColumnRenamed("first_person_key", "keep_key")
# Drop the temporary count columns
kept_df = kept_df.drop("a31_count", "prospectable_count")
dropped_df = dropped_df.drop("a31_count", "prospectable_count")
return kept_df, dropped_df
def score_records(df: DataFrame,group_column: str, group_id: str) -> DataFrame:
"""
Score records based on specific conditions.
This function assigns a score to each record based on various conditions such as prospectable flag,
address status, name title, other names, gender, and date of birth.
Args:
df (DataFrame): The input DataFrame.
Returns:
DataFrame: The DataFrame with an additional 'score' column.
"""
df = df.withColumn(
"score",
F.when(F.col(P_PROSPECTABLE_FLAG) == 'Y', 15).otherwise(0) +
F.when(F.col(CB_ADDRESS_STATUS_QAS).startswith('R9'), 5).otherwise(0) +
F.when(F.lower(F.col(CB_NAME_TITLE)).isin(['mr', 'mrs', 'ms', 'miss']), 1)
.when(F.lower(F.col(CB_NAME_TITLE)) != '', 2).otherwise(0) +
F.when(F.col(CB_NAME_OTHER_NAMES) != '', 1).otherwise(0) +
F.when((F.col(CB_NAME_GENDER) != '') & (F.col(CB_NAME_GENDER) != 'U'), 1).otherwise(0) +
F.when(F.col(DATE_OF_BIRTH) != '', 1).otherwise(0)
)
agg_df = df.groupBy(group_column, group_id).agg(F.max("score").alias("max_score"))
# Join the aggregated max score back to the original DataFrame
df = df.join(agg_df, on=[group_column, group_id], how="left")
# Filter to keep only the highest score record in each group
kept_df = df.filter(F.col("score") == F.col("max_score")).drop("max_score")
# Identify the final dropped records
dropped_df = df.filter(F.col("score") != F.col("max_score")).drop("max_score")
# # Add row number based on the score within each group
# df = df.withColumn("row_num", F.row_number().over(window_spec_score))
# Add keep_key and score column to dropped_df
first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first("PERSON_KEY").alias("first_person_key"))
dropped_df = dropped_df.join(
first_person_key_df,
on=[group_column, group_id],
how="left"
).withColumnRenamed("first_person_key", "keep_key")
first_person_key_df = kept_df.groupBy(group_column, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
dropped_df = dropped_df.join(
first_person_key_df,
on=[group_column, group_id],
how="left"
).withColumnRenamed("first_person_key", "keep_key")
# Add subreason column to dropped_df
dropped_df = dropped_df.withColumn(
"subreason",
F.concat(
F.lit("records were scored - scores were : "),
F.col(PERSON_KEY), F.lit(" "), F.col("score"), F.lit(" "),
F.col("keep_key")
)
)
return kept_df, dropped_df
def get_sector_data(df):
"""
Aggregates data by postcode sector and counts prospectable and topup records.
This function groups the input DataFrame by the first six characters of the
'postcode_8byte_standard' column, which represents the postcode sector. It then
counts the number of records where 'p_prospectable_flag' is 'Y' (prospectable)
and 'N' (topup), and returns a DataFrame with these counts.
Parameters:
df (DataFrame): The input DataFrame containing the data to be aggregated.
Returns:
DataFrame: A DataFrame with columns 'sector', 'nc_count' (number of prospectable
records), and 'topup_count' (number of topup records).
"""
# Group by sector and aggregate counts
df = df.groupBy(F.expr(f"SUBSTRING({POSTCODE_8BYTE_STANDARD}, 1, 6)").alias("sector")).agg(
F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == "Y", 1).otherwise(0)).alias("nc_count"),
F.sum(F.when(F.col(P_PROSPECTABLE_FLAG) == "N", 1).otherwise(0)).alias("topup_count")
)
sectors_df = df.select("sector", "nc_count", "topup_count")
return sectors_df
def filter_a30_records_by_dps(p20_df, dps50_df):
"""
Filters A30 records from the main DataFrame based on DPS criteria and generates a report.
This function creates a key column in the main DataFrame by concatenating the
'postcode_8byte_standard' and 'cb_address_dps' columns. It then joins and filters
the main DataFrame with the `dps50_df` DataFrame to identify A30 records. The
function performs a left anti join to filter out these A30 records from the main
DataFrame and generates a report for the records to be removed.
Parameters:
p20_df (DataFrame): The main DataFrame containing the data to be filtered.
dps50_df (DataFrame): The DataFrame containing DPS criteria for filtering.
Returns:
tuple: A tuple containing two DataFrames:
- filtered_df: The DataFrame after filtering out A30 records.
- dropped50A30recs_report_df: The DataFrame containing the report of dropped A30 records.
"""
columns_to_concat = p20_df.columns
# Create a key column in the main DataFrame
p20_df = p20_df.withColumn("key", F.concat_ws("", F.regexp_replace(F.col(POSTCODE_8BYTE_STANDARD), " ", ""), F.col(CB_ADDRESS_DPS)))
# Join and filter the main DataFrame with dps50_df
a30_recs_df = p20_df.join(dps50_df, on="key", how="inner").filter(F.col(CB_SOURCE_CD) == "A30")
# Create the DataFrame by performing a left anti join
filtered_df = p20_df.join(a30_recs_df, on=["key", CB_SOURCE_CD], how="left_anti")
filtered_df = filtered_df.drop("key")
# Generate the report for the records to be removed
dropped50A30recs_report_df = a30_recs_df.withColumn("description", F.lit("dropped A30s when >50 in DPS")) \
.withColumn("match_type", F.lit(">50 in DPS")) \
.withColumn("drop_key", F.col(PERSON_KEY))\
.withColumn("keep_key", F.lit("")) \
.withColumn("drop_code", F.lit(600)) \
.withColumn("drop_rec", F.concat_ws("~", *[F.col(col) for col in columns_to_concat])) \
.withColumn("keep_rec", F.lit("")) \
.withColumn("match_in", F.lit("")) \
.withColumn("match_level", F.lit("")) \
.withColumn("match_indirect",F.lit("")) \
.select(F.col(P_PROSPECTABLE_FLAG).alias("p_code"), "drop_key", "keep_key","drop_code","drop_rec","keep_rec","description"
,F.col(CB_SOURCE_CD).alias("cb_source_cd"),"match_in","match_type","match_level","match_indirect")
return filtered_df, dropped50A30recs_report_df
def count_surnames_and_filter(df: DataFrame,group_id: str) :
"""
Count surnames and filter records based on the highest total contributors.
This function counts the number of occurrences of each surname within each group, adds the count to the surname contributors,
and filters the records to keep only the highest total contributors record in each group.
Args:
df (DataFrame): The input DataFrame.
Returns:
(DataFrame, DataFrame): A tuple containing the kept DataFrame and the dropped DataFrame.
"""
# Window specification to partition by household key, group ID, and surname
# window_spec = Window.partitionBy(HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME)
# df = df.withColumn("surname_count", F.count(CB_NAME_SURNAME).over(window_spec))
# df.persist(StorageLevel.MEMORY_AND_DISK)
#Aggregate surname counts
surname_counts = df.groupBy(HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME).agg(F.count(CB_NAME_SURNAME).alias("surname_count"))
# Join the aggregated counts back to the original DataFrame
df = df.join(F.broadcast(surname_counts), on=[HOUSEHOLD_KEY, group_id, CB_NAME_SURNAME], how="left")
# Add the surname count to the surname contributors
# df = df.withColumn("total_contributors", F.col("surname_count") + F.col(SURNAME_CONTRIBUTORS))
df = df.select(
"*", # Select all existing columns
(F.col("surname_count") + F.col(SURNAME_CONTRIBUTORS)).alias("total_contributors") # Add the new column
)
# # Window specification to partition by household key and group ID and order by total contributors descending
# window_spec_surname = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(F.col("total_contributors").desc())
# Aggregate to get the highest total contributors in each group
max_contributors = df.groupBy(HOUSEHOLD_KEY, group_id).agg(F.max("total_contributors").alias("max_contributors"))
df = df.join(F.broadcast(max_contributors), on=[HOUSEHOLD_KEY, group_id], how="left")
final_kept_df = df.filter(F.col("total_contributors") == F.col("max_contributors")).drop("max_contributors")
# # Add keep_key column to dropped_df
# first_person_key_window = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(F.col(PERSON_KEY))
# first_person_key_df = final_kept_df.withColumn("first_person_key", F.first(PERSON_KEY).over(first_person_key_window))
# Identify the final dropped records
final_dropped_df = df.filter(F.col("total_contributors") != F.col("max_contributors")).drop("max_contributors")
# Add keep_key column to dropped_df
first_person_key_df = final_kept_df.groupBy(HOUSEHOLD_KEY, group_id).agg(F.first(PERSON_KEY).alias("first_person_key"))
final_dropped_df = final_dropped_df.join(
F.broadcast(first_person_key_df),
on=[HOUSEHOLD_KEY, group_id],
how="left"
).withColumnRenamed("first_person_key", "keep_key")
# Add subreason column to final_dropped_df
final_dropped_df = final_dropped_df.withColumn(
"subreason",
F.concat(F.lit("- surname "), F.col(CB_NAME_SURNAME), F.lit(" dropped"))
)
return final_kept_df, final_dropped_df
def identify_postcode_duplicates(df: DataFrame) -> DataFrame:
"""
Identify duplicate records at the postcode level based on name keys.
This function generates name keys, assigns group IDs using dense rank, adds a reason column,
counts the number of records in each group, and filters groups with more than one record.
Args:
df (DataFrame): The input DataFrame.
Returns:
DataFrame: The DataFrame containing duplicate records.
"""
# Generate keys
df = df.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
df = df.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))
# Define window specification for dense rank
windowSpecDenseRank = Window.partitionBy(POSTCODE_8BYTE_STANDARD).orderBy("namekey_alt")
# Assign group IDs using dense_rank
df = df.withColumn("group_id", F.dense_rank().over(windowSpecDenseRank))
# df.repartition(POSTCODE_8BYTE_STANDARD)
# Add reason column
df = df.withColumn("reason", F.expr("CASE WHEN cb_name_gender = '' THEN 'gender lowered to unknown to make match' ELSE 'gender also matched (or u matched to u)' END"))
# Count the number of records in each group
group_counts = df.groupBy(POSTCODE_8BYTE_STANDARD, "group_id").agg(F.count("*").alias("group_count"))
# Join the original DataFrame with the group counts
df_with_counts = df.join(group_counts, on=[POSTCODE_8BYTE_STANDARD, "group_id"])
# Filter groups with more than one record
duplicates = df_with_counts.filter(F.col("group_count") > 1)
# Drop the namekey, namekey_alt, and group_count columns
duplicates = duplicates.drop("namekey", "namekey_alt", "group_count")
return duplicates
def identify_dropping_records_postcode(df1: DataFrame, df2: DataFrame) -> DataFrame:
"""
Identify records to be dropped based on matching keys.
This function generates name keys for both DataFrames, adds a reason column, and joins the DataFrames
on postcode and namekey or namekey_alt to identify records to be dropped.
Args:
df1 (DataFrame): The first input DataFrame.
df2 (DataFrame): The second input DataFrame.
Returns:
DataFrame: The DataFrame containing records to be dropped.
"""
# Generate keys for df1
df1 = df1.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
df1 = df1.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))
# Add reason column
df1 = df1.withColumn("reason", F.expr("CASE WHEN cb_name_gender = '' THEN 'gender lowered to unknown to make match' ELSE 'gender also matched (or u matched to u)' END"))
# Generate keys for df2
df2 = df2.withColumn("namekey", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.when(F.col(CB_NAME_GENDER) == "", "u").otherwise(F.lower(F.col(CB_NAME_GENDER)))))
df2 = df2.withColumn("namekey_alt", F.concat_ws(":", F.lower(F.col(CB_NAME_FORENAME)), F.lower(F.col(CB_NAME_SURNAME)), F.lit("u")))
# Join df1 with df2 on postcode and namekey or namekey_alt
drop = df1.join(df2, (df1[POSTCODE_8BYTE_STANDARD] == df2[POSTCODE_8BYTE_STANDARD]) & (df1["namekey_alt"] == df2["namekey_alt"]), "inner") \
.select(df1["*"])
# Join df1 with df2 on postcode and namekey or namekey_alt
keep = df1.join(drop, (df1[POSTCODE_8BYTE_STANDARD] == drop[POSTCODE_8BYTE_STANDARD]) & (df1["namekey_alt"] == drop["namekey_alt"]), "leftanti")
return keep,drop
def identify_dropping_key_postcode(df1: DataFrame, df2: DataFrame) -> DataFrame:
"""
Identify records to be dropped based on matching keys.
This function generates name keys for both DataFrames, adds a reason column, and joins the DataFrames
on postcode and namekey or namekey_alt to identify records to be dropped.
Args:
df1 (DataFrame): The first input DataFrame.
df2 (DataFrame): The second input DataFrame.
Returns:
DataFrame: The DataFrame containing records to be dropped.
"""
# Join df1 with df2 on postcode and personkey
dropped_df = df1.join(df2, (df1[POSTCODE_8BYTE_STANDARD] == df2[POSTCODE_8BYTE_STANDARD]) & (df1[PERSON_KEY] == df2[PERSON_KEY]), "inner")\
.select(df1["*"])
# Join df1 with df2 on postcode and personkey
keep_df = df1.join(dropped_df, (df1[POSTCODE_8BYTE_STANDARD] == dropped_df[POSTCODE_8BYTE_STANDARD]) & (df1[PERSON_KEY] == dropped_df[PERSON_KEY]), "left_anti")
return keep_df,dropped_df
def filter_and_group_supression(reason_code_supression, union_df,group_id):
union_df_alias = union_df.select(PERSON_KEY, F.col(IS_SUPPRESSION).alias("union_is_suppression"))
# Filter the reason_code_4_supression DataFrame to show only entries where supression_call is 1
reason_code_supression_filtered = reason_code_supression.join(
union_df_alias,
on=PERSON_KEY,
how="inner"
).filter(F.col("union_is_suppression") == 1)
window_spec = Window.partitionBy(HOUSEHOLD_KEY, group_id).orderBy(PERSON_KEY)
# Use window function to get the first PERSON_KEY as keep_key
reason_code_supression_filtered = reason_code_supression_filtered.withColumn(
"keep_key", F.first(PERSON_KEY).over(window_spec)
).dropDuplicates([HOUSEHOLD_KEY, group_id])
# Join back to reason_code_supression
reason_code_to_be_drop = reason_code_supression.join(
(reason_code_supression_filtered.select(HOUSEHOLD_KEY, group_id, "keep_key")),
on=[HOUSEHOLD_KEY, group_id],
how="inner"
)
reason_code_to_be_drop = reason_code_to_be_drop.join(
reason_code_supression_filtered.select(PERSON_KEY),
on=PERSON_KEY,
how="leftanti"
)
return reason_code_to_be_drop
def add_mrdps_column(df):
"""
Adds an 'mrdps' column to the given DataFrame based on the logic of grouping by household,
filtering households with more than 9 family keys, and assigning sequential numbers to family keys.
Parameters:
df (DataFrame): The input DataFrame containing household and family key information.
Returns:
DataFrame: The original DataFrame with an additional 'mrdps' column.
"""
# Group by household and aggregate family keys
grouped_df_mrdps = df.groupBy(HOUSEHOLD_KEY).agg(
F.countDistinct(CB_KEY_FAMILY).alias("family_count"),
F.collect_list(CB_KEY_FAMILY).alias("family_keys")
)
# Filter households with more than 9 family keys
filtered_df_mrdps = grouped_df_mrdps.filter(F.col("family_count") > 9)
# Explode family keys and assign sequential numbers
exploded_df = filtered_df_mrdps.select(
HOUSEHOLD_KEY,
F.explode("family_keys").alias(CB_KEY_FAMILY)
)
window_spec = Window.partitionBy(HOUSEHOLD_KEY).orderBy(CB_KEY_FAMILY)
mrdps_df = exploded_df.withColumn("mrdps", F.row_number().over(window_spec))
# Join with original DataFrame to add the mrdps column
result_df_mrdps = df.join(mrdps_df, on=[HOUSEHOLD_KEY, CB_KEY_FAMILY], how="left")
# Fill null values in mrdps column with '0'
result_df_mrdps = result_df_mrdps.fillna({'mrdps': '0'})
return result_df_mrdps
def check_for_flat(buildingname, addr1):
if buildingname == '':
return ''
upperaddr1 = addr1.strip().upper()
posname = upperaddr1.find(buildingname.upper())
if posname > 0:
lostbit = upperaddr1[:posname].strip()
if lostbit == '':
return ''
if ' ' in lostbit:
return ''
mask = str.translate(lostbit, translation_table)
if mask[0] != '9':
return ''
if mask not in ['9999', '9999A']:
return ''
return upperaddr1[:posname].strip()
else:
return ''
def drop_report(df1: DataFrame, df2: DataFrame, match_type :str, match_level :str) -> DataFrame:
"""
Join two DataFrames on the specified column and return the joined DataFrame.
Parameters:
df1 (DataFrame): The first DataFrame.
df2 (DataFrame): The second DataFrame.
join_column (str): The column name to join on.
Returns:
DataFrame: The joined DataFrame.
"""
columns_to_concat = df1.columns
# Perform the join operation for dropkey
joined_df = df1.join(df2, df1[PERSON_KEY] == df2[PERSON_KEY], "inner")
joined_df_keep = df1.join(df2.select("keep_key"), df1[PERSON_KEY] == df2["keep_key"], "inner")
# Concatenate specified columns from df1 into a new column
concatenated_df = joined_df.withColumn("drop_rec", F.concat_ws("~", *[df1[col] for col in columns_to_concat]))
concatenated_df_keep = joined_df_keep.withColumn("keep_rec", F.concat_ws("~", *[df1[col] for col in columns_to_concat]))
# Select the key column, the new concatenated column, and specific columns from df2
result_df = concatenated_df.select(
df1[PERSON_KEY].alias ("drop_key"),
df1[P_PROSPECTABLE_FLAG],
df1[CB_SOURCE_CD],
"drop_rec",
df2["reasoncode"],
df2["reason"],
df2["subreason"],
df2["keep_key"]
).join( concatenated_df_keep.select(
df1[PERSON_KEY],
df1[CB_SOURCE_CD].alias("match_in"),
"keep_rec"
),
df2["keep_key"] == concatenated_df_keep[PERSON_KEY],
"left"
)
# Modify the result_df while returning the data
final_df = result_df.select(
F.col(P_PROSPECTABLE_FLAG).alias("p_code"),
"drop_key",
"keep_key",
F.col("reasoncode").alias("drop_code"),
"drop_rec",
"keep_rec",
F.concat_ws("|", F.col("reason"), F.col("subreason")).alias("description"),
F.col(CB_SOURCE_CD).alias("cb_source_cd"),
"match_in",
F.lit(match_type).alias("match_type"),
F.lit(match_level).alias("match_level"),
F.lit("").alias("match_indirect")
)
return final_df
start_time = time.time()
def run(p20_df, dps50_df, all_supps_df):
sectors_df = get_sector_data(p20_df)
#filtered df will have data
print(f"Time after before dropped50A30recs_report_df: {time.time() - start_time} seconds")
filtered_df, dropped50A30recs_report_df = filter_a30_records_by_dps(p20_df, dps50_df)
print(f"Time after before dropped50A30recs_report_df: {time.time() - start_time} seconds")
drop_record_report= dropped50A30recs_report_df
# filtered_df.persist(StorageLevel.MEMORY_AND_DISK)
# #maiden married test or reason code 5
print(f"Time after before df_female_false_matching: {time.time() - start_time} seconds")
df_female_false_matching = maiden_married_test(filtered_df)
print(f"Time after before df_female_false_matching: {time.time() - start_time} seconds")
# #filtering duplicate records from p20_df
print(f"Time after before df_female_false_matching_filter: {time.time() - start_time} seconds")
# # Filter p20_df where PERSON_KEY does not match with df_female_false_matching
filtered_df = filtered_df.join(df_female_false_matching, filtered_df[PERSON_KEY] == df_female_false_matching['drop_key'], how='left_anti')
print(f"Time after before df_female_false_matching_filter: {time.time() - start_time} seconds")
print(f"Time after before fuzzy: {time.time() - start_time} seconds")
duplicate_forename_df, duplicate_surname_df = fuzzy_matched(filtered_df)
print(f"Time after after fuzzy: {time.time() - start_time} seconds")
print(f"Time after before duplicate_forename_df join: {time.time() - start_time} seconds")
duplicate_forename_df = duplicate_forename_df.join(filtered_df.select(PERSON_KEY, DATE_OF_BIRTH,CB_NAME_SURNAME,CB_SOURCE_CD,P_PROSPECTABLE_FLAG,SURNAME_CONTRIBUTORS,CB_NAME_TITLE,CB_NAME_OTHER_NAMES,CB_NAME_GENDER,CB_ADDRESS_STATUS_QAS), on= PERSON_KEY, how="left")
print(f"Time after after duplicate_forename_df join: {time.time() - start_time} seconds")
print(f"Time after before duplicate_surname_df join: {time.time() - start_time} seconds")
duplicate_surname_df = duplicate_surname_df.join(filtered_df.select(PERSON_KEY, DATE_OF_BIRTH), on=PERSON_KEY, how="left")
print(f"Time after after duplicate_surname_df join: {time.time() - start_time} seconds")
cleanse_columns = [
CB_SOURCE_CD,
PERSON_KEY,
HOUSEHOLD_KEY,
POSTCODE_8BYTE_STANDARD,
CB_ADDRESS_DPS,
CB_NAME_TITLE,
CB_NAME_FORENAME,
CB_NAME_OTHER_NAMES,
CB_NAME_SURNAME,
CB_NAME_SURNAME_SOUNDEX,
CB_NAME_GENDER,
DATE_OF_BIRTH,
CB_NAME_FIRST_INITIAL
]
#Household level deduplication
#Start reason_code
#reason code 4
# Call the function
print(f"Time before duplicate_surname_df cache: {time.time() - start_time} seconds")
duplicate_forename_df.cache()##cache or materialize
duplicate_forename_df.count()
duplicate_surname_df.cache()
duplicate_surname_df.count()
print(f"Time after duplicate_surname_df cache: {time.time() - start_time} seconds")
# duplicate_forename_df.repartition(10)
# duplicate_surname_df.repartition(10)
# print(f"Time after before reasoson code 4 join: {time.time() - start_time} seconds")
reason_code_4 = household_level_deduplication_4(duplicate_forename_df, duplicate_surname_df)
# print(f"Time after after reasoson code 4 join: {time.time() - start_time} seconds")
# reason_code_4.repartition(HOUSEHOLD_KEY)
print(f"Time after before is_an_NC_winner: {time.time() - start_time} seconds")
# # # Apply the function to get kept and dropped records
kept_df_4_is_an_winner, dropped_df_4_is_an_winner = is_an_NC_winner(reason_code_4,HOUSEHOLD_KEY,"group_id_for")
print(f"Time after after is_an_NC_winner: {time.time() - start_time} seconds")
# # drop_report_4_is_an_winner = drop_report(filtered_df,dropped_df_4_is_an_winner,"duplicate of kept","Household")
# # drop_record_report = dropped50A30recs_report_df.union(drop_report_4_is_an_winner)
# # # # # Filter for is_an_winner ranking
print(f"Time Before dropped_df_4_is_an_winner: {time.time() - start_time} seconds")
filtered_df = filtered_df.join(F.broadcast(dropped_df_4_is_an_winner), filtered_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
print(f"Time after dropped_df_4_is_an_winner: {time.time() - start_time} seconds")
# # # partition_sizes = filtered_df.rdd.glom().map(len).collect()
# # # total_size = sum(partition_sizes)
# # # print(f"Total DataFrame size: {total_size} bytes")
# # # # # #running for surname winner
# print(f"Time Before dropped_df_4_count_surname: {time.time() - start_time} seconds")
# kept_df_4_count_surname, dropped_df_4_count_surname =count_surnames_and_filter(kept_df_4_is_an_winner,"group_id_for")
# print(f"Time After dropped_df_4_count_surname: {time.time() - start_time} seconds")
# # # # # # Filter for surname winner ranking
# print(f"Time Before filter: {time.time() - start_time} seconds")
# filtered_df = filtered_df.join(F.broadcast(dropped_df_4_count_surname), filtered_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')
# print(f"Time Before after: {time.time() - start_time} seconds")
# # #END reason_code 4
# # #dropping records from duplcation groups
# print(f"Time Before duplicate_forename_df second: {time.time() - start_time} seconds")
duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_4_is_an_winner), duplicate_forename_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
# duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_4_count_surname), duplicate_forename_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')
# duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_4_is_an_winner), duplicate_surname_df[PERSON_KEY] == dropped_df_4_is_an_winner[PERSON_KEY], how='left_anti')
# duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_4_count_surname), duplicate_surname_df[PERSON_KEY] == dropped_df_4_count_surname[PERSON_KEY], how='left_anti')
# print(f"Time after duplicate_forename_df second: {time.time() - start_time} seconds")
# # Filter for is_an_winner ranking
#START reason_code 6
# Check the number of partitions
# num_partitions = filtered_df.rdd.getNumPartitions()
# print(f"Number of partitions: {num_partitions}")
print(f"Time Before reasoncode6 second: {time.time() - start_time} seconds")
# duplicate_forename_df.repartition(HOUSEHOLD_KEY)
reason_code_6 = household_level_deduplication_6(duplicate_forename_df)
print(f"Time after reasoncode second: {time.time() - start_time} seconds")
print(f"Time before caching: {time.time() - start_time} seconds")
reason_code_6.cache()
print(f"Time after caching: {time.time() - start_time} seconds")
reason_code_6.count()
print(f"Time Before is_an_winner second: {time.time() - start_time} seconds")
# Apply the function to get kept and dropped records
kept_df_6_is_an_winner, dropped_df_6_is_an_winner = is_an_NC_winner(reason_code_6,HOUSEHOLD_KEY,"group_id")
print(f"Time after is_an_winner second: {time.time() - start_time} seconds")
print(f"Time Before filter 1: {time.time() - start_time} seconds")
# # Filter for is_an_winner ranking
filtered_df = filtered_df.join(F.broadcast(dropped_df_6_is_an_winner), filtered_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
print(f"Time Before filter 1: {time.time() - start_time} seconds")
#running for surname winner
kept_df_6_count_surname, dropped_df_6_count_surname =count_surnames_and_filter(kept_df_6_is_an_winner,"group_id")
# Filter for surname winner ranking
filtered_df = filtered_df.join(F.broadcast((dropped_df_6_count_surname)), filtered_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')
# #dropping records from duplcation groups
duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_6_is_an_winner), duplicate_forename_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
duplicate_forename_df = duplicate_forename_df.join(F.broadcast(dropped_df_6_count_surname), duplicate_forename_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')
duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_6_is_an_winner), duplicate_surname_df[PERSON_KEY] == dropped_df_6_is_an_winner[PERSON_KEY], how='left_anti')
duplicate_surname_df = duplicate_surname_df.join(F.broadcast(dropped_df_6_count_surname), duplicate_surname_df[PERSON_KEY] == dropped_df_6_count_surname[PERSON_KEY], how='left_anti')
#END reason_code 6
return filtered_df
def main(args: argparse.Namespace,spark):
# Load parameters
# input_bucket = args.input_bucket
output_bucket = args.output_bucket
p25_output_sorted_output_key = args.p25_output_key
sectors_output_key = args.sector_output_key
dropRecsList_output_key = args.dropRecsList_output_key
cleanse_columns = [
CB_SOURCE_CD,
PERSON_KEY,
HOUSEHOLD_KEY,
POSTCODE_8BYTE_STANDARD,
CB_ADDRESS_DPS,
CB_NAME_TITLE,
CB_NAME_FORENAME,
CB_NAME_OTHER_NAMES,
CB_NAME_SURNAME,
CB_NAME_SURNAME_SOUNDEX,
CB_NAME_GENDER,
DATE_OF_BIRTH,
CB_NAME_FIRST_INITIAL
]
# Define schema
schema = StructType([
StructField("cb_key_db_person", StringType(), True),
StructField("cb_key_household", StringType(), True),
StructField("cb_source_cd", StringType(), True),
StructField("date_of_birth", StringType(), True),
StructField("cb_name_gender", StringType(), True),
StructField("cb_name_forename", StringType(), True),
StructField("cb_name_title", StringType(), True),
StructField("cb_name_surname", StringType(), True),
StructField("cb_name_surname_soundex", StringType(), True),
StructField("cb_name_first_initial", StringType(), True),
StructField("cb_name_other_names", StringType(), True),
StructField("surname_contributors", StringType(), True),
StructField("forename_contributors", StringType(), True),
StructField("p_prospectable_flag", StringType(), True),
StructField("cb_data_date", StringType(), True),
StructField("cb_address_postcode_sector", StringType(), True),
StructField("postcode_8byte_standard", StringType(), True),
StructField("cb_address_dps", StringType(), True),
StructField("cb_address_status_qas", StringType(), True),
StructField("cb_key_family", StringType(), True),
StructField("cb_address_buildingname", StringType(), True),
StructField("cb_address_line_1", StringType(), True),
])
data = [
("123", "456", "A", "19900101", "F", "Karrena", "Mr", "Doe-Anee", "D000", "J", "Johnny", 2, "John", "Y", "20250316", "NG1", "NG2 1AA", "01","R9","100","Building A","1234A Building A Street"),
("100", "456", "B", "19900101", "F", "Karrena", "Ms", "", "S000", "J", "Janey", 2, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
("01", "456", "B", "19900101", "M", "Doe", "Mr", "Doe", "S000", "J", "Janey", 3, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
("02", "456", "B", "19900101", "M", "Doe", "Mr", "kkaree", "S000", "J", "Janey", 4, "Jane", "N", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
("03", "456", "B", "19900102", "M", "Anee", "Mr", "Karrena", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R8","100","",""),
("04", "456", "B", "19900101", "M", "Anee", "Mr", "kare", "S000", "J", "Janey", 10, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
("06", "456", "B", "19900102", "M", "Anee", "Mr", "kare", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R8","100","",""),
("05", "457", "B", "19900101", "M", "Anees", "Mr", "karee", "S000", "J", "Janey", 5, "Jane", "Y", "20250316", "NG2", "NG2 2BB", "02","R9","100","",""),
("91", "457", "A30", "19900101", "M", "Doe", "Mr", "Doe", "S000", "J", "Janey", 6, "Jane", "N", "20250316", "NG2", "NG11 AA", "AA","R9","100","","")
]
# Create DataFrame
p20_df = spark.createDataFrame(data, schema)
# Create sample data
data = [
("321", "654", "C", "19920202", "M", "Michael", "Dr", "Brown", "B000", "M", "Mike", "Johnson", "Michael", "Y", "20250316", "NG3", "NG3 3CC", "03","R6","100","",""),
("987", "210", "D", "19780825", "F", "Emily", "Mrs", "Davis", "D000", "E", "Em", "Brown", "Emily", "N", "20250316", "NG4", "NG4 4DD", "05","R6","100","","")
]
# Create DataFrame
a01_cleanse_df = spark.createDataFrame(data, schema)
# Create sample data
data = [
("456", "456", "E", "19900101", "M", "Anee", "Mr", "kare", "W000", "D", "Dave", "Taylor", "David", "Y", "20250316", "NG5", "NG5 5EE", "05","R02","100","",""),
("654", "456", "F", "19801212", "F", "Sophia", "Ms", "Martinez", "M000", "S", "Sophie", "Wilson", "Sophia", "N", "20250316", "NG6", "NG6 6FF", "06","R06","100","","")
]
# Create DataFrame
a08_cleanse_df = spark.createDataFrame(data, schema)
# Create sample data
data = [
("789", "123", "G", "19980404", "M", "James", "Mr", "Anderson", "A000", "J", "Jimmy", "Clark", "James", "Y", "20250316", "NG7", "NG7 7GG", "07","R06","100","",""),
("321", "654", "H", "19831111", "F", "Olivia", "Mrs", "Garcia", "G000", "O", "Liv", "Martinez", "Olivia", "N", "20250316", "NG8", "NG8 8HH", "08","R07","100","","")
]
# Create DataFrame
a20_cleanse_df = spark.createDataFrame(data, schema)
a01_cleanse_df = a01_cleanse_df.select(*cleanse_columns)
a08_cleanse_df = a08_cleanse_df.select(*cleanse_columns)
a20_cleanse_df = a20_cleanse_df.select(*cleanse_columns)
#a44_cleanse_df = a44_cleanse_df.select(*cleanse_columns)
all_supps_df = a01_cleanse_df.union(a08_cleanse_df).union(a20_cleanse_df)#.union(a44_cleanse_df)
# Define schema
schema1 = StructType([
StructField("key", StringType(), True)
])
# Create sample data
data1 = [
("NG11AAAA",)
]
# Create DataFrame
dps50_df = spark.createDataFrame(data1, schema1)
# sectors_df, drop_report_df,p25_output = run(p20_df, dps50_df, all_supps_df)
p25_output = run(p20_df, dps50_df, all_supps_df)
# drop_report_df.show()
# sectors_df.show()
p25_output.show()
spark.stop()
if __name__ == "__main__":
args = parse_arguments()
spark = create_spark_session()
main( args,spark)
File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 947, in show
File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 965, in _show_string
File "C:\Users\C20061E\AppData\Local\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
File "C:\Users\C20061E\AppData\Local\spark\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py", line 179, in deco
File "C:\Users\C20061E\AppData\Local\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o950.showString.
: java.lang.OutOfMemoryError: Java heap space
Created on ‎04-02-2025 08:46 AM - edited ‎04-02-2025 09:20 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Welcome to the community @KameshMaruvada. While you wait for a more knowledgeable member to respond, I thought I would drop a documentation link here in case it helps get you any closer to a resolution.
Spark jobs failing with memory issues
Cy Jervis, Manager, Community Program
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
