Member since 02-05-2021 | 41 Posts | 0 Kudos Received | 0 Solutions
03-14-2024 02:35 PM
@Vivekaushik Vcores can be controlled per job through the Spark parameters --executor-cores and --driver-cores, either by hardcoding them in your application code (Ex-1), by passing them on spark-submit (Ex-2), or by setting them in the Spark service safety valve for spark-defaults.conf.

Ex-1: spark.conf.set("spark.executor.cores", "4") or .config("spark.executor.cores", "4")

Ex-2: spark-submit --executor-cores 4 --driver-cores 2 --num-executors 5 --queue xyz

The Ex-2 job creates a YARN application that occupies a total of 5*4 + 2 = 22 vcores.

Note:
1. Parameters set in code take first preference and override both the values passed on spark-submit and the defaults from the Spark service safety valves.
2. Parameters passed via spark-submit take second preference and override the Spark safety valves.
3. If a parameter is not defined anywhere, the Spark safety valve value is the default.
4. Check whether dynamic allocation is enabled for this job; a job that exhausts all vcores in a queue before memory will block resources. Dynamic allocation depends on --executor-cores and allocates executors as required, so with the settings below it can claim up to 22 vcores at peak usage (see the sketch after this note):
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=5

Hope this clarifies your query. If this response assisted with your query, please take a moment to log in and click KUDOS & "Accept as Solution" below this post.
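Purely as an illustration (the app name and the queue "xyz" follow the Ex-2 example above; nothing here is specific to your cluster), a minimal PySpark sketch that sets the same limits in code would look like this:

from pyspark.sql import SparkSession

# Minimal sketch, assuming PySpark on YARN; the queue name "xyz" is a placeholder.
# Executor and dynamic allocation settings can be set in code before the session is created;
# driver cores are best passed on spark-submit (--driver-cores 2), since the driver is already
# running by the time this code executes.
spark = (
    SparkSession.builder
    .appName("VcoreControlExample")
    .config("spark.executor.cores", "4")                   # vcores per executor
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", "5")   # caps executors at 5 -> 5*4 = 20 executor vcores
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # needed on Spark 3.x without the external shuffle service
    .config("spark.yarn.queue", "xyz")
    .getOrCreate()
)

With --driver-cores 2 passed on spark-submit, the maximum footprint matches the 22 vcores calculated above.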
03-14-2024 02:07 PM
Hi @Hae Please share the CDP (Public/Private) and CML versions so we can understand this better. Ideally, your data connections are chosen automatically if you already have a data setup. If not, try the option to create a new connection, add your datalake and warehouse connections, and sync them with the workspace. Refer to https://blog.cloudera.com/one-line-away-from-your-data/ to understand the setup.
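As a rough sketch of what the blog describes, once a connection is synced into your project you would typically access it through the cml.data_v1 helper. The connection name below is a placeholder, so check the names shown in your project's Data Connections snippet:

import cml.data_v1 as cmldata

# Placeholder connection name; use the name listed under your project's data connections.
CONNECTION_NAME = "my-virtual-warehouse"

conn = cmldata.get_connection(CONNECTION_NAME)

# For a Hive/Impala virtual warehouse connection, run a quick query to verify access.
df = conn.get_pandas_dataframe("SHOW DATABASES")
print(df)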
03-14-2024 01:31 PM
Hi Your requirement isn't fully clear to us. Assuming you have a dataframe with a list of HDFS paths, each assigned an ID and each containing multiple files, and you want to read every file, merge the results per path, and print the result along with the source HDFS path ID, try the below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def merge_files_from_hdfs(spark, input_paths):
    # Start from an empty dataframe with the expected schema
    merged_df = spark.createDataFrame([], schema="id INT, name STRING, quantity INT, hdfs_path_id INT")

    # Iterate over the (path_id, hdfs_path) pairs
    for path_id, hdfs_path in input_paths:
        files_df = spark.read.option("header", "true").csv(hdfs_path)
        # Tag every row with the ID of the path it was read from
        files_df = files_df.withColumn("hdfs_path_id", lit(path_id))
        # Union with the accumulated result
        merged_df = merged_df.union(files_df)

    return merged_df

# Read the (path_id, hdfs_path) pairs from a CSV file
def read_input_paths(spark, file_path):
    df = spark.read.option("header", "true").csv(file_path)
    input_paths = [(row["path_id"], row["hdfs_path"]) for row in df.collect()]
    return input_paths

# Path to the CSV file containing the input paths
input_paths_file = "/path/to/input_paths.csv"

spark = SparkSession.builder \
    .appName("MergeFilesFromHDFS") \
    .getOrCreate()

input_paths = read_input_paths(spark, input_paths_file)
merged_df = merge_files_from_hdfs(spark, input_paths)
merged_df.show()

spark.stop()
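For reference, the script above assumes input_paths.csv has a header row with path_id and hdfs_path columns. A small, purely illustrative helper like the one below (the IDs and HDFS paths are placeholders) can generate such a file locally, which you would then upload to the HDFS location the script reads from:

import csv

# Hypothetical (path_id, hdfs_path) pairs; replace with your real HDFS directories.
rows = [
    {"path_id": "1", "hdfs_path": "hdfs:///data/source_a/"},
    {"path_id": "2", "hdfs_path": "hdfs:///data/source_b/"},
]

with open("input_paths.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["path_id", "hdfs_path"])
    writer.writeheader()
    writer.writerows(rows)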