<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Loading HDFS Files In a Spark Dataframe Using Loop in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Loading-HDFS-Files-In-a-Spark-Dataframe-Using-Loop/m-p/379311#M243829</link>
    <description />
    <pubDate>Tue, 21 Nov 2023 14:30:39 GMT</pubDate>
    <dc:creator>ggangadharan</dc:creator>
    <dc:date>2023-11-21T14:30:39Z</dc:date>
    <item>
      <title>Loading HDFS Files In a Spark Dataframe Using Loop</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Loading-HDFS-Files-In-a-Spark-Dataframe-Using-Loop/m-p/353245#M236665</link>
      <description>&lt;P&gt;Can someone please help me modify the code below?&lt;/P&gt;&lt;P&gt;Based on the hours parameter, I fetch folders from an HDFS location, then take the latest CSV file from each folder (based on getModificationTime) and load it into a Spark DataFrame.&lt;/P&gt;&lt;P&gt;The code below works, but because it runs in a loop, each iteration overwrites the data in the DataFrame, so at the end I only get the data from the last file. Is there a way to first collect all the latest files in a collection and then run the DataFrame load step once, so that it loads the latest file from every folder into the DataFrame?&lt;/P&gt;&lt;P&gt;Code:&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; static_path=&lt;SPAN class="hljs-string"&gt;"/user/hdfs/test/partition_date="&lt;/SPAN&gt;
&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; hours=&lt;SPAN class="hljs-number"&gt;3&lt;/SPAN&gt;
&lt;/PRE&gt;&lt;P&gt;//creating list of each folder.&lt;/P&gt;&lt;PRE&gt;  &lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; paths = (&lt;SPAN class="hljs-number"&gt;0&lt;/SPAN&gt; until hours)
.map(h =&amp;gt; currentTs.minusHours(h))
  .map(ts =&amp;gt; &lt;SPAN class="hljs-string"&gt;s"&lt;SPAN class="hljs-subst"&gt;${static_path}&lt;/SPAN&gt;&lt;SPAN class="hljs-subst"&gt;${ts.toLocalDate}&lt;/SPAN&gt;/hour=&lt;SPAN class="hljs-subst"&gt;${ts.getHour}&lt;/SPAN&gt;"&lt;/SPAN&gt;).toList&lt;/PRE&gt;&lt;P&gt;// iterating every folder from the list&lt;/P&gt;&lt;PRE&gt;  &lt;SPAN class="hljs-keyword"&gt;for&lt;/SPAN&gt; (eachfolder &amp;lt;- paths) {
    &lt;SPAN class="hljs-keyword"&gt;var&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;New_Folder_Path&lt;/SPAN&gt;: &lt;SPAN class="hljs-type"&gt;String&lt;/SPAN&gt; = eachfolder.toString&lt;/PRE&gt;&lt;P&gt;// fetching files and lastmodifiedtime of files&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; fs =
  org.apache.hadoop.fs.&lt;SPAN class="hljs-type"&gt;FileSystem&lt;/SPAN&gt;.get(spark.sparkContext.hadoopConfiguration)
&lt;SPAN class="hljs-keyword"&gt;var&lt;/SPAN&gt; pathstatus = fs.listStatus(&lt;SPAN class="hljs-keyword"&gt;new&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;Path&lt;/SPAN&gt;(&lt;SPAN class="hljs-type"&gt;New_Folder_Path&lt;/SPAN&gt;))
&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; currpathfiles =
  pathstatus.map(x =&amp;gt; &lt;SPAN class="hljs-type"&gt;Row&lt;/SPAN&gt;(x.getPath.toString, x.getModificationTime))&lt;/PRE&gt;&lt;P&gt;//filtering csv files&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; dfFromArray_pathfiles = spark.sparkContext
  .parallelize(currpathfiles)
  .map(row =&amp;gt; (row.getString(&lt;SPAN class="hljs-number"&gt;0&lt;/SPAN&gt;), row.getLong(&lt;SPAN class="hljs-number"&gt;1&lt;/SPAN&gt;)))
  .toDF(&lt;SPAN class="hljs-string"&gt;"FilePath"&lt;/SPAN&gt;, &lt;SPAN class="hljs-string"&gt;"ModificationTime"&lt;/SPAN&gt;)
  .filter(
    col(&lt;SPAN class="hljs-string"&gt;"FilePath"&lt;/SPAN&gt;)
      .like(&lt;SPAN class="hljs-string"&gt;"%.csv%"&lt;/SPAN&gt;)
  )&lt;/PRE&gt;&lt;P&gt;// Sorting files&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; dfModificationTime_pathfiles = dfFromArray_pathfiles
  .sort($&lt;SPAN class="hljs-string"&gt;"ModificationTime"&lt;/SPAN&gt;.desc)
  .select(col(&lt;SPAN class="hljs-string"&gt;"FilePath"&lt;/SPAN&gt;))&lt;/PRE&gt;&lt;P&gt;// convert into a list.&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; dfModificationTime_prevpathfiles_take =
      dfModificationTime_pathfiles.collectAsList()&lt;/PRE&gt;&lt;P&gt;// take first filepath from list&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; b = dfModificationTime_prevpathfiles_take.head.mkString

&lt;SPAN class="hljs-keyword"&gt;import&lt;/SPAN&gt; scala.collection.mutable.&lt;SPAN class="hljs-type"&gt;ListBuffer&lt;/SPAN&gt;
&lt;SPAN class="hljs-keyword"&gt;var&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;NewList&lt;/SPAN&gt; = &lt;SPAN class="hljs-keyword"&gt;new&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;ListBuffer&lt;/SPAN&gt;[&lt;SPAN class="hljs-type"&gt;String&lt;/SPAN&gt;]()
&lt;SPAN class="hljs-type"&gt;NewList&lt;/SPAN&gt; += b
&lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;NewList2&lt;/SPAN&gt; = &lt;SPAN class="hljs-type"&gt;NewList&lt;/SPAN&gt;.toList

&lt;SPAN class="hljs-keyword"&gt;for&lt;/SPAN&gt; (eachfolder2 &amp;lt;- &lt;SPAN class="hljs-type"&gt;NewList2&lt;/SPAN&gt;) {
  &lt;SPAN class="hljs-keyword"&gt;var&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;New_Folder_Path&lt;/SPAN&gt;: &lt;SPAN class="hljs-type"&gt;String&lt;/SPAN&gt; = eachfolder2.toString
  println(&lt;SPAN class="hljs-string"&gt;"New New_Folder_Path List is : "&lt;/SPAN&gt;)
  &lt;SPAN class="hljs-keyword"&gt;val&lt;/SPAN&gt; df=spark.read.format(&lt;SPAN class="hljs-string"&gt;"csv"&lt;/SPAN&gt;).load(&lt;SPAN class="hljs-type"&gt;New_Folder_Path&lt;/SPAN&gt;)
  df.show()
}
}&lt;/PRE&gt;&lt;P&gt;As it's in a loop, every iteration overwrites the DF, and at the end I can only see the data from the last file (one file).&lt;/P&gt;&lt;P&gt;I want to see data from all the files. If I have 3 folders, it should fetch the latest file from each folder, and in the end the DF should show data from all 3 files.&lt;/P&gt;</description>
      <pubDate>Sat, 24 Sep 2022 15:46:54 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Loading-HDFS-Files-In-a-Spark-Dataframe-Using-Loop/m-p/353245#M236665</guid>
      <dc:creator>HappyCoding</dc:creator>
      <dc:date>2022-09-24T15:46:54Z</dc:date>
    </item>
    <item>
      <title>Re: Loading HDFS Files In a Spark Dataframe Using Loop</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Loading-HDFS-Files-In-a-Spark-Dataframe-Using-Loop/m-p/379311#M243829</link>
      <description>&lt;P&gt;To load the latest file from each folder into a single DataFrame, first collect the latest file path from every folder into a list, then load that list in a single read once all folders have been processed. Here's a modified version of your code:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val static_path = "/user/hdfs/test/partition_date="
val hours = 3

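// Creating list of each folder.
// Note: currentTs is carried over from the question and is not defined in this snippet;
// it is assumed to be a java.time.LocalDateTime set elsewhere (e.g. LocalDateTime.now()).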
val paths = (0 until hours)
  .map(h =&amp;gt; currentTs.minusHours(h))
  .map(ts =&amp;gt; s"${static_path}${ts.toLocalDate}/hour=${ts.getHour}")
  .toList

// Collect the latest file paths from each folder in a list
val latestFilePaths = paths.flatMap { eachfolder =&amp;gt;
  val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val pathstatus = fs.listStatus(new Path(eachfolder))
  val currpathfiles = pathstatus.map(x =&amp;gt; (x.getPath.toString, x.getModificationTime))
  val latestFilePath = currpathfiles
    .filter(_._1.endsWith(".csv"))
    .sortBy(_._2)
    .reverse
    .headOption
    .map(_._1)
  latestFilePath
}
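
// Optional guard, a sketch under the assumption that some hour folders may hold no .csv file:
// in that case latestFilePaths can end up empty, and calling load with no paths is not useful
// (depending on the Spark version it may fail, for example during schema inference).
require(latestFilePaths.nonEmpty, "No .csv files found in the requested folders")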

// Load data from all the latest files into a single DataFrame
val df = spark.read.format("csv").load(latestFilePaths: _*)

// Show the combined DataFrame
df.show()&lt;/LI-CODE&gt;&lt;P&gt;In this modified code:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;latestFilePaths is a list that holds the path of the most recently modified .csv file from each folder. Because headOption returns an Option and flatMap flattens it, a folder without any .csv file simply contributes nothing to the list.&lt;/LI&gt;&lt;LI&gt;After all folders have been processed, spark.read.format("csv").load(latestFilePaths: _*) loads all of those files into a single DataFrame in one call. load accepts a variable number of paths, and : _* expands the list into those arguments.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;df now contains the data from the latest file in each folder, and you can run further operations or analysis on this combined DataFrame.&lt;/P&gt;</description>
      <pubDate>Tue, 21 Nov 2023 14:30:39 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Loading-HDFS-Files-In-a-Spark-Dataframe-Using-Loop/m-p/379311#M243829</guid>
      <dc:creator>ggangadharan</dc:creator>
      <dc:date>2023-11-21T14:30:39Z</dc:date>
    </item>
  </channel>
</rss>

