Support Questions

Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Data Analytics (CDA) group hub.

Loading HDFS Files In a Spark Dataframe Using Loop

New Contributor

Can someone please help me modify the below code?

Based on the hours parameter, I am fetching folders from HDFS location, then from each folder, taking the latest csv file(based on getModificationTime) and loading it into the spark dataframe.

The below code is working, but there are some issues since it's running in a loop, so after every iteration the data in the dataframe is overwritten by new data, and at the end I am only getting the last file data. Is there any way to collect all the latest files, i.e. in a collection, then execute the dataframe load step, so it can load all the latest files from every folder in the dataframe?

Code:

val static_path="/user/hdfs/test/partition_date="
val hours=3

//creating list of each folder.

  val paths = (0 until hours)
.map(h => currentTs.minusHours(h))
  .map(ts => s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}").toList

// iterating every folder from the list

  for (eachfolder <- paths) {
    var New_Folder_Path: String = eachfolder.toString

// fetching files and lastmodifiedtime of files

val fs =
  org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
var pathstatus = fs.listStatus(new Path(New_Folder_Path))
val currpathfiles =
  pathstatus.map(x => Row(x.getPath.toString, x.getModificationTime))

//filtering csv files

val dfFromArray_pathfiles = spark.sparkContext
  .parallelize(currpathfiles)
  .map(row => (row.getString(0), row.getLong(1)))
  .toDF("FilePath", "ModificationTime")
  .filter(
    col("FilePath")
      .like("%.csv%")
  )

// Sorting files

val dfModificationTime_pathfiles = dfFromArray_pathfiles
  .sort($"ModificationTime".desc)
  .select(col("FilePath"))

// convert into an in list.

val dfModificationTime_prevpathfiles_take =
      dfModificationTime_pathfiles.collectAsList()

// take first filepath from list

val b = dfModificationTime_prevpathfiles_take.head.mkString

import scala.collection.mutable.ListBuffer
var NewList = new ListBuffer[String]()
NewList += b
val NewList2 = NewList.toList

for (eachfolder2 <- NewList2) {
  var New_Folder_Path: String = eachfolder2.toString
  println("New New_Folder_Path List is : ")
  val df=spark.read.format("csv").load(New_Folder_Path)
  df.show()
}
}

As it's in a loop, every It's overwriting the DF and at the end I can only see the last file data(one file).

I want to see data from all the files. If I have 3 folders, then it will fetch the latest file from each folder, and in the end, the DF should show data from all the 3 files.

0 REPLIES 0
Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.