Hi
We couldn't quite work out your requirements. Assuming you have a DataFrame with a list of HDFS paths, each with an ID assigned to it, and each path contains multiple files, you would like to read each file, merge the results for each path, and show the output together with the source HDFS path ID.
------ Try the code below ------
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def merge_files_from_hdfs(spark, input_paths):
    # Read every file under each HDFS path, tag the rows with that
    # path's ID, and collect one DataFrame per path
    dfs = []
    for path_id, hdfs_path in input_paths:
        files_df = spark.read.option("header", "true").csv(hdfs_path)
        files_df = files_df.withColumn("hdfs_path_id", lit(path_id))
        dfs.append(files_df)
    # Union everything into a single result; unionByName matches
    # columns by name rather than position. Note that unioning against
    # an empty DataFrame with a hardcoded INT schema would fail here,
    # because header-only CSV reads produce all-string columns.
    return reduce(lambda left, right: left.unionByName(right), dfs)

def read_input_paths(spark, file_path):
    # Expects a header CSV with columns: path_id, hdfs_path
    df = spark.read.option("header", "true").csv(file_path)
    return [(row["path_id"], row["hdfs_path"]) for row in df.collect()]

# Path to the CSV file containing the input paths
input_paths_file = "/path/to/input_paths.csv"

spark = SparkSession.builder \
    .appName("MergeFilesFromHDFS") \
    .getOrCreate()

input_paths = read_input_paths(spark, input_paths_file)
merged_df = merge_files_from_hdfs(spark, input_paths)
merged_df.show()

spark.stop()
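For reference, read_input_paths expects input_paths.csv to be a header CSV with a path_id and an hdfs_path column, something like this (the paths below are just placeholders, adjust to your cluster):

path_id,hdfs_path
1,hdfs://namenode:8020/data/source_a/
2,hdfs://namenode:8020/data/source_b/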
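One thing to keep in mind: with only header=true, spark.read.csv reads every column as a string. If you need typed columns, either enable schema inference when reading inside merge_files_from_hdfs, or cast after the merge. A minimal sketch, assuming your files really have id and quantity columns as in your schema:

from pyspark.sql.functions import col

# option 1: let Spark infer column types while reading
files_df = spark.read.option("header", "true").option("inferSchema", "true").csv(hdfs_path)

# option 2: cast explicitly after merging
typed_df = (merged_df
            .withColumn("id", col("id").cast("int"))
            .withColumn("quantity", col("quantity").cast("int")))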