Scala: read path from dataframe column and merge result
Labels: Apache Spark
Created 11-11-2022 01:39 AM
I have a dataframe with an id column and a path column. I need to read the CSV files at those paths and tag each row with the matching id. The part I can't figure out is how to read files from a path stored in a dataframe column; everything I know points to collecting the list of paths from the dataframe and reading each one separately, as in the sketch below.
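Since the thread is tagged Scala, here is a minimal Scala sketch of that loop-based approach, assuming a hypothetical dataframe pathsDf with string columns id and path pointing at CSV locations (the file location and all names here are illustrative, not from the original post):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().appName("ReadPathsFromColumn").getOrCreate()

// Hypothetical input: a dataframe with string columns `id` and `path`.
val pathsDf = spark.read.option("header", "true").csv("/path/to/input_paths.csv")

// Collect the (id, path) pairs to the driver, read each CSV path,
// tag its rows with the originating id, and union everything together.
// Assumes at least one path and that all CSVs share the same columns.
val merged: DataFrame = pathsDf
  .select("id", "path")
  .collect()
  .map { row =>
    spark.read
      .option("header", "true")
      .csv(row.getString(1))
      .withColumn("source_id", lit(row.getString(0)))
  }
  .reduce((a, b) => a.unionByName(b))

merged.show()

Collecting the paths to the driver is fine as long as the path list itself is small; the data at each path is still read in a distributed fashion.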
Created 03-14-2024 01:31 PM
Hi,
Your requirement isn't entirely clear. Assuming you have a dataframe with a list of HDFS paths, each assigned an ID, and each path contains multiple files: you'd like to read every file, merge the results across paths, and show the result together with the source HDFS path ID. Try the code below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Schema shared by the per-path CSV files and the merged result.
CSV_SCHEMA = "id INT, name STRING, quantity INT"

def merge_files_from_hdfs(spark, input_paths):
    # Start from an empty dataframe with the final schema.
    merged_df = spark.createDataFrame([], schema=CSV_SCHEMA + ", hdfs_path_id INT")
    # Iterate over the (path_id, hdfs_path) pairs.
    for path_id, hdfs_path in input_paths:
        # Apply the schema explicitly so the column types match the empty
        # dataframe; otherwise every CSV column is read as a string and the
        # union below fails with incompatible column types.
        files_df = spark.read.option("header", "true").schema(CSV_SCHEMA).csv(hdfs_path)
        # path_id comes out of the CSV as a string, so cast it to match.
        files_df = files_df.withColumn("hdfs_path_id", lit(path_id).cast("int"))
        # Union with the main dataframe.
        merged_df = merged_df.union(files_df)
    return merged_df

# Read the (path_id, hdfs_path) pairs from a CSV file.
def read_input_paths(spark, file_path):
    df = spark.read.option("header", "true").csv(file_path)
    return [(row["path_id"], row["hdfs_path"]) for row in df.collect()]

# Path to the CSV file containing the input paths.
input_paths_file = "/path/to/input_paths.csv"

spark = SparkSession.builder \
    .appName("MergeFilesFromHDFS") \
    .getOrCreate()

input_paths = read_input_paths(spark, input_paths_file)
merged_df = merge_files_from_hdfs(spark, input_paths)
merged_df.show()

spark.stop()
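As an aside, the driver-side loop can be avoided entirely: Spark's CSV reader accepts multiple paths at once, and input_file_name() records which file each row came from. A rough Scala sketch of that variant, reusing the same hypothetical pathsDf and column names as above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("ReadAllPathsAtOnce").getOrCreate()

// Hypothetical dataframe of (id, path) pairs, as in the question.
val pathsDf = spark.read.option("header", "true").csv("/path/to/input_paths.csv")

// DataFrameReader.csv accepts multiple paths, so all files are read in one job.
val paths = pathsDf.select("path").collect().map(_.getString(0))
val allRows = spark.read.option("header", "true").csv(paths: _*)
  .withColumn("source_file", input_file_name())

// Recover the id by matching each file back to its stored path.
// Caveat: input_file_name() returns a fully qualified URI (e.g.
// "hdfs://nn/data/a/part-0.csv"), so this join only works when each
// stored path is a prefix of that URI.
val withId = allRows.join(pathsDf, allRows("source_file").startsWith(pathsDf("path")))
withId.show()

This reads everything in a single pass, at the cost of a somewhat fragile prefix join between the stored paths and the fully qualified file URIs; the per-path loop above is simpler to reason about when the path list is short.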
