Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Validating Two Tables Are Identical Using Spark

Validating Two Tables Are Identical Using Spark

New Contributor

We are migrating a data warehouse from a Teradata environment to Hive. I'm in the process of developing Spark code to perform a full outer join between the source table in Teradata and the target table in Hive once the movement is complete to prove the movement occurred properly:

val sourceDataFrame = <table from Teradata>

val targetDataFrame = <table from Hive>

val joinedDataFrame = sourceDataFrame

.join(targetDataFrame,sourceDataFrame.col("source_unique_id") === targetDataFrame.col("target_unique_id"),"fullouter")

Both source and target tables have identical number of columns: N. The joined table has 2N columns. Is there a slick way in scala to validate that the values of each of the source columns = the values of each of the target columns for the join key WITHOUT using the column labels?

Right now, the only way I know how to do source = target for all columns is

val filteredCount = joinedDataFrame.filter(($"source_field1" === $"target_field1") && ($"source_field2" === $"target_field2") && .... ).count()

if filteredCount = joinedDataFrame.count() then PASS

Obviously this code can't be re-used as is specific for the table it is written for. There must be a way in scala to soft code this validation using column indexes rather than column names, and to have it work on an arbitrary number of columns, but I have been struggling to find it. Can anyone help?

Thanks in advance!

1 REPLY 1

Re: Validating Two Tables Are Identical Using Spark

New Contributor

Try:

//sourceDF = source data set
//joinedDF = source data set joined to target data set on key

val validationColumns = for (i <- 0 to (sourceDF.columns.length-1)) yield (joinedDF.col(joinedDF.columns(i)) <=> joinedDF.col(joinedDF.columns(i+(sourceDF.columns.length))))

val matchedDF = joinedDF.filter(validationColumns.reduce((_&&_)))

// Then...

logger.info("Number of Matched records: "+matchedDF.count())
Don't have an account?
Coming from Hortonworks? Activate your account here