09-24-2022 08:46 AM
Can someone please help me modify the code below? Based on the hours parameter, I fetch folders from an HDFS location, then from each folder take the latest CSV file (based on getModificationTime) and load it into a Spark DataFrame. The code works, but because it runs in a loop, the data in the DataFrame is overwritten on every iteration, so at the end I only get the last file's data. Is there a way to first collect all the latest files, e.g. in a collection, and then run the DataFrame load step once, so it loads the latest file from every folder into the DataFrame?

Code:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import spark.implicits._

// currentTs is assumed to be a java.time.LocalDateTime, e.g. LocalDateTime.now()
val static_path = "/user/hdfs/test/partition_date="
val hours = 3

// creating a list of each folder
val paths = (0 until hours)
  .map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}")
  .toList

// iterating over every folder in the list
for (eachfolder <- paths) {
  var New_Folder_Path: String = eachfolder.toString

  // fetching files and the last modification time of each file
  val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
  var pathstatus = fs.listStatus(new Path(New_Folder_Path))
  val currpathfiles = pathstatus.map(x => Row(x.getPath.toString, x.getModificationTime))

  // filtering csv files
  val dfFromArray_pathfiles = spark.sparkContext
    .parallelize(currpathfiles)
    .map(row => (row.getString(0), row.getLong(1)))
    .toDF("FilePath", "ModificationTime")
    .filter(col("FilePath").like("%.csv%"))

  // sorting files by modification time, newest first
  val dfModificationTime_pathfiles = dfFromArray_pathfiles
    .sort($"ModificationTime".desc)
    .select(col("FilePath"))

  // collect the file paths to the driver
  val dfModificationTime_prevpathfiles_take = dfModificationTime_pathfiles.collect()

  // take the first (latest) file path
  val b = dfModificationTime_prevpathfiles_take.head.mkString

  import scala.collection.mutable.ListBuffer
  var NewList = new ListBuffer[String]()
  NewList += b
  val NewList2 = NewList.toList

  for (eachfolder2 <- NewList2) {
    var New_Folder_Path: String = eachfolder2.toString
    println("New New_Folder_Path List is : ")
    val df = spark.read.format("csv").load(New_Folder_Path)
    df.show()
  }
}

Because this runs in a loop, the DataFrame is overwritten on every iteration, and at the end I can only see the data from the last file. I want to see the data from all the files: if there are 3 folders, it should fetch the latest file from each folder, and the final DataFrame should show data from all 3 files.
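To make the question concrete, here is an untested sketch of the kind of rewrite I am after: collect the latest CSV from every folder into a plain Scala collection first, and only then do a single DataFrame load (same assumptions as above, i.e. the same folder layout and a currentTs of type java.time.LocalDateTime):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// For each hourly folder, keep only the most recently modified .csv file (if any).
val latestFiles: Seq[String] = (0 until hours)
  .map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}")
  .flatMap { folder =>
    val csvs = fs.listStatus(new Path(folder))
      .filter(s => s.isFile && s.getPath.getName.endsWith(".csv"))
    if (csvs.isEmpty) None
    else Some(csvs.maxBy(_.getModificationTime).getPath.toString)
  }

// A single read over all collected paths, so nothing gets overwritten.
val df = spark.read.format("csv").load(latestFiles: _*)
df.show()

Is something like this the right direction, or is there a more idiomatic way to do it?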
Labels:
Apache Spark