
Reading data from Oracle in parallel

Explorer

Hi All,

I feel this community is teaching me lots of things which I was not aware of before.

I am working on a project where I need to import data from Oracle and store it in HDFS.

I am using PySpark to load the data into a DataFrame and then store it in HDFS; however, the Oracle table is big and has 420,000,000 records.

Now I want to read those tables in parallel, but there are multiple tables and I am not able to define partitions on them.

 

Is there any way to read the data in parallel when you don't know the partition column?

 

 

 

1 ACCEPTED SOLUTION

Master Collaborator

The Spark JDBC reader can read data in parallel by splitting the query into several partitions.

There are four relevant options:

  • partitionColumn is the name of the column used for partitioning. An important condition is that the column must be of numeric (integer or decimal), date, or timestamp type. If partitionColumn is not specified, Spark uses a single executor and creates one non-empty partition, so the read is neither distributed nor parallelized.
  • numPartitions is the maximum number of partitions that can be used for simultaneous table reading and writing.
  • lowerBound and upperBound are the boundaries used to define the partition width (the stride). They determine how many rows from a given range of partition column values end up in a single partition.


For example:

 

# partitionColumn is required together with numPartitions, lowerBound and upperBound;
# "partition_column" is a placeholder for a numeric, date, or timestamp column in your table.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:postgres") \
    .option("dbtable", "db.table") \
    .option("user", "user") \
    .option("password", "pass") \
    .option("partitionColumn", "partition_column") \
    .option("numPartitions", "10") \
    .option("lowerBound", "100") \
    .option("upperBound", "1100") \
    .load()
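
Since the question is about Oracle rather than PostgreSQL, the same options carry over with an Oracle JDBC URL and driver. A minimal sketch, assuming placeholder connection details, a MYSCHEMA.BIG_TABLE table, and a numeric ID column (the Oracle ojdbc driver jar has to be on the Spark classpath):

# Hypothetical Oracle example: host, service name, schema, table, column and
# credentials are placeholders to adapt to your environment.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .option("dbtable", "MYSCHEMA.BIG_TABLE") \
    .option("user", "user") \
    .option("password", "pass") \
    .option("partitionColumn", "ID") \
    .option("numPartitions", "10") \
    .option("lowerBound", "100") \
    .option("upperBound", "1100") \
    .load()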

 



This method uses the lower and upper bounds and the number of partitions to create where clauses. For example, if the lower bound is set to 100, the upper bound to 1,100, and the number of partitions to 10, then the width of the value range assigned to each task, called the stride in the reference documentation, will be:

(upper bound - lower bound) / number of partitions
(1100 - 100) / 10 = 100

And the series of filters applied to each task will be:

where partitionColumn < 200 or partitionColumn is null
where partitionColumn >= 200 and partitionColumn < 300
where partitionColumn >= 300 and partitionColumn < 400
...
where partitionColumn >= 1000

Note that the first and last clauses are open-ended, which is why rows below the lower bound and above the upper bound are still read.
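
As a quick sanity check (a minimal sketch, assuming the df from the example above), you can confirm how many partitions Spark actually created for the JDBC read:

# Should print 10 with the options above; a value of 1 means the read is not parallel.
print(df.rdd.getNumPartitions())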

 

The lowerBound and upperBound define partitioning boundaries, but they DO NOT participate in filtering rows of the table. Therefore, Spark partitions and returns ALL the rows of the table. It is important to note that all data will be read whether partitioning is used or not.

For example, suppose the partitionColumn data range is [0, 10000] and we set numPartitions=10, lowerBound=4000, and upperBound=5000. In that case, the first and last partitions will contain all the data outside the corresponding lower or upper boundary, so those two partitions end up doing most of the work.

Another example: suppose the partitionColumn data range is [2000, 4000] and we set numPartitions=10, lowerBound=0, and upperBound=10000. In this case, only 2 of the 10 queries (one per partition) will do all the work, which is not ideal. In this scenario, a better configuration would be numPartitions=10, lowerBound=2000, upperBound=4000.
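
If you do not know a good range up front, one common approach (a sketch under assumptions, not something spelled out in this thread: the Oracle connection details, the ID column, and the MIN/MAX subquery are placeholders) is to ask the database for the minimum and maximum of the partition column first and feed those values in as lowerBound and upperBound:

# Sketch: derive lowerBound/upperBound from the table itself before the parallel read.
# Connection details, schema, table and the numeric ID column are placeholders.
bounds = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .option("dbtable", "(SELECT MIN(ID) AS MIN_ID, MAX(ID) AS MAX_ID FROM MYSCHEMA.BIG_TABLE) t") \
    .option("user", "user") \
    .option("password", "pass") \
    .load() \
    .collect()[0]

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .option("dbtable", "MYSCHEMA.BIG_TABLE") \
    .option("user", "user") \
    .option("password", "pass") \
    .option("partitionColumn", "ID") \
    .option("numPartitions", "10") \
    .option("lowerBound", str(int(bounds["MIN_ID"]))) \
    .option("upperBound", str(int(bounds["MAX_ID"]))) \
    .load()

The extra MIN/MAX query runs only once, and it keeps the partition bounds aligned with the actual data range so that no single task ends up with most of the rows.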


 


3 REPLIES


Explorer

Cool, thanks. Just what I was looking for.

New Contributor

For partitionColumn, can you use a date column, with a date range for the lower and upper bounds, in Spark 2.4.8?
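
For reference, the option description earlier in the thread already notes that date and timestamp columns are valid partition columns, and date/timestamp bounds were added to the JDBC reader in the Spark 2.4 line, so this should be available in 2.4.8 (worth verifying against your release notes). A minimal sketch with a hypothetical CREATED_DATE column and placeholder connection details, passing lowerBound and upperBound as date strings:

# Sketch of date-based partitioning; table, column, bound values and connection
# details are placeholders.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .option("dbtable", "MYSCHEMA.BIG_TABLE") \
    .option("user", "user") \
    .option("password", "pass") \
    .option("partitionColumn", "CREATED_DATE") \
    .option("numPartitions", "12") \
    .option("lowerBound", "2021-01-01") \
    .option("upperBound", "2022-01-01") \
    .load()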