
Why can't I partition a 1 gigabyte dataset into 300 partitions?


Hey guys, I have already asked this on multiple forums but never got a reply, so I thought that I might get one here.

 

I have a roughly 1 GB dataset with a "cityid" column that has 324 unique values, so after partitioning I should end up with 324 folders in HDFS. But whenever I partition, the write fails; you can see the exception messages here: https://community.hortonworks.com/questions/238893/notenoughreplicasexception-when-writing-into-a-pa...

 

It's definitely an HDFS issue, because everything worked fine on MapR. What could possibly be the problem?

By the way, I tried this on fresh installs of both Hortonworks and Cloudera with default settings, so no configuration had been changed or tampered with.

 

If you need any more details, please ask.

Could this be a setup issue? Maybe I need to increase memory somewhere in HDFS?

1 ACCEPTED SOLUTION

If you are dealing with unordered partitioning from a data source, you can
end up creating a lot of files in parallel as the partitioning is attempted.

In HDFS, when a file (or more specifically, its block) is open, the
DataNode performs a logical reservation of its target block size. So if
your configured block size is 128 MiB, then every concurrently open block
will deduct that value (logically) from the available remaining space the
DataNode publishes to the NameNode.

This reservation is done to help manage space and to guarantee a full
block write to the client, so that a client that has begun writing its file
never runs into an out-of-space exception mid-way.

Note: When the file is closed, only the actual length is persisted, and the
reservation calculation is adjusted to reflect the reality of used and
available space. However, while the file block remains open, it's always
considered to be holding a full block size.

Furthermore, the NameNode will only select a DataNode for a write if it can
guarantee the full target block size. It will ignore any DataNodes it deems
(based on its reported values and metrics) unfit for the requested write's
parameters. Your error shows that the NameNode has stopped considering your
only live DataNode when trying to allocate a new block request.

As an example, 70 GiB of available space will prove insufficient if there
are more than 560 concurrently open files (70 GiB divided into 128 MiB
block sizes). So the DataNode will 'appear full' at the point of ~560 open
files, and will no longer serve as a valid target for further file requests.

From your description of the insert, this is likely what is happening: each
of the 300 chunks of the dataset may still carry varied IDs, resulting in a
lot of open files being requested per parallel task as it inserts into
several different partitions.

You could 'hack' your way around this by reducing the request block size
within the query (set dfs.blocksize to 8 MiB for ex.), influencing the
reservation calculation. However, this may not be a good idea for larger
datasets as you scale, since it will drive up the file:block count and
increase memory costs for the NameNode.

A better way to approach this would be to perform a pre-partitioned insert
(sort first by partition and then insert in a partitioned manner). Hive for
example provides this as an option: hive.optimize.sort.dynamic.partition
[1], and if you use plain Spark or MapReduce then their default strategy of
partitioning does exactly this.

[1] -
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hi...
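
If you are on Hive, a minimal sketch of that pre-partitioned insert might
look like the following (table and column names are placeholders, not taken
from your environment; the two hive.exec.dynamic.partition settings are the
usual prerequisites for dynamic-partition inserts):

set hive.optimize.sort.dynamic.partition=true;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE db.data_by_city PARTITION (cityid)
SELECT col_a, col_b, cityid
FROM db.data_raw;

With the sort option enabled, rows are sorted by the partition key before
being written, so each task keeps only one partition file open at a time
rather than one per cityid value.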



@Harsh J you are a genius! Thanks a lot!