Amazon Elastic MapReduce
Developer Guide (API Version 2009-11-30)

How to Create a Streaming Job Flow

This section covers the basics of creating and launching a streaming job flow using Amazon Elastic MapReduce (Amazon EMR). You'll step through how to create a streaming job flow using the Amazon EMR console, the CLI, or the Query API. Before you create your job flow, you'll need to create objects and permissions; for more information, see Setting Up Your Environment to Run a Job Flow.

A streaming job flow reads input from standard input and then runs a script or executable (called a mapper) against each input. The result from each of the inputs is saved locally, typically on a Hadoop Distributed File System (HDFS) partition. Once all the input is processed by the mapper, a second script or executable (called a reducer) processes the mapper results. The results from the reducer are sent to standard output. You can chain together a series of streaming job flows, where the output of one streaming job flow becomes the input of another job flow.
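
The map, sort, and reduce sequence described above can be sketched locally in Python. This is only an in-memory illustration, not how Hadoop streaming is implemented: real mappers and reducers exchange text lines over standard input and output, and the function names here (run_streaming_job, first_letter_mapper, sum_reducer) are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def run_streaming_job(lines, mapper, reducer):
    """Simulate one streaming job flow in memory: map, sort by key, reduce."""
    # Map phase: the mapper runs once per input line and may emit
    # any number of (key, value) pairs.
    mapped = [pair for line in lines for pair in mapper(line)]
    # Between the phases, mapper output is sorted so equal keys are adjacent.
    mapped.sort(key=itemgetter(0))
    # Reduce phase: the reducer receives each key with all of its values.
    return [
        reducer(key, [value for _, value in group])
        for key, group in groupby(mapped, key=itemgetter(0))
    ]


def first_letter_mapper(line):
    # Hypothetical mapper: emit (first letter, 1) for every word in the line.
    return [(word[0].lower(), 1) for word in line.split()]


def sum_reducer(key, values):
    # Hypothetical reducer: total the values seen for one key.
    return (key, sum(values))


if __name__ == "__main__":
    sample = ["apple avocado banana", "blueberry cherry"]
    print(run_streaming_job(sample, first_letter_mapper, sum_reducer))
```

Because the result of one call is an ordinary list, it could be turned back into lines and passed to a second call, which mirrors chaining the output of one streaming job flow into another.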

The mapper and the reducer can each be referenced as a file, or you can supply a Java class. You can implement the mapper and reducer in any of the supported languages, including Ruby, Perl, Python, PHP, and Bash.

The example that follows is based on the Amazon EMR Word Count Example. This example shows how to use Hadoop streaming to count the number of times each word occurs within a text file. In this example, the input is located in the Amazon S3 bucket s3n://elasticmapreduce/samples/wordcount/input. The mapper is a Python script that counts the number of times a word occurs in each input string and is located at s3://elasticmapreduce/samples/wordcount/wordSplitter.py. The reducer references a standard Hadoop library package called aggregate. Aggregate provides a special Java class and a list of simple aggregators that perform aggregations such as sum, max, and min over a sequence of values. The output is saved to an Amazon S3 bucket you created in Setting Up Your Environment to Run a Job Flow.
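
To make the division of labor concrete, here is a rough Python sketch of a word-splitting mapper together with a stand-in for the summing behavior of aggregate. It is not the wordSplitter.py script itself. The LongValueSum: key prefix follows the convention of the Hadoop Aggregate package, the word pattern is an assumption, and map_line and aggregate_long_value_sum are hypothetical names used only for illustration.

```python
import re
from collections import defaultdict

# Assumed definition of a "word": a letter followed by letters or digits.
WORD = re.compile(r"[a-zA-Z][a-zA-Z0-9]*")


def map_line(line):
    """Return one 'LongValueSum:<word><tab>1' record per word in the line."""
    return ["LongValueSum:%s\t1" % word.lower() for word in WORD.findall(line)]


def aggregate_long_value_sum(records):
    """Stand-in for the aggregate reducer: sum the values for each key."""
    totals = defaultdict(int)
    for record in records:
        key, value = record.split("\t")
        # Strip the aggregator name that prefixes the key.
        word = key.split(":", 1)[1]
        totals[word] += int(value)
    return dict(totals)


if __name__ == "__main__":
    records = []
    for line in ["The quick fox", "the lazy dog and the fox"]:
        records.extend(map_line(line))
    print(sorted(aggregate_long_value_sum(records).items()))
```

In the real job flow the mapper only emits records; the summing is done by the aggregate reducer inside Hadoop, so no reducer script has to be written.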

Amazon EMR Console

This example describes how to use the Amazon EMR console to create a streaming job flow.

To create a streaming job flow

  1. Sign in to the AWS Management Console and open the Amazon Elastic MapReduce console at https://console.aws.amazon.com/elasticmapreduce/.

  2. Click Create New Job Flow.

  3. In the DEFINE JOB FLOW page, do the following:

    1. Enter a name in the Job Flow Name field. This name is optional, and does not need to be unique.

    2. Select Run your own application.

    3. Select Streaming in the drop-down list.

    4. Click Continue.

  4. In the SPECIFY PARAMETERS page, enter values in the boxes using the following table as a guide, and then click Continue.

    Field    Action
    Input Location*    Specify the URI where the input data resides in Amazon S3. The value must be in the form BucketName/path.
    Output Location*    Specify the URI where you want the output stored in Amazon S3. The value must be in the form BucketName/path.
    Mapper*    Specify either a class name that refers to a mapper class in Hadoop, or a path on Amazon S3 where the mapper executable, such as a Python program, resides. The path value must be in the form BucketName/path/MapperExecutable.
    Reducer*    Specify either a class name that refers to a reducer class in Hadoop, or a path on Amazon S3 where the reducer executable, such as a Python program, resides. The path value must be in the form BucketName/path/ReducerExecutable. Amazon EMR supports the special aggregate keyword. For more information, go to the Aggregate library supplied by Hadoop.
    Extra Args    Optionally, enter a list of arguments (space-separated strings) to pass to the Hadoop streaming utility. For example, you can specify additional files to load into the distributed cache.

    * Required parameter

  5. In the CONFIGURE EC2 INSTANCES page, select the type and number of instances, using the following table as a guide, and then click Continue.

    Note

    Twenty is the default maximum number of nodes per AWS account. For example, if you have two job flows running, the total number of nodes running for both job flows must be 20 or less. If you need more than 20 nodes, you must submit a request to increase your Amazon EC2 instance limit. For more information, go to the Request to Increase Amazon EC2 Instance Limit Form.

    Field    Action
    Instance Count    Specify the number of nodes to use in the Hadoop cluster. There is always one master node in each job flow. You can specify the number of core and task nodes.
    Instance Type    Specify the Amazon EC2 instance types to use as master, core, and task nodes. Valid types are m1.small (default), m1.large, m1.xlarge, c1.medium, c1.xlarge, m2.xlarge, m2.2xlarge, m2.4xlarge, cc1.4xlarge, and cg1.4xlarge.
    Request Spot Instances    Select this check box to run master, core, or task nodes on Spot Instances. For more information, see Lowering Costs with Spot Instances.

    * Required parameter

  6. In the ADVANCED OPTIONS page, set additional configuration options, using the following table as a guide, and then click Continue.

    Field    Action
    Amazon EC2 Key Pair    Optionally, specify a key pair that you created previously. For more information, see Create an Amazon EC2 Key Pair and PEM File. If you do not enter a value in this field, you cannot SSH into the master node.
    Amazon VPC Subnet Id    Optionally, specify a VPC subnet identifier to launch the job flow in an Amazon VPC. For more information, see Running Job Flows on an Amazon VPC.
    Amazon S3 Log Path    Optionally, specify a path in Amazon S3 to store the Amazon EMR log files. The value must be in the form BucketName/path. If you do not supply a location, Amazon EMR does not log any files.
    Enable Debugging    Select Yes to store Amazon Elastic MapReduce-generated log files. You must enable debugging at this level if you want to store the log files generated by Amazon EMR.

        If you select Yes, you must supply an Amazon S3 bucket name where Amazon Elastic MapReduce can upload your log files.

        For more information, see Troubleshooting.

        Important

        You can enable debugging for a job flow only when you initially create the job flow.

    Keep Alive    Select Yes to cause the job flow to continue running when all processing is completed.

  7. In the BOOTSTRAP ACTIONS page, select Proceed with no Bootstrap Actions, and then click Continue.

    For more information about bootstrap actions, see Bootstrap Actions.

  8. In the REVIEW page, review the information, edit as necessary to correct any of the values, and then click Create Job Flow when the information is correct.

    After you click Create Job Flow your request is processed; when it succeeds, a message appears.

  9. Click Close.

    The Amazon EMR console shows the new job flow starting.

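
The default node limit mentioned in the note in step 5 is counted across all running job flows, not per job flow. A small sketch of that arithmetic follows; fits_node_limit is a hypothetical helper, not part of any Amazon EMR tool, and the limit for your account may differ if you have requested an increase.

```python
DEFAULT_NODE_LIMIT = 20  # default per-account maximum stated in the note


def fits_node_limit(running_counts, requested, limit=DEFAULT_NODE_LIMIT):
    """Return True if a new job flow keeps the account total within the limit."""
    # running_counts holds the node count of each job flow already running.
    return sum(running_counts) + requested <= limit


if __name__ == "__main__":
    # Two job flows with 8 and 6 nodes leave room for 6 more nodes.
    print(fits_node_limit([8, 6], 6))
    print(fits_node_limit([8, 6], 7))
```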

CLI

This example describes how to use the CLI to create a streaming job flow. Replace myawsbucket in the --output value with the name of your own Amazon S3 bucket.

To create a job flow

  • Enter the command for your operating system to create your job flow:

    Linux or UNIX

    $ ./elastic-mapreduce --create --stream \
    --input s3n://elasticmapreduce/samples/wordcount/input \
    --mapper s3://elasticmapreduce/samples/wordcount/wordSplitter.py \
    --reducer aggregate \
    --output s3n://myawsbucket

    Microsoft Windows

    c:\> ruby elastic-mapreduce --create --stream ^
    --input s3n://elasticmapreduce/samples/wordcount/input ^
    --mapper s3://elasticmapreduce/samples/wordcount/wordSplitter.py ^
    --reducer aggregate ^
    --output s3n://myawsbucket

The output looks similar to the following.

Created jobflow JobFlowID

By default, this command launches a job flow to run on a single-node cluster using an Amazon EC2 m1.small instance. Later, when your steps are running correctly on a small set of sample data, you can launch job flows to run on multiple nodes. You can specify the number of nodes and the type of instance to run with the --num-instances and --instance-type parameters, respectively.
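
If you script job flow creation, it can help to assemble the command arguments in one place. The following Python sketch only builds and prints the command line; it does not run it. The flags are the ones shown in this section, while build_create_command and the chosen node count and instance type are illustrative assumptions.

```python
import shlex


def build_create_command(input_uri, mapper, reducer, output_uri,
                         num_instances=None, instance_type=None):
    """Return the elastic-mapreduce argument list for a streaming job flow."""
    command = [
        "./elastic-mapreduce", "--create", "--stream",
        "--input", input_uri,
        "--mapper", mapper,
        "--reducer", reducer,
        "--output", output_uri,
    ]
    # Only add the sizing flags when the caller overrides the defaults.
    if num_instances is not None:
        command += ["--num-instances", str(num_instances)]
    if instance_type is not None:
        command += ["--instance-type", instance_type]
    return command


if __name__ == "__main__":
    cmd = build_create_command(
        "s3n://elasticmapreduce/samples/wordcount/input",
        "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
        "aggregate",
        "s3n://myawsbucket",
        num_instances=4,
        instance_type="m1.large",
    )
    # Print a copy-pastable command line instead of running it.
    print(" ".join(shlex.quote(part) for part in cmd))
```

Leaving both optional arguments unset reproduces the default command shown earlier, which runs on a single m1.small node.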

API

This section describes the Amazon EMR API Query request parameters you need to create a streaming job flow. The response includes a <JobFlowID>, which you use in other Amazon EMR operations, such as when describing or terminating a job flow. For this reason, it is important to store job flow IDs.

The Args argument contains location information for your input data, output data, mapper, reducer, and cache file. The following example sets the input, output, mapper, and reducer.

"Name": "streaming job flow",
"HadoopJarStep":
    {
        "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
        "Args":
            [
                "-input",   "s3n://elasticmapreduce/samples/wordcount/input",
                "-output",  "s3n://myawsbucket",
                "-mapper",  "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
                "-reducer", "aggregate"
            ]
    }
Note

All paths are prefixed with their location. The prefix “s3://” refers to the s3n file system. If you use HDFS, prepend the path with hdfs:///. Make sure to use three slashes (///), as in hdfs:///home/hadoop/sampleInput2/.
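
As a rough illustration, the step structure and the prefix rule from the note can be combined in a small Python helper that builds the structure as a dictionary and serializes it to JSON. This sketch does not send a request to Amazon EMR. streaming_step is a hypothetical name, and the list of accepted prefixes is an assumption limited to the ones mentioned in this section.

```python
import json

# Prefixes mentioned in this section; treat the list as an assumption.
ALLOWED_PREFIXES = ("s3://", "s3n://", "hdfs:///")


def streaming_step(name, input_uri, output_uri, mapper, reducer):
    """Build the streaming step structure as a plain dictionary."""
    for uri in (input_uri, output_uri):
        # Every path carries its location prefix; HDFS needs three slashes.
        if not uri.startswith(ALLOWED_PREFIXES):
            raise ValueError("missing location prefix: %r" % uri)
    return {
        "Name": name,
        "HadoopJarStep": {
            "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
            "Args": [
                "-input", input_uri,
                "-output", output_uri,
                "-mapper", mapper,
                "-reducer", reducer,
            ],
        },
    }


if __name__ == "__main__":
    step = streaming_step(
        "streaming job flow",
        "s3n://elasticmapreduce/samples/wordcount/input",
        "s3n://myawsbucket",
        "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
        "aggregate",
    )
    print(json.dumps(step, indent=4))
```

The mapper and reducer are not checked because, as in the example, the reducer can be a keyword such as aggregate rather than a path.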