Support Questions

Find answers, ask questions, and share your expertise

Tips for optimizing export to S3(n)?

Super Collaborator

I've been experimenting with the options for copying data from our (bare metal) cluster to S3.

I found that something like this works:

hive> create table aws.my_table
> (
> `column1` string,
> `column2` string,
> ....
> `columnX` string)
> row format delimited fields terminated by ','
> lines terminated by '\n'
> stored as textfile
> location 's3n://my_bucket/my_folder_path/';
hive> insert into table aws.my_table select * from source_db.source_table;

But only if the source data set is pretty small.

For a larger data set (tens of GB), it fails with errors like:

Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
        at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:172)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at ...
        ... 8 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.IllegalArgumentException: n must be positive
        at ...
        ...

I understand that pushing gigabytes (and eventually terabytes) of data to a remote server is going to be somewhat painful.

So, I'm wondering what kind of customizations are available.

Is there a way to specify compression, upload throttling, etc.?

Can anyone give me instructions on getting around these errors?


Super Guru

Can you please run the insert command in debug mode and share the output?

hive --hiveconf hive.root.logger=DEBUG,console
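
If the console output is too large to work with interactively, one option (just a sketch; the exact statement and file name here are placeholders) is to run the statement non-interactively and capture everything to a file:

hive --hiveconf hive.root.logger=DEBUG,console \
    -e "insert into table aws.my_table select * from source_db.source_table;" \
    2>&1 | tee hive-to-s3-debug.log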

Super Collaborator

Thanks @Jitendra Yadav

The output is too large to paste in full here.

I'm trying out the "upload file" feature you guys have here.

hive-to-s3-output.txt

(Note: I replaced the names of our servers and S3 bucket, but it should still be pretty clear. The folder I tried to use is /HDFS_ToS3_Testing/Hive2/.)


@Zack Riesland, have you considered trying DistCp to copy the raw files from a source hdfs: URI to a destination s3n: or s3a: URI? It's possible this would be able to move the data more quickly than the Hive insert into/select from. If it's still important to have Hive metadata referencing the table at the s3n: or s3a: location, then you could handle that by creating an external table after completion of the DistCp.
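
For example, a rough sketch of that two-step approach (the bucket, paths, and columns below are placeholders, not your actual names):

hadoop distcp /apps/hive/warehouse/source_db.db/source_table \
    s3a://my_bucket/my_folder_path/source_table

followed by an external table mapped over the copied files:

CREATE EXTERNAL TABLE aws.my_table (
  `column1` string,
  `column2` string
)
STORED AS ORC   -- or whatever format matches the source table's underlying files
LOCATION 's3a://my_bucket/my_folder_path/source_table';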

Super Collaborator

Thanks @Chris Nauroth. I'll play with DistCp.

One clarification (I'm brand new when it comes to S3, so this might be dumb):

Suppose I have Hive table X that is stored as compressed ORC files.

To use DistCp, I suppose I would point at the raw data: /apps/hive/warehouse/db_name.db/table_name

But this will copy the compressed, ORC formatted data, correct?

Suppose someone wanted to use that data as a Hive table in EMR.

Could they also use DistCp to pull it to a cluster, then create a table over top of it with the same metadata, and just use the data?

Is there a straightforward way to copy, say, CSV data from table X to S3?


@Zack Riesland, your understanding of DistCp is correct. It performs a raw byte-by-byte copy from the source to the destination. If that data is compressed ORC at the source, then that's what it will be at the destination too.

According to AWS blog posts, Elastic MapReduce does support use of ORC. This is not a scenario I have tested myself, though. I'd recommend a quick end-to-end prototype to make sure it meets your requirements: DistCp a small ORC data set to S3, and then see if you can query it successfully from EMR.
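
To sketch what that end-to-end check might look like on the EMR side (table name, columns, and bucket are placeholders):

CREATE EXTERNAL TABLE my_orc_table_s3 (
  `column1` string,
  `column2` string
)
STORED AS ORC
LOCATION 's3a://my_bucket/prototype/my_orc_table';

SELECT COUNT(*) FROM my_orc_table_s3;

If the count and a few sample rows look right, the same pattern should extend to the larger tables.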

Super Collaborator

Thanks @Chris Nauroth

Does DistCp support any kind of configuration, for example, to limit the amount of bandwidth used?


@Zack Riesland, yes, there is a -bandwidth option. For full documentation of the available command line options, refer to the Apache documentation on DistCp.
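
For example (the numbers and paths here are arbitrary, just to illustrate the syntax):

hadoop distcp -bandwidth 50 -m 10 \
    /apps/hive/warehouse/db_name.db/table_name \
    s3a://my_bucket/my_folder_path/table_name

-bandwidth caps each map task at roughly that many MB per second, and -m caps the number of simultaneous copy tasks, so together they bound the total outbound bandwidth.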

Super Collaborator

Thanks @Chris Nauroth

After some experimentation, DistCp seems interesting.

But I'm noticing a huge failure rate on my mappers.

Everything eventually succeeds, but usually only after several failed attempts, even for relatively small batches of data.

The error stack is below.

"No space available in any of the local directories." This is confusing because the edge node (where I'm running the distcp command) and all the data nodes have plenty of disk space. I'm guessing that it's perhaps a permissions-related issue trying to access some temporary storage?

Any ideas?

Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:366)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.newBackupFile(NativeS3FileSystem.java:263)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.<init>(NativeS3FileSystem.java:245)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:412)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:986)
	at org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.copyToFile(RetriableFileCopyCommand.java:174)
	at org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.doCopy(RetriableFileCopyCommand.java:123)
	at org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.doExecute(RetriableFileCopyCommand.java:99)
	at org.apache.hadoop.tools.util.RetriableCommand.execute(RetriableCommand.java:87)
	... 11 more
2016-06-16 14:48:29,841 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: File copy failed: hdfs://surus/apps/hive/warehouse/fma2_v12_featuredata.db/eea_iperl/000000_0 --> s3n://sensus-device-analytics/HDFS_To_S3_Testing/distcp1/eea_iperl/000000_0
	at org.apache.hadoop.tools.mapred.CopyMapper.copyFileWithRetry(CopyMapper.java:285)
	at org.apache.hadoop.tools.mapred.CopyMapper.map(CopyMapper.java:253)
	at org.apache.hadoop.tools.mapred.CopyMapper.map(CopyMapper.java:50)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.io.IOException: Couldn't run retriable-command: Copying hdfs://surus/apps/hive/warehouse/fma2_v12_featuredata.db/eea_iperl/000000_0 to s3n://sensus-device-analytics/HDFS_To_S3_Testing/distcp1/eea_iperl/000000_0
	at org.apache.hadoop.tools.util.RetriableCommand.execute(RetriableCommand.java:101)
	at org.apache.hadoop.tools.mapred.CopyMapper.copyFileWithRetry(CopyMapper.java:281)
	... 10 more

@Zack Riesland, the S3N file system buffers data to a local disk area before flushing it to the S3 bucket. I suspect that, depending on the amount of concurrent copy activity on the node (the number of DistCp mapper tasks actively copying to S3N at once), you might hit the limit of available disk space for that buffering.

The directory used by S3N for this buffering is configurable via the property fs.s3.buffer.dir in core-site.xml. See below for the full specification of that property and its default value. I recommend reviewing this in your cluster to make sure it points to a volume large enough to support the workload. You can also specify a comma-separated list of paths if you want to spread the buffering across multiple disks.

<property>
  <name>fs.s3.buffer.dir</name>
  <value>${hadoop.tmp.dir}/s3</value>
  <description>Determines where on the local filesystem the s3:/s3n: filesystem
  should store files before sending them to S3
  (or after retrieving them from S3).
  </description>
</property>
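
For example, an override in core-site.xml pointing at two larger local volumes might look like this (the mount points are hypothetical):

<property>
  <name>fs.s3.buffer.dir</name>
  <!-- Hypothetical mount points; use local directories with enough free space for concurrent uploads. -->
  <value>/data01/s3-buffer,/data02/s3-buffer</value>
</property>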