write is slow in hdfs using pyspark

Explorer

Hi All,

I am trying to import data from an Oracle database and write it to HDFS using PySpark.

The Oracle schema has 480 tables. I loop over the list of tables, but writing the data to HDFS takes too much time.

When I check the logs, only 1 executor is running, even though I pass --num-executors 4.

Here is my code:

# oracle-example.py
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext

appName = "PySpark Example - Oracle Example"
master = "yarn"


spark = SparkSession.builder.master(master).appName(appName).enableHiveSupport().getOrCreate()
spark.sparkContext.getConf().getAll()
#to get the list of tables present in schema
sql = "SELECT table_name FROM all_tables WHERE owner = '**'"
user = "**"
password = "**"
jdbc_url = "jdbc:oracle:thin:@****/**"
# Change these to your Oracle details accordingly
server = "**"
port = "**"
service_name = "**"
jdbcDriver = "oracle.jdbc.OracleDriver"

# Create a DataFrame by reading from Oracle via JDBC to get the list of tables present in the schema
tablelist = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("query", sql) \
.option("user", user) \
.option("password", password) \
.option("driver", jdbcDriver) \
.load().select("table_name")


connection_details = { "user": "**", "password": "**", "driver": "oracle.jdbc.OracleDriver", }
tablelist = [row.table_name for row in tablelist.collect()]
for table_name in tablelist:
    # Read one table over JDBC and write it to its own HDFS directory as CSV
    df = spark.read.jdbc(url=jdbc_url, table='sgms.' + table_name, properties=connection_details)
    df.write.save('hdfs:/rajsampark/sgms/' + table_name, format='csv', mode='overwrite')
    print("Write successful for table " + table_name)

 

 

I am submitting the code using spark-submit.

Please help.

1 ACCEPTED SOLUTION

Expert Contributor

Hi @rdhau 

Can you share the spark-submit command used to trigger this application?

Also, as a quick test, can you set this property to true and share the outcome?

spark.dynamicAllocation.enabled
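
To see which values the application actually received, the executor-related settings can be read back from the running session. This is a minimal sketch, assuming `conf` is the object returned by spark.sparkContext.getConf(); the helper name executor_settings is made up for illustration, and the "<not set>" fallback only marks a key as absent rather than showing Spark's own default.

```python
# Keys that influence how many executors a YARN application gets.
EXECUTOR_KEYS = [
    "spark.dynamicAllocation.enabled",
    "spark.executor.instances",
    "spark.executor.cores",
    "spark.executor.memory",
]


def executor_settings(conf):
    """Return the executor-related settings found in a SparkConf-like object.

    `conf` only needs a get(key, default) method, which SparkConf provides.
    Keys that were never set explicitly are reported as "<not set>".
    """
    return {key: conf.get(key, "<not set>") for key in EXECUTOR_KEYS}


# Usage inside the application (needs a running SparkSession named spark):
# for key, value in executor_settings(spark.sparkContext.getConf()).items():
#     print(key, "=", value)
```

Printing these at startup makes it easy to compare what was passed on the command line with what the cluster configuration supplied.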

 


4 REPLIES

New Contributor

Increase the number of partitions: A JDBC read that specifies no partitioning options loads the whole table into a single partition, so a single task on one executor does all the work. You can use the repartition method to increase the number of partitions before the write. For example, you can try something like this:
df = spark.read.jdbc(url=jdbc_url, table='sgms.'+tablelist[i], properties=connection_details).repartition(4)

This redistributes the data into 4 partitions so the write can run as 4 parallel tasks. The JDBC read itself still runs as a single task; to parallelize the read as well, pass the column, lowerBound, upperBound, and numPartitions arguments to spark.read.jdbc.
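
When a JDBC read is given no partitioning options, Spark fetches the table through one connection into one partition, and repartition only takes effect after that read. The sketch below shows one way to split the read itself with the partitionColumn, lowerBound, upperBound, and numPartitions options of the JDBC source. The helper names, the column "ID", and the bounds are hypothetical; the partition column has to be a numeric, date, or timestamp column that actually exists in the table.

```python
def partitioned_read_options(partition_column, lower_bound, upper_bound, num_partitions):
    """Build the JDBC options that make Spark split one table read into ranges."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    return {
        "partitionColumn": partition_column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }


def read_table_in_parallel(spark, jdbc_url, table, connection_details, partition_options):
    """Read `table` over JDBC, issuing one query per partition range."""
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .options(**connection_details)   # user, password, driver
        .options(**partition_options)    # partitionColumn, bounds, numPartitions
        .load()
    )


# Hypothetical usage: "ID" and the bounds 1..1000000 are placeholders.
# opts = partitioned_read_options("ID", 1, 1000000, 4)
# df = read_table_in_parallel(spark, jdbc_url, "sgms." + table_name, connection_details, opts)
```

The bounds only decide how the value range is divided among partitions; rows outside the bounds are still read, they just land in the first or last partition.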

 

Increase the executor memory: By default, each executor is allocated 1GB of memory. If your data is large, you can try to increase the memory allocation to improve the performance. You can use the --executor-memory flag to set the memory allocation. For example, you can try something like this:

spark-submit --executor-memory 4g oracle-example.py

This will allocate 4GB of memory to each executor.

 

Consider foreachPartition only for custom sinks: df.write already writes each partition as a separate task, so it runs in parallel once the DataFrame has more than one partition. foreachPartition is useful when you need custom per-partition logic. For example, you can try something like this:

df.foreachPartition(lambda x: write_to_hdfs(x))

Here, write_to_hdfs is a function you would have to write yourself that writes the rows of one partition to HDFS; it is not part of Spark.

 

Increase the number of executors: Executors are allocated to the whole application, not to individual tasks. You can use the --num-executors flag to set the number of executors on YARN. For example, you can try something like this:

spark-submit --num-executors 4 oracle-example.py

This will request 4 executors for the application.
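
The flags mentioned in this reply can be combined in a single spark-submit call. The sketch below only assembles and prints such a command; the helper name build_spark_submit and its default values are illustrative, and the dynamic allocation setting is included because that is the property the thread ended up changing. Whether it should be on or off depends on the cluster.

```python
import shlex


def build_spark_submit(script, num_executors, executor_memory="4g", dynamic_allocation=False):
    """Assemble a spark-submit argument list for a YARN application."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--num-executors", str(num_executors),
        "--executor-memory", executor_memory,
        # Passed explicitly so the value does not depend on cluster defaults.
        "--conf", "spark.dynamicAllocation.enabled=" + str(dynamic_allocation).lower(),
        script,
    ]


command = build_spark_submit("oracle-example.py", 4)
print(shlex.join(command))
```

The list form can be handed to subprocess.run directly, which avoids shell quoting problems when a value contains spaces.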

 

Explorer

Hi, I have applied the repartition, but still only 1 executor is running at a time.

Could you please help me with this? Also, could you share the syntax for specifying the HDFS path when writing the data?
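
On the path question: the output location is simply the argument passed to the writer. A minimal sketch, reusing the hdfs:/rajsampark/sgms base directory from the original post; the helper names are made up for illustration, and the header option is an optional extra that was not in the original code.

```python
def hdfs_table_path(base_dir, table_name):
    """Join the base directory and a table name into one output path."""
    return base_dir.rstrip("/") + "/" + table_name


def write_table_as_csv(df, path):
    """Write a DataFrame as CSV files under `path`, replacing existing output."""
    df.write.mode("overwrite").option("header", "true").csv(path)


# Usage inside the loop over tables (needs a DataFrame named df):
# write_table_as_csv(df, hdfs_table_path("hdfs:/rajsampark/sgms", table_name))
```

Spark creates a directory at that path and writes one part file per partition into it, so the number of output files follows the number of partitions.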


Explorer

Thanks, I have disabled dynamic allocation and it is working now.