Amazon Elastic MapReduce
Developer Guide (API Version 2009-11-30)

Distributed Copy Using S3DistCp

Apache DistCp is an open-source tool you can use to copy large amounts of data. DistCp uses MapReduce to copy in a distributed manner, sharing the copy, error handling, recovery, and reporting tasks across several servers. For more information about the Apache DistCp open-source project, go to http://hadoop.apache.org/common/docs/current/distcp.html.

S3DistCp is an extension of DistCp that is optimized to work with Amazon Web Services (AWS), particularly Amazon Simple Storage Service (Amazon S3). You use S3DistCp by adding it as a step in a job flow. Using S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into HDFS where it can be processed by subsequent steps in your Amazon Elastic MapReduce (Amazon EMR) job flow. You can also use S3DistCp to copy data between Amazon S3 buckets or from HDFS to Amazon S3.

S3DistCp is distributed as a JAR file stored on Amazon S3 at s3://region.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar, where region is one of the regions supported by Amazon EMR, such as us-west-1. For a complete list of the available regions, go to the Regions and Endpoints documentation.

If S3DistCp is unable to copy some or all of the specified files, the job flow step fails and returns a non-zero error code. If this occurs, S3DistCp does not clean up partially copied files.

S3DistCp Options

When you call S3DistCp, you can specify options that change how it copies and compresses data. The options are described below, each marked as required or not. You add the options to the step using either the --arg or --args syntax; examples of both follow the option descriptions.

Option: --src
Required: Yes

Location of the data to copy. This can be either an HDFS or Amazon S3 location.

Example: --src,s3://myawsbucket/logs/j-3GY8JC4179IOJ/node

Option: --dest
Required: Yes

Destination for the data. This can be either an HDFS or Amazon S3 location.

Example: --dest,hdfs:///output

Option: --srcPattern
Required: No

A regular expression that filters the copy operation to a subset of the data at --src. If neither --srcPattern nor --groupBy is specified, all data at --src is copied to --dest.

If the regular expression argument contains special characters, such as an asterisk (*), either the regular expression or the entire --args string must be enclosed in single quotes (').

Example: --srcPattern,.*daemons.*-hadoop-.*

Option: --groupBy
Required: No

A regular expression that causes S3DistCp to concatenate files that match the expression. For example, you could use this option to combine all of the log files written in one hour into a single file. The concatenated filename is the value matched by the regular expression for the grouping.

Parentheses indicate how files should be grouped, with all of the items that match the parenthetical statement being combined into a single output file. If the regular expression does not include a parenthetical statement, the job flow will fail on the S3DistCp step and return an error.

If the regular expression argument contains special characters, such as an asterisk (*), either the regular expression or the entire --args string must be enclosed in single quotes (').

When --groupBy is specified, only files that match the specified pattern will be copied. You do not need to specify --groupBy and --srcPattern at the same time.

Example: --groupBy,.*subnetid.*([0-9]+-[0-9]+-[0-9]+-[0-9]+).*

Option: --targetSize
Required: No

The size, in mebibytes (MiB), of the files to create based on the --groupBy option. This value must be an integer. When --targetSize is set, S3DistCp will attempt to match this size; the actual size of the copied files may be larger or smaller than this value.

If the files concatenated by --groupBy are larger than the value of --targetSize, they will be broken up into part files, which will be named sequentially with a numeric value appended to the end. For example, a file concatenated into myfile.gz would be broken into parts as: myfile0.gz, myfile1.gz, and so on.

Example: --targetSize,2

Option: --outputCodec
Required: No

Specifies the compression codec to use for the copied files. This can take the values: gzip, lzo, snappy, or none. You can use this option, for example, to convert input files compressed with Gzip into output files with LZO compression, or to uncompress the files as part of the copy operation. If you do not specify a value for --outputCodec, the files are copied over with no change in their compression.

Example: --outputCodec,lzo

Option: --deleteOnSuccess
Required: No

If the copy operation is successful, this option causes S3DistCp to delete the copied files from the source location. This is useful if you are copying output files, such as log files, from one location to another as a scheduled task, and you don't want to copy the same files twice.

Example: --deleteOnSuccess

Option: --disableMultipartUpload
Required: No

Disables the use of multipart upload. For more information about multipart upload, see Multipart Upload.

Example: --disableMultipartUpload

Option: --multipartUploadChunkSize
Required: No

The size, in MiB, of each part in a multipart upload. By default, S3DistCp uses multipart upload when writing to Amazon S3. The default chunk size is 16 MiB.

Example: --multipartUploadChunkSize,32
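The constraints in the option descriptions above can be checked before a step is submitted. The following Python sketch is illustrative only: check_options and VALID_CODECS are hypothetical names, not part of S3DistCp or the Amazon EMR CLI. It also assumes that Python's re module is a reasonable stand-in for the regular expression engine that S3DistCp uses, which may not hold for more complex patterns.

```python
import re

# Codec names listed for --outputCodec in the option descriptions.
VALID_CODECS = {"gzip", "lzo", "snappy", "none"}


def check_options(opts):
    """Return a list of problems found in a dict of S3DistCp options.

    Hypothetical helper for illustration; it only checks the constraints
    stated in the option descriptions.
    """
    problems = []

    # --src and --dest are the only required options.
    for name in ("--src", "--dest"):
        if not opts.get(name):
            problems.append(name + " is required")

    # --groupBy must contain at least one parenthesized (capturing) group.
    group_by = opts.get("--groupBy")
    if group_by is not None and re.compile(group_by).groups < 1:
        problems.append("--groupBy needs a parenthesized group")

    # --targetSize must be an integer number of MiB.
    target_size = opts.get("--targetSize")
    if target_size is not None:
        try:
            int(str(target_size))
        except ValueError:
            problems.append("--targetSize must be an integer")

    # --outputCodec accepts only the listed codec names.
    codec = opts.get("--outputCodec")
    if codec is not None and codec not in VALID_CODECS:
        problems.append(
            "--outputCodec must be one of " + ", ".join(sorted(VALID_CODECS))
        )

    return problems


good = {
    "--src": "s3://myawsbucket/cf",
    "--dest": "hdfs:///local",
    "--groupBy": ".*XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*",
    "--targetSize": "128",
    "--outputCodec": "lzo",
}
print(check_options(good))  # prints []
```

A check like this only catches mistakes early; S3DistCp itself remains the authority on which option values it accepts.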

In addition to the options above, S3DistCp implements the Hadoop Tool interface, which means that it also supports the generic Hadoop options.

Adding S3DistCp as a Step in a Job Flow

You can call S3DistCp by adding it as a step in your job flow.

To add an S3DistCp step to a job flow using the CLI

  • Add a step to the job flow that calls S3DistCp, passing in the parameters that specify how S3DistCp should perform the copy operation. For more information about adding steps to a job flow, see Add Steps to a Job Flow.

    The following example copies daemon logs from Amazon S3 to hdfs:///output.

    In this CLI command:

    • --jobflow specifies the job flow to add the copy step to.

    • --jar is the location of the S3DistCp JAR file.

    • --args is a comma-separated list of the option name-value pairs to pass in to S3DistCp. For a complete list of the available options, see S3DistCp Options. You can also specify the options singly, using multiple --arg parameters. Both forms are shown in examples below.

    You can use either the --args or --arg syntax to pass options into the job flow step. The --args parameter is a convenient way to pass in several --arg parameters at one time. It splits the string passed in on comma (,) characters to parse them into arguments. This syntax is shown in the following example. Note that the value passed in by --args is enclosed in single quotes ('). This prevents asterisks (*) and any other special characters in any regular expressions from being expanded by the Linux shell.

    elastic-mapreduce --jobflow jobflow-identifier --jar \
    s3://region.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar \
    --args 'S3DistCp-OptionName1,S3DistCp-OptionValue1,\
    S3DistCp-OptionName2,S3DistCp-OptionValue2,\
    S3DistCp-OptionName3,S3DistCp-OptionValue3'

    If the value of an S3DistCp option contains a comma, you cannot use --args and must instead use individual --arg parameters to pass in the S3DistCp option names and values. Only the --src and --dest arguments are required. Note that the option values are enclosed in single quotes ('). This prevents asterisks (*) and any other special characters in any regular expressions from being expanded by the Linux shell.

    elastic-mapreduce --jobflow jobflow-identifier --jar \
    s3://region.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar \
    --arg S3DistCp-OptionName1 --arg 'S3DistCp-OptionValue1' \
    --arg S3DistCp-OptionName2 --arg 'S3DistCp-OptionValue2' \
    --arg S3DistCp-OptionName3 --arg 'S3DistCp-OptionValue3' 
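The comma rule above follows directly from how --args is described: the string is split on every comma. The Python sketch below imitates that splitting to show why a comma inside an option value breaks --args. It is a model of the described behavior, not the CLI's actual code, and split_args and cli_params are hypothetical helper names.

```python
import shlex


def split_args(args_string):
    """Split an --args string on every comma, as the text describes."""
    return args_string.split(",")


def cli_params(tokens):
    """Render S3DistCp tokens as CLI parameters (hypothetical helper).

    Uses one --args string when no token contains a comma, and
    individual --arg parameters otherwise. shlex.quote adds single
    quotes where the shell would otherwise expand special characters.
    """
    if any("," in token for token in tokens):
        return " ".join("--arg " + shlex.quote(token) for token in tokens)
    return "--args " + shlex.quote(",".join(tokens))


# A value without commas survives the split intact.
print(split_args("--dest,hdfs:///output"))
# prints ['--dest', 'hdfs:///output']

# A value containing a comma is cut into two separate arguments.
print(split_args("--srcPattern,.*[a-zA-Z,]+"))
# prints ['--srcPattern', '.*[a-zA-Z', ']+']

# The helper therefore falls back to individual --arg parameters.
print(cli_params(["--srcPattern", ".*[a-zA-Z,]+"]))
# prints --arg --srcPattern --arg '.*[a-zA-Z,]+'
```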
    				

Example: Specify an option value that contains a comma

In this example, --srcPattern is set to '.*[a-zA-Z,]+'. The inclusion of a comma in the --srcPattern regular expression requires the use of individual --arg parameters.

elastic-mapreduce --jobflow j-3GY8JC4179IOJ --jar \
s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar \
--arg --src --arg 's3://myawsbucket/logs/j-3GY8JC4179IOJ/node/' \
--arg --dest --arg 'hdfs:///output' \
--arg --srcPattern --arg '.*[a-zA-Z,]+'
				

Example: Copy log files from Amazon S3 to HDFS

This example illustrates how to copy log files stored in an Amazon S3 bucket into HDFS. In this example the --srcPattern option is used to limit the data copied to the daemon logs.

elastic-mapreduce --jobflow j-3GY8JC4179IOJ --jar \
s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar \
--args '--src,s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/,\
--dest,hdfs:///output,\
--srcPattern,.*daemons.*-hadoop-.*'
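
To preview which objects a --srcPattern value would select, you can try the expression locally before running the step. The Python sketch below is an approximation under two assumptions: that the pattern must match the whole path (which the leading and trailing .* in the example suggests), and that Python's re module behaves like S3DistCp's regular expression engine for this pattern. The three object paths and the select helper are invented for illustration.

```python
import re

SRC_PATTERN = r".*daemons.*-hadoop-.*"

# Hypothetical object paths, invented for this illustration.
paths = [
    "s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/i-0001/daemons/hadoop-hadoop-datanode.log",
    "s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/i-0001/daemons/instance-state.log",
    "s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/i-0001/bootstrap-actions/stdout",
]


def select(pattern, candidates):
    """Return the candidates that the pattern matches in full."""
    regex = re.compile(pattern)
    return [path for path in candidates if regex.fullmatch(path)]


selected = select(SRC_PATTERN, paths)
for path in selected:
    print(path)
# Only the first path is printed: it contains "daemons" followed by "-hadoop-".
```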
		

Example: Load Amazon CloudFront logs into HDFS

This example loads Amazon CloudFront logs into HDFS. In the process, it changes the compression format from Gzip (the CloudFront default) to LZO. This is useful because data compressed using LZO can be split into multiple maps as it is decompressed, so you don't have to wait until decompression is complete, as you do with Gzip. This provides better performance when you analyze the data using Amazon EMR. This example also improves performance by using the regular expression specified in the --groupBy option to combine all of the logs for a given hour into a single file. Amazon EMR job flows are more efficient when processing a few large, LZO-compressed files than when processing many small, Gzip-compressed files.

elastic-mapreduce --jobflow j-3GY8JC4179IOK --jar \
s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar \
--args '--src,s3://myawsbucket/cf,\
--dest,hdfs:///local,\
--groupBy,.*XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*,\
--targetSize,128,\
--outputCodec,lzo,--deleteOnSuccess'
		

Consider the case in which the preceding example is run over the following CloudFront log files.

s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.HLUS3JKx.gz
s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.I9CNAZrg.gz
s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.YRRwERSA.gz
s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.dshVLXFE.gz
s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.LpLfuShd.gz
		

S3DistCp copies, concatenates, and compresses the files into the following two files, where the file name is determined by the match made by the regular expression.

hdfs:///local/2012-02-23-01.lzo
hdfs:///local/2012-02-23-02.lzo
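
The grouping in this example can be reproduced locally to check a --groupBy expression before running the step. The Python sketch below is a simplified model, not S3DistCp's implementation: group_files is a hypothetical helper, it assumes the expression must match the whole path, it names each output after the first parenthesized group only, and it appends the .lzo extension to mirror the output names shown above.

```python
import re

GROUP_BY = r".*XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*"

# The CloudFront log files listed in the example above.
sources = [
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.HLUS3JKx.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-01.I9CNAZrg.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.YRRwERSA.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.dshVLXFE.gz",
    "s3://myawsbucket/cf/XABCD12345678.2012-02-23-02.LpLfuShd.gz",
]


def group_files(pattern, paths, extension):
    """Map each output file name to the source paths combined into it."""
    regex = re.compile(pattern)
    groups = {}
    for path in paths:
        match = regex.fullmatch(path)
        if match is None:
            continue  # files that do not match --groupBy are not copied
        name = match.group(1) + extension
        groups.setdefault(name, []).append(path)
    return groups


grouped = group_files(GROUP_BY, sources, ".lzo")
for name, members in grouped.items():
    print(name, len(members))
# prints:
# 2012-02-23-01.lzo 2
# 2012-02-23-02.lzo 3
```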
		

To add an S3DistCp step to a job flow using the API

  • Send a request similar to the following example. The Steps.member.1.HadoopJarStep.Args.member parameters alternate between S3DistCp argument names and values, and each value is URL encoded.

    https://elasticmapreduce.amazonaws.com?
    JobFlowId=jobflow-identifier&
    Steps.member.1.Name="S3DistCp Step"&
    Steps.member.1.ActionOnFailure=CONTINUE&
    Steps.member.1.HadoopJarStep.Jar=s3://us-east-1.elasticmapreduce/libs/s3distcp/1.0.1/s3distcp.jar&
    Steps.member.1.HadoopJarStep.Args.member.1=--src&
    Steps.member.1.HadoopJarStep.Args.member.2=s3%3A%2F%2Fbucket%2Fcf&
    Steps.member.1.HadoopJarStep.Args.member.3=--dest&
    Steps.member.1.HadoopJarStep.Args.member.4=hdfs%3A%2F%2F%2Flocal&
    Steps.member.1.HadoopJarStep.Args.member.5=--srcPattern&
    Steps.member.1.HadoopJarStep.Args.member.6=.%2A%5Ba-zA-Z%5D%2B&
    Steps.member.1.HadoopJarStep.Args.member.7=--groupBy&
    Steps.member.1.HadoopJarStep.Args.member.8=.%2A%5Ba-zA-Z%5D%2B&
    Operation=AddJobFlowSteps&
    AWSAccessKeyId=access-key-identifier&
    SignatureVersion=2&
    SignatureMethod=HmacSHA256&
    Timestamp=2011-12-28T21%3A51%3A51.000Z&
    Signature=calculated-value
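
The alternating, URL-encoded Args.member parameters in the request above can be generated rather than written by hand. The Python sketch below shows one way to do that with the standard library's urllib.parse.quote. The step_arg_params helper is a hypothetical name, and the sketch covers only the argument parameters; it does not build the rest of the request or calculate the signature.

```python
from urllib.parse import quote


def step_arg_params(args, step=1):
    """Return numbered, URL-encoded Args.member query parameters.

    Hypothetical helper. Option names such as --src contain no
    characters that need encoding, so encoding every token leaves
    them unchanged.
    """
    prefix = "Steps.member.{}.HadoopJarStep.Args.member".format(step)
    return [
        "{}.{}={}".format(prefix, index, quote(arg, safe=""))
        for index, arg in enumerate(args, start=1)
    ]


params = step_arg_params([
    "--src", "s3://bucket/cf",
    "--dest", "hdfs:///local",
    "--srcPattern", ".*[a-zA-Z]+",
    "--groupBy", ".*[a-zA-Z]+",
])
print("&\n".join(params))
```

Passing safe="" makes quote encode the slash characters as well, which matches the encoded values shown in the request.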