Support Questions

Find answers, ask questions, and share your expertise

Validating Two Tables Are Identical Using Spark

Explorer

We are migrating a data warehouse from a Teradata environment to Hive. I'm in the process of developing Spark code to perform a full outer join between the source table in Teradata and the target table in Hive once the movement is complete to prove the movement occurred properly:

val sourceDataFrame = <table from Teradata>

val targetDataFrame = <table from Hive>

val joinedDataFrame = sourceDataFrame

.join(targetDataFrame,sourceDataFrame.col("source_unique_id") === targetDataFrame.col("target_unique_id"),"fullouter")

Both source and target tables have identical number of columns: N. The joined table has 2N columns. Is there a slick way in scala to validate that the values of each of the source columns = the values of each of the target columns for the join key WITHOUT using the column labels?

Right now, the only way I know how to do source = target for all columns is

val filteredCount = joinedDataFrame.filter(($"source_field1" === $"target_field1") && ($"source_field2" === $"target_field2") && .... ).count()

if filteredCount = joinedDataFrame.count() then PASS

Obviously this code can't be re-used as is specific for the table it is written for. There must be a way in scala to soft code this validation using column indexes rather than column names, and to have it work on an arbitrary number of columns, but I have been struggling to find it. Can anyone help?

Thanks in advance!

1 REPLY 1

Explorer

Try:

//sourceDF = source data set
//joinedDF = source data set joined to target data set on key

val validationColumns = for (i <- 0 to (sourceDF.columns.length-1)) yield (joinedDF.col(joinedDF.columns(i)) <=> joinedDF.col(joinedDF.columns(i+(sourceDF.columns.length))))

val matchedDF = joinedDF.filter(validationColumns.reduce((_&&_)))

// Then...

logger.info("Number of Matched records: "+matchedDF.count())