To load data from the latest file in each folder into a single DataFrame, you can collect the file paths from each folder in a list and then load the data into the DataFrame outside the loop. Here's a modified version of your code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hadoop.fs.{FileSystem, Path}

val static_path = "/user/hdfs/test/partition_date="
val hours = 3

// Adjust currentTs to match your original code; here we assume the current time.
val currentTs = java.time.LocalDateTime.now()

// Build the list of hourly folder paths.
val paths = (0 until hours)
  .map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}")
  .toList
// Collect the latest .csv file path (by modification time) from each folder.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val latestFilePaths = paths.flatMap { eachfolder =>
  val currpathfiles = fs.listStatus(new Path(eachfolder))
    .map(x => (x.getPath.toString, x.getModificationTime))
  currpathfiles
    .filter(_._1.endsWith(".csv"))
    .sortBy(_._2)
    .reverse
    .headOption
    .map(_._1)
}
// Load data from all the latest files into a single DataFrame.
val df = spark.read.format("csv").load(latestFilePaths: _*)

// Show the combined DataFrame.
df.show()

In this modified code, latestFilePaths is a list that collects the latest file path from each folder. Outside the loop, spark.read.format("csv").load(latestFilePaths: _*) loads the data from all of those files into a single DataFrame. Now df contains the data from the latest file in each folder, and you can perform further operations or analysis on the combined DataFrame.
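If your CSV files include a header row, you can also pass read options when loading; a minimal sketch, assuming header rows are present and that schema inference suits your data:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(latestFilePaths: _*)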
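Note that fs.listStatus throws a FileNotFoundException if one of the hourly folders has not been created yet. A minimal guard, assuming the same fs and paths values as above, is to filter the folder list first and then flatMap over existingPaths instead of paths:

// Skip folders that do not exist yet to avoid FileNotFoundException.
val existingPaths = paths.filter(p => fs.exists(new Path(p)))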