
PySpark Queries


I have PySpark code that reads an HDFS path in its respective format (text, ORC, Parquet) and writes it in Parquet format to an Ozone path.

The data is huge.

1) How do I do the resource calculations for the PySpark job, i.e. number of cores, number of executors, and memory allocation?

 

2) Is there a way to read the data from HDFS dynamically, adjusting according to its file type?

 

3) What is the optimal solution and approach for the data movement, and what input mappings should the approach use?

3 REPLIES

Contributor

Hello @Jack_sparrow

Glad to see you again in the forums. 

1. Resource allocation is complicated to prescribe, because it depends on a lot of factors: how big your data is, how much memory the cluster has, the memory overhead, and so on.
There is very useful information here: https://docs.cloudera.com/cdp-private-cloud-base/7.3.1/tuning-spark/topics/spark-admin-tuning-resour...
Under that parent section there are more tuning suggestions on each topic. A minimal sizing sketch follows.
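
As a rough illustration only (the executor count, cores, and memory values below are assumptions, not recommendations for your cluster), the resources can be set when building the SparkSession:

from pyspark.sql import SparkSession

# Example sizing only; adjust to your cluster. A common rule of thumb is
# ~5 cores per executor, and remember spark.executor.memoryOverhead
# (default max(384 MB, 10% of executor memory)) on top of the heap.
spark = (
    SparkSession.builder
    .appName("hdfs-to-ozone-parquet")
    .config("spark.executor.instances", "10")       # hypothetical value
    .config("spark.executor.cores", "5")            # hypothetical value
    .config("spark.executor.memory", "16g")         # hypothetical value
    .config("spark.executor.memoryOverhead", "2g")  # hypothetical value
    .getOrCreate()
)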

2. From the second question I understand that you want to read each file type separately.
That should be possible with something like this:

if input_path.endswith(".parquet"):
    df = spark.read.parquet(input_path)
elif input_path.endswith(".orc"):
    df = spark.read.orc(input_path)
elif input_path.endswith(".txt") or input_path.endswith(".csv"):
    df = spark.read.text(input_path)  # or .csv with options
else:
    raise Exception("Unsupported file format")

Then you can handle each dataset separately.

3. The data movement should avoid going through the driver, to prevent issues and extra work, so collect() or .toPandas() are not the best options.
If you only want to move the data without transformations, distcp is a good option.
To write from Spark you can use: df.write.mode("overwrite").parquet("ofs://ozone/path/out")
Other suggestions are tuning the input partition size with "spark.sql.files.maxPartitionBytes" and switching the compression to Snappy with "spark.sql.parquet.compression.codec". A short sketch putting these together is below.
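
A minimal sketch (the Ozone path and the partition count are placeholders, not values tested against your environment):

# Illustrative values only; size them for your data volume.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)  # 128 MB input splits
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")         # Snappy-compressed Parquet output

# Write straight from the executors to Ozone; no collect() / toPandas().
(df.repartition(200)   # hypothetical partition count
   .write
   .mode("overwrite")
   .parquet("ofs://ozone/path/out"))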

 


Regards,
Andrés Fallas
--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs-up button.


Can we parallelise it?

Contributor

Hello @Jack_sparrow

Spark should do it automatically; you can control the degree of parallelism with these settings: 

  • Input splits
    spark.sql.files.maxPartitionBytes (default 128 MB): smaller values produce more splits and therefore more parallel tasks.
    spark.sql.files.openCostInBytes (default 4 MB): influences how Spark coalesces small files into a single partition.
  • Shuffle parallelism
    spark.sql.shuffle.partitions (default 200): configure it to around 2-3 times the total executor cores.

Also, make sure df.write.parquet() doesn’t collapse everything into only a few files. You can use .repartition(n) before writing to increase the parallelism, as in the sketch below.
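
For example (the numbers here are only assumptions to illustrate the knobs, not recommendations):

# Illustrative values only; tune to your data size and total executor cores.
spark.conf.set("spark.sql.shuffle.partitions", 300)  # roughly 2-3x the total executor cores

df = spark.read.parquet("hdfs:///path/in")  # hypothetical input path
print(df.rdd.getNumPartitions())            # inspect the current read parallelism

df.repartition(300).write.mode("overwrite").parquet("ofs://ozone/path/out")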


Regards,
Andrés Fallas
--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs-up button.