Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Loading HDFS Files In a Spark Dataframe Using Loop

avatar
New Contributor

Can someone please help me modify the below code?

Based on the hours parameter, I am fetching folders from HDFS location, then from each folder, taking the latest csv file(based on getModificationTime) and loading it into the spark dataframe.

The below code is working, but there are some issues since it's running in a loop, so after every iteration the data in the dataframe is overwritten by new data, and at the end I am only getting the last file data. Is there any way to collect all the latest files, i.e. in a collection, then execute the dataframe load step, so it can load all the latest files from every folder in the dataframe?

Code:

val static_path="/user/hdfs/test/partition_date="
val hours=3

//creating list of each folder.

  val paths = (0 until hours)
.map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}").toList

// iterating every folder from the list

  for (eachfolder <- paths) {
    var New_Folder_Path: String = eachfolder.toString

// fetching files and lastmodifiedtime of files

val fs =
  org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
var pathstatus = fs.listStatus(new Path(New_Folder_Path))
val currpathfiles =
  pathstatus.map(x => Row(x.getPath.toString, x.getModificationTime))

//filtering csv files

val dfFromArray_pathfiles = spark.sparkContext
  .parallelize(currpathfiles)
  .map(row => (row.getString(0), row.getLong(1)))
  .toDF("FilePath", "ModificationTime")
  .filter(
    col("FilePath")
      .like("%.csv%")
  )

// Sorting files

val dfModificationTime_pathfiles = dfFromArray_pathfiles
  .sort($"ModificationTime".desc)
  .select(col("FilePath"))

// convert into an in list.

val dfModificationTime_prevpathfiles_take =
      dfModificationTime_pathfiles.collectAsList()

// take first filepath from list

val b = dfModificationTime_prevpathfiles_take.head.mkString

import scala.collection.mutable.ListBuffer
var NewList = new ListBuffer[String]()
NewList += b
val NewList2 = NewList.toList

for (eachfolder2 <- NewList2) {
  var New_Folder_Path: String = eachfolder2.toString
  println("New New_Folder_Path List is : ")
  val df=spark.read.format("csv").load(New_Folder_Path)
  df.show()
}
}

As it's in a loop, every It's overwriting the DF and at the end I can only see the last file data(one file).

I want to see data from all the files. If I have 3 folders, then it will fetch the latest file from each folder, and in the end, the DF should show data from all the 3 files.

1 REPLY 1

avatar
Super Collaborator

To achieve your goal of loading data from all the latest files in each folder into a single DataFrame, you can collect the file paths from each folder in a list and then load the data into the DataFrame outside the loop. Here's a modified version of your code:

 

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val static_path = "/user/hdfs/test/partition_date="
val hours = 3

// Creating list of each folder.
val paths = (0 until hours)
  .map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}")
  .toList

// Collect the latest file paths from each folder in a list
val latestFilePaths = paths.flatMap { eachfolder =>
  val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val pathstatus = fs.listStatus(new Path(eachfolder))
  val currpathfiles = pathstatus.map(x => (x.getPath.toString, x.getModificationTime))
  val latestFilePath = currpathfiles
    .filter(_._1.endsWith(".csv"))
    .sortBy(_._2)
    .reverse
    .headOption
    .map(_._1)
  latestFilePath
}

// Load data from all the latest files into a single DataFrame
val df = spark.read.format("csv").load(latestFilePaths: _*)

// Show the combined DataFrame
df.show()

 

In this modified code:

  1. latestFilePaths is a list that collects the latest file path from each folder.
  2. Outside the loop, spark.read.format("csv").load(latestFilePaths: _*) is used to load data from all the latest files into a single DataFrame.

Now, df will contain data from all the latest files in each folder, and you can perform further operations or analysis on this combined DataFrame.