Member since
11-02-2021
8
Posts
0
Kudos Received
0
Solutions
03-14-2024
01:31 PM
Hi We couldn't get your requirements clear. Assuming you have dataframe with a list of HDFS paths with the ID assigned to them and each path containing multiple files, you would like to read each file merge the entire result of each path, and print result with source Hdfspath ID ------Try below---- from pyspark.sql import SparkSession from pyspark.sql.functions import lit def merge_files_from_hdfs(spark, input_paths): # Create an empty dataframe merged_df = spark.createDataFrame([], schema="id INT, name STRING, quantity INT, hdfs_path_id INT") # Iterate over input paths for path_id, hdfs_path in input_paths: files_df = spark.read.option("header", "true").csv(hdfs_path) files_df = files_df.withColumn("hdfs_path_id", lit(path_id)) # Union with the main dataframe merged_df = merged_df.union(files_df) return merged_df # Read input paths from a CSV file def read_input_paths(file_path): df = spark.read.option("header", "true").csv(file_path) input_paths = [(row["path_id"], row["hdfs_path"]) for row in df.collect()] return input_paths # Path to the CSV file containing input paths input_paths_file = "/path/to/input_paths.csv" spark = SparkSession.builder \ .appName("MergeFilesFromHDFS") \ .getOrCreate() input_paths = read_input_paths(input_paths_file) merged_df = merge_files_from_hdfs(spark, input_paths) merged_df.show() spark.stop()
... View more
01-17-2022
09:15 AM
Can you also tell me how to ask data from a fixed date (period), not everything that there are in the bucket, thanks)
... View more
11-18-2021
10:20 PM
Hi! I'm trying to set up a data transfer with Ni-Fi. I need to transfer data from postgresql to Clickhouse. I'm asking for help in configuring processors. There is also a problem with connecting Nifi to Clickhouse.
... View more
Labels:
- Labels:
-
Apache NiFi