Scala: read path from dataframe column and merge result

Explorer

I have a dataframe with an id field and a path field; it looks like this:

[screenshot: the dataframe with id and path columns]

I need to read the files (CSV) and match them with the id. I expect the result to look like this:

[screenshot: the expected merged result]

So far I can't even figure out how to read files from a path specified in the dataframe; everything I know comes down to collecting the list of paths from the dataframe and reading each one separately.

1 REPLY

Cloudera Employee

Hi,

Your requirement isn't entirely clear to us. Assuming you have a dataframe with a list of HDFS paths, each path assigned an ID and containing multiple files, and you would like to read the files, merge the results of every path, and print the result together with the source HDFS path ID, try the code below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit


def merge_files_from_hdfs(spark, input_paths):
    # Start with an empty dataframe. Without inferSchema, CSV columns are
    # read as strings, so declare string columns to keep the union consistent.
    merged_df = spark.createDataFrame(
        [], schema="id STRING, name STRING, quantity STRING, hdfs_path_id STRING"
    )

    # Iterate over the (path_id, hdfs_path) pairs
    for path_id, hdfs_path in input_paths:
        files_df = spark.read.option("header", "true").csv(hdfs_path)

        # Tag every row with the ID of the path it came from
        files_df = files_df.withColumn("hdfs_path_id", lit(path_id))

        # Union with the main dataframe (columns are matched by position)
        merged_df = merged_df.union(files_df)

    return merged_df


# Read the (path_id, hdfs_path) pairs from a CSV file
def read_input_paths(spark, file_path):
    df = spark.read.option("header", "true").csv(file_path)
    return [(row["path_id"], row["hdfs_path"]) for row in df.collect()]


# Path to the CSV file containing the input paths
input_paths_file = "/path/to/input_paths.csv"

spark = SparkSession.builder \
    .appName("MergeFilesFromHDFS") \
    .getOrCreate()

input_paths = read_input_paths(spark, input_paths_file)
merged_df = merge_files_from_hdfs(spark, input_paths)

merged_df.show()
spark.stop()
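
Since the question asks for Scala, here is a minimal sketch of the same approach in Scala, under the same assumptions: the paths file has path_id and hdfs_path columns (names assumed from the screenshots), there is at least one path, and all CSV files under the paths share one set of columns, since union matches columns by position.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder()
  .appName("MergeFilesFromHDFS")
  .getOrCreate()

// Dataframe of (path_id, hdfs_path) pairs; the column names are assumptions
val pathsDf = spark.read.option("header", "true").csv("/path/to/input_paths.csv")

// Collect the pairs to the driver, read each path, tag its rows with the id,
// then merge everything with a single chain of unions
val merged: DataFrame = pathsDf
  .collect()
  .map { row =>
    val pathId = row.getAs[String]("path_id")
    val hdfsPath = row.getAs[String]("hdfs_path")
    spark.read.option("header", "true").csv(hdfsPath)
      .withColumn("hdfs_path_id", lit(pathId))
  }
  .reduce(_ union _)

merged.show()
spark.stop()

This driver-side loop is fine for a modest number of paths. For very many paths you could instead pass them all to one spark.read.csv call and recover the source with input_file_name(), but the loop above keeps the per-path ID mapping explicit.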