Amazon Elastic MapReduce
Developer Guide (API Version 2009-03-31)

Distributed Copy Using S3DistCp

Apache DistCp is an open-source tool you can use to copy large amounts of data. DistCp uses MapReduce to copy in a distributed manner, sharing the copy, error handling, recovery, and reporting tasks across several servers. S3DistCp is an extension of DistCp that is optimized to work with Amazon Web Services, particularly Amazon Simple Storage Service (Amazon S3). Using S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into HDFS, where it can be processed by your Amazon Elastic MapReduce (Amazon EMR) job flow. You can also use S3DistCp to copy data between Amazon S3 buckets or from HDFS to Amazon S3. For more information about the Apache DistCp open source project, go to http://hadoop.apache.org/common/docs/current/distcp.html.

S3DistCp is stored as a JAR file in Amazon S3 at s3://region.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar, where region is one of the regions supported by Amazon EMR, such as us-west-1. For a complete list of the available regions, go to the Regions and Endpoints documentation.

If S3DistCp is unable to copy some or all of the specified files, it fails and returns a non-zero error code. If this occurs, S3DistCp does not clean up any partially copied files.

S3DistCp Options

When you call S3DistCp, you can specify options that change how it copies and compresses data. The following list describes each option, gives an example, and states whether the option is required.
--src

Location of the data to copy. This can be either an HDFS or Amazon S3 location.

Example: --src s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/

Required: Yes
--dest

Destination for the data. This can be either an HDFS or Amazon S3 location.

Example: --dest hdfs:///output

Required: Yes
--srcPattern

A regular expression that filters the copy operation to a subset of the data at --src. If neither --srcPattern nor --groupBy is specified, all data at --src is copied to --dest.

Example: --srcPattern .*daemons.*-hadoop-.*

Required: No
--groupBy

A regular expression that causes S3DistCp to concatenate files that match the expression. For example, you could use this option to combine all the log files written in one hour into a single file. The name of each concatenated file is the value captured by the grouping (the parenthesized part) of the regular expression.

When --groupBy is specified, only files that match the specified pattern are copied, so you do not need to specify --groupBy and --srcPattern at the same time.

Example: --groupBy subnetid.*.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*

Required: No
--targetSize

The size, in mebibytes (MiB), of the files to create based on the --groupBy option. This value must be an integer. When --targetSize is set, S3DistCp attempts to match this size; the actual size of the copied files may be larger or smaller than this value.

If the files concatenated by --groupBy are larger than the value of --targetSize, they are broken up into part files, named sequentially with a numeric value appended to the end of the base name. For example, a file concatenated into myfile.gz would be broken into parts as myfile0.gz, myfile1.gz, and so on.

Example: --targetSize 2

Required: No
--outputCodec

Specifies the compression codec to use for the copied files. Valid values are gzip, lzo, snappy, and none. You can use this option, for example, to convert input files compressed with Gzip into output files with LZO compression, or to decompress the files as part of the copy operation (none). If you do not specify a value for --outputCodec, the files are copied with no change in their compression.

Example: --outputCodec lzo

Required: No
--deleteOnSuccess

If the copy operation is successful, this option causes S3DistCp to delete the copied files from the source location. This is useful if you are copying output files, such as log files, from one location to another as a scheduled task, and you don't want to copy the same files twice.

Example: --deleteOnSuccess

Required: No
--disableMultipartUpload

Disables the use of multipart upload when writing to Amazon S3. For more information about multipart upload, see Multipart Upload.

Example: --disableMultipartUpload

Required: No
--multipartUploadChunkSize

The size, in MiB, of each part in a multipart upload. By default, S3DistCp uses multipart upload when writing to Amazon S3, with a default part (chunk) size of 16 MiB.

Example: --multipartUploadChunkSize 32

Required: No
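The --srcPattern and --targetSize options can be sanity-checked locally before you launch a job flow. The following Python sketch is an approximation, not S3DistCp's own logic: select_sources assumes that --srcPattern must match the full source path, and estimate_part_names assumes that a group is split into ceiling(total size / target size) parts, numbered as in the myfile0.gz example. Both helper names and the sample paths are hypothetical, and Python's re module agrees with Java regular expressions for the simple constructs used here.

```python
import math
import re

MIB = 1024 * 1024  # --targetSize is given in mebibytes


def select_sources(paths: list[str], src_pattern: str) -> list[str]:
    """Keep the paths that a --srcPattern regex matches in full.

    Assumption: whole-path, whole-string matching (like Java's Matcher.matches()).
    Patterns that start and end with .* behave the same under substring search.
    """
    compiled = re.compile(src_pattern)
    return [p for p in paths if compiled.fullmatch(p)]


def estimate_part_names(group_name: str, total_bytes: int, target_mib: int) -> list[str]:
    """Hypothetical estimate of the part files produced for one --groupBy group.

    Assumes ceil(total / target) parts, numbered from 0 and inserted before the
    extension (myfile.gz -> myfile0.gz, myfile1.gz, ...). S3DistCp only
    approximates --targetSize, so real part counts can differ.
    """
    if target_mib <= 0:
        raise ValueError("--targetSize must be a positive integer")
    parts = math.ceil(total_bytes / (target_mib * MIB))
    if parts <= 1:
        return [group_name]  # not larger than the target: kept as one file
    stem, dot, ext = group_name.partition(".")
    return [f"{stem}{i}{dot}{ext}" for i in range(parts)]


# Hypothetical paths under --src; real log layouts depend on your job flow.
paths = [
    "s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/i-0001/daemons/hadoop-hadoop-datanode.log",
    "s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/i-0001/bootstrap-actions/1/stderr",
]
print(select_sources(paths, r".*daemons.*-hadoop-.*"))  # only the daemons log
print(estimate_part_names("myfile.gz", 5 * MIB, 2))
# -> ['myfile0.gz', 'myfile1.gz', 'myfile2.gz']
```

With --targetSize 2, a 5 MiB group is estimated at three parts of roughly 2, 2, and 1 MiB; a group no larger than the target is left as a single file.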

In addition to the options above, S3DistCp implements the Hadoop Tool interface, which means that it also supports the Hadoop generic options.

Adding S3DistCp as a Step in a Job Flow

You can call S3DistCp by adding it as a step in your job flow.

To add an S3DistCp step to a job flow using the CLI

  • Add a step to the job flow that calls S3DistCp, passing in the parameters that specify how S3DistCp should perform the copy operation. For more information about adding steps to a job flow, see Add Steps to a Job Flow.

    The following example shows the general form of the command.

    In this CLI command:

    • --jobflow specifies the job flow to add the copy step to.

    • --jar is the location of the S3DistCp JAR file.

    • --args is a comma-separated list of the option name-value pairs to pass in to S3DistCp. For a complete list of the available options, see S3DistCp Options.

    elastic-mapreduce --jobflow jobflow-identifier --jar \
    s3://region.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar \
    --args 'S3DistCp-options'
    				
    Note

    The --args parameter is a convenient way to pass in several --arg parameters at one time. It splits the string passed in on comma (,) characters to parse them into arguments. If the value of an S3DistCp option contains a comma, you cannot use --args and must instead use individual --arg parameters. In the following example, --srcPattern is set to '.*[a-zA-Z,]+' using individual --arg parameters.

    elastic-mapreduce --jobflow jobflow-identifier --jar \
    s3://region.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar \
    --arg --src --arg s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/ \
    --arg --dest --arg hdfs:///output \
    --arg --srcPattern --arg '.*[a-zA-Z,]+'
    				

    The following example illustrates how to copy log files stored in an Amazon S3 bucket into HDFS. In this example the --srcPattern option is used to limit the data copied to the daemon logs.

    elastic-mapreduce --jobflow j-3GY8JC4179IOJ --jar \
    s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar \
    --args '--src,s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/,--dest,hdfs:///output,--srcPattern,.*daemons.*-hadoop-.*'
    		

    The next example loads Amazon CloudFront logs into HDFS. In the process, it changes the compression format from Gzip (the CloudFront default) to LZO. This is useful because data compressed using LZO can be split across multiple map tasks as it is decompressed, so you don't have to wait for a single task to decompress the whole file, as you do with Gzip. This provides better performance when analyzing the data using Amazon EMR. The example also improves performance by using the regular expression specified in the --groupBy option to combine all the logs for a given hour into a single file. Amazon EMR job flows are more efficient when processing a few large LZO-compressed files than when processing many small Gzip-compressed files.

    elastic-mapreduce --jobflow j-3GY8JC4179IOK --jar \
    s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar \
    --args '--src,s3://myawsbucket/cf,--dest,hdfs:///local,--groupBy,XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*,--targetSize,128,--outputCodec,lzo,--deleteOnSuccess'
    		

    Consider the case in which the preceding example is run over the following CloudFront log files.

    s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.HLUS3JKx.gz
    s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.I9CNAZrg.gz
    s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.YRRwERSA.gz
    s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.dshVLXFE.gz
    s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.LpLfuShd.gz
    		

    S3DistCp copies, concatenates, and compresses the files into the following files, where each file name is the value captured by the parenthesized group in the --groupBy regular expression.

    hdfs:///local/2012-02-23-01.lzo
    hdfs:///local/2012-02-23-02.lzo
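
The CloudFront example can be traced locally to predict its output. The following Python sketch first splits the --args string on commas, as the note on --args describes (and shows how a comma inside a value would be split apart), then groups the source files by the value captured by the --groupBy expression. It assumes the expression is searched for anywhere in each path, in the style of Java's Matcher.find(); if your S3DistCp version requires a full match, prefix the pattern with .*. The helper names are hypothetical, and the .lzo suffix is added only to mirror the documented output of --outputCodec lzo.

```python
import re
from collections import defaultdict


def split_args(args_value: str) -> list[str]:
    """Mirror the documented --args rule: the string is split on every comma."""
    return args_value.split(",")


def group_sources(paths: list[str], group_by: str) -> dict[str, list[str]]:
    """Map each value captured by the --groupBy group to the files concatenated under it.

    Assumption: the regex is searched for within each path (Matcher.find() style).
    Non-matching paths are skipped, because only matching files are copied.
    """
    compiled = re.compile(group_by)
    groups: dict[str, list[str]] = defaultdict(list)
    for path in paths:
        match = compiled.search(path)
        if match:
            groups[match.group(1)].append(path)
    return dict(groups)


# A comma inside a value is split apart, which is why --arg is needed for it.
print(split_args("--srcPattern,.*[a-zA-Z,]+"))  # -> ['--srcPattern', '.*[a-zA-Z', ']+']

# The --args value from the CloudFront example, split the same way.
args = split_args(
    "--src,s3://myawsbucket/cf,--dest,hdfs:///local,"
    "--groupBy,XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*,--targetSize,128,"
    "--outputCodec,lzo,--deleteOnSuccess"
)
group_by = args[args.index("--groupBy") + 1]

sources = [
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.HLUS3JKx.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.I9CNAZrg.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.YRRwERSA.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.dshVLXFE.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.LpLfuShd.gz",
]

for key, members in group_sources(sources, group_by).items():
    print(f"hdfs:///local/{key}.lzo <- {len(members)} source file(s)")
```

The two groups printed, 2012-02-23-01 (two source files) and 2012-02-23-02 (three source files), correspond to the two output files listed in the example.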
    		

To add an S3DistCp step to a job flow using the API

  • Send a request similar to the following example. The arguments specified by Steps.member.1.HadoopJarStep.Args.member alternate between an option name and its value, and each value is URL encoded.

    https://elasticmapreduce.amazonaws.com?
    JobFlowId=jobflow-identifier&
    Steps.member.1.Name=S3DistCp%20Step&
    Steps.member.1.ActionOnFailure=CONTINUE&
    Steps.member.1.HadoopJarStep.Jar=s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0/s3distcp.jar&
    Steps.member.1.HadoopJarStep.Args.member.1=--src&
    Steps.member.1.HadoopJarStep.Args.member.2=s3%3A%2F%2Fbucket%2Fcf&	
    Steps.member.1.HadoopJarStep.Args.member.3=--dest&
    Steps.member.1.HadoopJarStep.Args.member.4=hdfs%3A%2F%2F%2Flocal&
    Steps.member.1.HadoopJarStep.Args.member.5=--srcPattern&
    Steps.member.1.HadoopJarStep.Args.member.6=.%2A%5Ba-zA-Z%5D%2B&
    Steps.member.1.HadoopJarStep.Args.member.7=--groupBy&
    Steps.member.1.HadoopJarStep.Args.member.8=.%2A%5Ba-zA-Z%5D%2B&					
    Operation=AddJobFlowSteps&
    AWSAccessKeyId=access-key-identifier&
    SignatureVersion=2&
    SignatureMethod=HmacSHA256&
    Timestamp=2011-12-28T21%3A51%3A51.000Z&
    Signature=calculated-value
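
The Args.member numbering is mechanical, so you might generate it rather than write it by hand. The following Python sketch (the helper name step_args_params is hypothetical) flattens option name/value pairs into numbered Steps.member.1.HadoopJarStep.Args.member.N parameters and percent-encodes each value with the standard library's urllib.parse.quote. It reproduces only the step arguments from the example; building the rest of the request and computing the Signature Version 2 signature are outside its scope.

```python
from urllib.parse import quote


def step_args_params(options: list[tuple[str, str]], step: int = 1) -> list[str]:
    """Return 'Steps.member.<step>.HadoopJarStep.Args.member.<n>=<value>' fragments.

    Option names and values alternate, and every value is percent-encoded with
    safe='' so that ':', '/', '*', '[', ']', and '+' are escaped.
    """
    flat = [item for pair in options for item in pair]
    prefix = f"Steps.member.{step}.HadoopJarStep.Args.member"
    return [f"{prefix}.{n}={quote(value, safe='')}" for n, value in enumerate(flat, start=1)]


options = [
    ("--src", "s3://bucket/cf"),
    ("--dest", "hdfs:///local"),
    ("--srcPattern", ".*[a-zA-Z]+"),
]

# Prints the same Args.member.1 through Args.member.6 lines as the request above.
print("&\n".join(step_args_params(options)))
```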