
How to handle dataframe whose source table might change



Hi community,


We are using Spark 2.3.2 (Scala) on Hadoop 3.1.1, with an Oozie workflow that runs every 15 minutes.

We mostly have external tables stored as ORC. One of the jobs loads an external table into a dataframe and then reuses it for several different analyses, so we .persist() it right after loading.

On a previous Spark and Hadoop version, when we weren't yet using external tables, Hive managed the locks, and the flow would break/fail whenever the above-mentioned table was updated by a separate process. That happened once or twice a day, so it was no big deal at the time. Now, with external tables, if we hit the unlucky window of reading the table while it is being rewritten, the dataframe can come back empty. We implemented a count check on the dataframe: if its record count is < arbitraryBusinessLogicNumber, we wait and then load the table again (still checking the count), roughly as sketched below.

However, it appears that an empty table still sneaks in sometimes and messes things up, even after persisting the loaded, non-empty dataframe.

I ran some tests on the behaviour of .persist and noticed that if the source table changes, the dataframe does indeed get recomputed, even if it was cached and materialized with a .count. This is in line with the Stack Overflow posts and other articles I have read on the subject.

I have looked into checkpointing the dataframe, but I have only found convoluted ways of reading it back [1], and I am not sure it would be a good solution.

Do you have any suggestions on the approach to take in such a situation?

Thanks a lot! Cheers


[1] https://stackoverflow.com/questions/62206092/how-to-read-a-checkpoint-dataframe-in-spark-scala