Friday, November 04, 2011

Experiences with Amazon Elastic MapReduce

We started to use AWS Elastic MapReduce (EMR) in earnest a short time ago, with the help of Bradford Stephens from Drawn to Scale. We needed somebody to jumpstart our data analytics processes and workflow, and Bradford's help was invaluable. At some point we'll probably build our own Hadoop cluster either in EC2 or in-house, but for now EMR is doing the job just fine.

We started with an EMR cluster containing a master node plus 5 slave nodes, all m1.xlarge instances. We still have that cluster up and running, but in the meantime I've also experimented with launching clusters, running our data analytics processes on them, and then shutting them down. This is the 'elastic' type of workflow that takes full advantage of EMR's pay-per-hour model.

Before I go into the details of launching and managing EMR clusters, here's the general workflow that we follow for our data analytics processes on a nightly basis:

  1. We gather data from various sources such as production databases, ad impression reports, mail server logs, and other third-party sources. All this data is in CSV format, mostly comma-separated or tab-separated. Each CSV file is timestamped with YYYY-MM-DD and corresponds to a 'table' or 'entity' that we want to analyse later.
  2. We gzip all CSV files and upload them to various S3 buckets.
  3. On the EMR Hadoop master node, we copy the csv.gz files we need from S3 into HDFS.
  4. We create Hive tables, one table for each type of 'entity'.
  5. We run Hive queries against these tables and save the results in HDFS.
  6. We export the results of the Hive queries to MySQL so we can further analyse them when we need to, and so we can visualize them in a dashboard.
I think this is a fairly common workflow for doing data analytics with Hadoop, and it can definitely be optimized. Currently we create the Hive tables from scratch in step 4. Ideally we'd save them to S3 and only append new data to them, but the append operation seems to exist only in Hive 0.8, which is not yet available in EMR. Suboptimal as it is, and even though it takes a few hours each night, this process lets us run queries that were simply impossible to execute outside of Hadoop.
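Steps 1 and 2 can be sketched in Python, the language of the boto bindings we expect to use eventually. This is a minimal sketch, not our actual script: it assumes each file is named <entity>_<YYYY-MM-DD>.csv (a hypothetical naming convention), gzips one night's files, and leaves the S3 upload itself out.

```python
import gzip
import re
import shutil
from pathlib import Path

# Hypothetical naming convention: <entity>_<YYYY-MM-DD>.csv,
# e.g. ad_impressions_2011-11-04.csv
CSV_NAME = re.compile(r"^(?P<entity>.+)_(?P<date>\d{4}-\d{2}-\d{2})\.csv$")


def gzip_csv_files(src_dir, date):
    """Gzip every CSV file in src_dir stamped with `date` (YYYY-MM-DD).

    Returns a dict mapping entity name -> path of the new .csv.gz file.
    The original CSV files are left in place; uploading to S3 is a separate step.
    """
    results = {}
    for path in sorted(Path(src_dir).glob("*.csv")):
        match = CSV_NAME.match(path.name)
        if not match or match.group("date") != date:
            continue  # skip files from other days or with unexpected names
        gz_path = path.with_name(path.name + ".gz")
        with open(path, "rb") as src, gzip.open(gz_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
        results[match.group("entity")] = gz_path
    return results
```

Calling gzip_csv_files("/data/exports", "2011-11-04") would produce one .csv.gz per entity for that night, ready to be uploaded.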

Here are some experiments I've done with using EMR in its truly 'elastic' mode.

Installing the EMR Ruby CLI

Eventually we'll use something like boto's EMR bindings to manage our EMR clusters, but for quick experimentation I preferred a purely command-line tool, and the Ruby-based elastic-mapreduce tool seemed to be the only one available. To install it, download the zip file from here, then unzip it somewhere on an EC2 instance where you can store your AWS credentials (usually a management-type instance). I installed it in /opt/emr on one of our EC2 instances. At this point it's also a good idea to become familiar with the EMR Developer Guide, which has examples of various elastic-mapreduce use cases. I also found a good README on GitHub.

Next, create a credentials.json file containing your AWS credentials and the name of the keypair that will be used when launching the EMR cluster. The format of this JSON file is:

  {
    "access-id": "YOUR_AWS_ACCESS_ID",
    "private-key": "YOUR_AWS_SECRET_KEY",
    "key-pair": "YOUR_EC2_KEYPAIR_NAME",
    "region": "us-east-1",
    "log-uri": "s3://"
  }
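Since this file holds your secret key, it can be convenient to generate it rather than edit it by hand. Here is a minimal sketch that writes credentials.json from environment variables; the environment variable names are hypothetical, while the JSON keys are the ones listed above.

```python
import json
import os

# Keys expected in credentials.json, as listed above.
REQUIRED_KEYS = ("access-id", "private-key", "key-pair", "region", "log-uri")

# Hypothetical environment variable names; adjust to your own setup.
ENV_VARS = {
    "access-id": "AWS_ACCESS_KEY_ID",
    "private-key": "AWS_SECRET_ACCESS_KEY",
    "key-pair": "EMR_KEY_PAIR",
    "region": "EMR_REGION",
    "log-uri": "EMR_LOG_URI",
}


def write_credentials(path, env=os.environ):
    """Write credentials.json from environment variables; fail if any are missing."""
    missing = [ENV_VARS[k] for k in REQUIRED_KEYS if not env.get(ENV_VARS[k])]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    creds = {k: env[ENV_VARS[k]] for k in REQUIRED_KEYS}
    # If the file does not exist yet, create it readable by the owner only.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump(creds, f, indent=2)
    return creds
```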

Launching an EMR cluster

# ./elastic-mapreduce -c /opt/emr/credentials.json --create --name "test1" --alive --num-instances 3 --master-instance-type m1.small --slave-instance-type m1.small --hadoop-version 0.20 --hive-interactive --hive-versions 0.7.1

The command line above launches an EMR cluster called test1 with 3 instances (1 Hadoop master and 2 slave nodes), installs Hadoop 0.20 and Hive 0.7.1 on it, and then keeps the cluster up and running (because we specified --alive). For experimentation purposes I recommend using m1.small instances.

This command returns a jobflow ID, which you'll need for all other commands that reference this specific cluster.
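If you drive the CLI from a script, you need to capture that ID. A minimal Python sketch, assuming the CLI is run from /opt/emr and that the ID (which has the form j-XXXXXXXXXXXXX) appears somewhere in the output of --create; the flags are exactly those of the command line above.

```python
import re
import subprocess


def create_cmd(name, num_instances, instance_type="m1.small",
               credentials="/opt/emr/credentials.json"):
    """Build the argv list for an --alive cluster with Hadoop 0.20 and Hive 0.7.1."""
    return [
        "./elastic-mapreduce", "-c", credentials, "--create",
        "--name", name, "--alive",
        "--num-instances", str(num_instances),
        "--master-instance-type", instance_type,
        "--slave-instance-type", instance_type,
        "--hadoop-version", "0.20",
        "--hive-interactive", "--hive-versions", "0.7.1",
    ]


# Assumption: the jobflow ID is printed somewhere in the --create output.
JOBFLOW_ID = re.compile(r"\bj-[A-Z0-9]+\b")


def parse_jobflow_id(output):
    """Return the first jobflow ID found in the CLI output."""
    match = JOBFLOW_ID.search(output)
    if match is None:
        raise ValueError("no jobflow ID in output: %r" % output)
    return match.group(0)


def launch_cluster(name, num_instances, run=subprocess.check_output):
    """Launch a cluster and return its jobflow ID (run is injectable for testing)."""
    out = run(create_cmd(name, num_instances), universal_newlines=True)
    return parse_jobflow_id(out)
```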

Getting information about a specific jobflow

# ./elastic-mapreduce --describe --jobflow JOBFLOWID

This command returns a JSON document containing a wealth of information about the EMR cluster. Here's an example output.
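A script usually needs only a few fields from that document, such as the state and the master's public DNS name. The sketch below assumes the output follows the layout of the EMR DescribeJobFlows API (a JobFlows list whose entries contain ExecutionStatusDetail.State and Instances.MasterPublicDnsName); check it against your own output before relying on it.

```python
import json


def summarize_jobflow(describe_output):
    """Return (jobflow_id, state, master_dns) from --describe JSON output.

    Assumes the JobFlows / ExecutionStatusDetail / Instances layout of the
    DescribeJobFlows API.
    """
    doc = json.loads(describe_output)
    flow = doc["JobFlows"][0]
    return (
        flow["JobFlowId"],
        flow["ExecutionStatusDetail"]["State"],
        # May be missing or null until the master node is up.
        flow["Instances"].get("MasterPublicDnsName"),
    )
```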

Listing the state of a jobflow

# ./elastic-mapreduce --list --jobflow JOBFLOWID
JOBFLOWID     WAITING         test1
   COMPLETED      Setup Hive                    

This command is useful when you're trying to determine whether the initial configuration of the cluster is done. Immediately after you launch the cluster, its state changes from STARTING to RUNNING (during which Hadoop and Hive are installed) and finally to WAITING.
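The --list output is easy to parse. This sketch is based only on the sample output above: the first line is "<id> <state> <name>", and each indented line after it is "<step state> <step name>".

```python
def parse_list_output(output):
    """Split --list --jobflow output into (jobflow_id, state, steps)."""
    lines = [line for line in output.splitlines() if line.strip()]
    jobflow_id, state = lines[0].split()[:2]
    steps = []
    for line in lines[1:]:
        # split(None, 1) drops leading whitespace and splits on the first run of spaces.
        step_state, step_name = line.split(None, 1)
        steps.append((step_state, step_name.strip()))
    return jobflow_id, state, steps


def is_ready(output):
    """True once setup is finished and the cluster is idle, waiting for work."""
    return parse_list_output(output)[1] == "WAITING"
```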

Enabling and disabling termination protection for a jobflow

If you want to make sure that the jobflow can't be terminated, turn on termination protection (it's off by default):

# ./elastic-mapreduce --set-termination-protection true --jobflow JOBFLOWID

To disable termination protection, set it to false.

Adding core nodes to the EMR cluster

There are 2 types of Hadoop slave nodes: core nodes (which run Hadoop tasks and are also part of the HDFS cluster) and task nodes (which run Hadoop tasks but are not part of the HDFS cluster). To add core nodes, use this command:

# ./elastic-mapreduce --jobflow JOBFLOWID --modify-instance-group CORE --instance-count NEW_COUNT

where NEW_COUNT is the new total number of core nodes you are targeting (for example, if you have 5 core nodes and want to add 2 more, NEW_COUNT is 7).

Note that for core nodes you can only increase the count, never decrease it.
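Because --instance-count takes the new total rather than the number of nodes to add, it's easy to get wrong in a script. A minimal sketch that computes the total and refuses anything but an increase; passing --jobflow so the CLI knows which cluster to resize is an assumption worth checking against the Developer Guide.

```python
def core_resize_cmd(jobflow_id, current_core, to_add):
    """Build the command that adds `to_add` core nodes to a jobflow.

    --instance-count is the new *total*, so 5 existing + 2 more -> 7.
    Core groups can only grow, so anything but a positive increase is rejected.
    """
    if to_add <= 0:
        raise ValueError("core node count can only be increased")
    new_count = current_core + to_add
    return ["./elastic-mapreduce", "--jobflow", jobflow_id,
            "--modify-instance-group", "CORE",
            "--instance-count", str(new_count)]
```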

Also note that if one or more slave nodes are misbehaving (more on that in a bit), it's better to terminate them (via ElasticFox or the AWS console, for example) than to add new core nodes. When you terminate them, the EMR jobflow automatically launches replacement nodes so that the core node count stays the same.

Accessing the Hadoop master node and working with the HDFS cluster

You can ssh into the Hadoop master using the private key that corresponds to the keypair you specified in credentials.json:

# ssh -i /path/to/private_ssh_key hadoop@public_ip_or_dns_of_master

Or you can use:

# ./elastic-mapreduce --jobflow JOBFLOWID --ssh

Once you're logged in as user hadoop on the master node, you can run various HDFS commands such as:
  • creating an HDFS directory: $ hadoop fs -mkdir /user/hadoop/mydir
  • copying files from S3 into HDFS: $ hadoop fs -cp s3://*gz /user/hadoop/mydir
  • listing files in an HDFS directory: $ hadoop fs -ls /user/hadoop/mydir
  • deleting files in an HDFS directory: $ hadoop fs -rm /user/hadoop/*.gz
  • copying files from HDFS to a local file system on the master node: $ hadoop fs -copyToLocal /user/hadoop/mydir/*.gz /home/hadoop/somedir
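In a nightly job these commands are run in sequence for a given date. A minimal sketch that builds and runs them with subprocess; the S3 prefix and the <entity>_<date>.csv.gz file naming are hypothetical. Passing the glob as a single argument (no shell involved) lets hadoop fs expand it against S3 rather than the local shell expanding it.

```python
import subprocess


def hdfs_load_cmds(s3_prefix, hdfs_dir, date):
    """Build hadoop fs commands that copy one night's csv.gz files into HDFS.

    s3_prefix (e.g. "s3://my-bucket/exports") and the <entity>_<date>.csv.gz
    naming are hypothetical; the subcommands are the ones listed above.
    """
    return [
        ["hadoop", "fs", "-mkdir", hdfs_dir],
        ["hadoop", "fs", "-cp", "%s/*_%s.csv.gz" % (s3_prefix.rstrip("/"), date), hdfs_dir],
        ["hadoop", "fs", "-ls", hdfs_dir],
    ]


def run_all(cmds, run=subprocess.check_call):
    """Run each command in order; check_call raises on the first failure."""
    for cmd in cmds:
        run(cmd)
```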
There is also a very useful admin-type command that shows the state of the slave nodes and the HDFS usage. Here's an example that I ran on a cluster with 3 slave nodes (I added the 3rd node later, which is why its DFS usage is much lower than that of the first 2 nodes):

$ hadoop dfsadmin -report
Configured Capacity: 5319863697408 (4.84 TB)
Present Capacity: 5052263763968 (4.6 TB)
DFS Remaining: 4952298123264 (4.5 TB)
DFS Used: 99965640704 (93.1 GB)
DFS Used%: 1.98%
Under replicated blocks: 3
Blocks with corrupt replicas: 0
Missing blocks: 0

Datanodes available: 3 (3 total, 0 dead)

Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 47647641600 (44.38 GB)
Non DFS Used: 89327800320 (83.19 GB)
DFS Remaining: 1636312457216(1.49 TB)
DFS Used%: 2.69%
DFS Remaining%: 92.28%
Last contact: Fri Nov 04 20:31:31 UTC 2011

Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 49191796736 (45.81 GB)
Non DFS Used: 89329020928 (83.19 GB)
DFS Remaining: 1634767081472(1.49 TB)
DFS Used%: 2.77%
DFS Remaining%: 92.19%
Last contact: Fri Nov 04 20:31:29 UTC 2011

Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 3126202368 (2.91 GB)
Non DFS Used: 88943112192 (82.83 GB)
DFS Remaining: 1681218584576(1.53 TB)
DFS Used%: 0.18%
DFS Remaining%: 94.81%
Last contact: Fri Nov 04 20:31:30 UTC 2011

When something is wrong with a slave data node (e.g. the master can't contact it), the number of dead nodes will be non-zero, and the 'Last contact' date for that slave will lag behind those of the healthy nodes.
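Those two signals can be checked automatically. A minimal sketch that parses the report text; it assumes timestamps are in UTC as in the sample above, and the 5-minute lag threshold is arbitrary.

```python
import re
from datetime import datetime, timedelta

DATANODES = re.compile(r"Datanodes available: (\d+) \((\d+) total, (\d+) dead\)")
LAST_CONTACT = re.compile(r"Last contact: (.+)")


def check_dfs_report(report, max_lag=timedelta(minutes=5)):
    """Return a list of problems found in hadoop dfsadmin -report output.

    Flags a non-zero dead-node count, and any datanode whose 'Last contact'
    lags the most recent contact by more than max_lag.
    """
    problems = []
    match = DATANODES.search(report)
    if match and int(match.group(3)) > 0:
        problems.append("%s dead datanode(s)" % match.group(3))
    # Assumes the UTC timestamp format shown in the sample report.
    contacts = [datetime.strptime(ts.strip(), "%a %b %d %H:%M:%S UTC %Y")
                for ts in LAST_CONTACT.findall(report)]
    if contacts:
        newest = max(contacts)
        for i, ts in enumerate(contacts):
            if newest - ts > max_lag:
                problems.append("datanode #%d last contacted %s" % (i + 1, ts))
    return problems
```

Comparing each node against the most recent contact, rather than against the local clock, avoids false alarms from clock skew between the management instance and the cluster.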

Another useful admin command is 

$ hadoop fsck /

which should report HEALTHY:

 Total size: 99648130222 B
 Total dirs: 76
 Total files: 5044 (Files currently being written: 1)
 Total blocks (validated): 5215 (avg. block size 19107982 B) (Total open file blocks (not validated): 1)
 Minimally replicated blocks: 5215 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 3 (0.057526365 %)
 Mis-replicated blocks: 0 (0.0 %)
 Default replication factor: 1
 Average block replication: 1.0011505
 Corrupt blocks: 0
 Missing replicas: 21 (0.4022218 %)
 Number of data-nodes: 3
 Number of racks: 1

The filesystem under path '/' is HEALTHY
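As the sample shows, fsck can report HEALTHY while still listing a few under-replicated blocks, so it can be worth extracting the counters as well as the final verdict. A minimal sketch based on the output format above:

```python
import re


def fsck_summary(output):
    """Extract the overall status and a few block counters from hadoop fsck output."""
    status = re.search(r"The filesystem under path '(.*)' is (\w+)", output)

    def counter(label):
        # Matches lines such as " Corrupt blocks: 0".
        m = re.search(r"%s:\s+(\d+)" % re.escape(label), output)
        return int(m.group(1)) if m else None

    return {
        "path": status.group(1) if status else None,
        "status": status.group(2) if status else "UNKNOWN",
        "corrupt_blocks": counter("Corrupt blocks"),
        "under_replicated": counter("Under-replicated blocks"),
        "missing_replicas": counter("Missing replicas"),
    }
```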

Terminating a jobflow

As you would expect, the command is:

# ./elastic-mapreduce --terminate --jobflow JOBFLOWID

Inspecting Hive logs

After running a Hive query on the Hadoop master node, you can inspect the logs created in this directory (this path is for Hive 0.7.1): /mnt/var/lib/hive_07_1/tmp/history
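After a failed query, the most recently modified files in that directory are usually the ones you want. A small sketch that lists them; the default directory is the Hive 0.7.1 path quoted above, so pass a different one for other Hive versions.

```python
from pathlib import Path

# Path quoted above for Hive 0.7.1 on EMR.
HIVE_HISTORY_DIR = "/mnt/var/lib/hive_07_1/tmp/history"


def latest_hive_logs(directory=HIVE_HISTORY_DIR, count=1):
    """Return the `count` most recently modified files in the Hive history directory."""
    files = [p for p in Path(directory).iterdir() if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return files[:count]
```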

Putting it all together

I put many of these pieces together in one script (see gist here) that runs every night from the EC2 management instance and does the following:
  1. Launches an EMR cluster with the desired instance types for the master and slave nodes, and with the desired instance count (this count includes the master node); the command line also specifies that Hadoop 0.20 and Hive 0.7.1 need to be installed, and it keeps the cluster up and running via the --alive option.
  2. Waits in a loop, sleeping 10 seconds between checks, until the state of the jobflow is WAITING.
  3. Retrieves the public DNS name of the Hadoop master from the JSON description of the new jobflow.
  4. Copies from the management node to the Hadoop master a local directory containing scripts that will be run on the master.
  5. Runs a script on the Hadoop master via ssh; this script does all the further Hadoop and Hive processing, which includes:
    • creation of HDFS directories
    • copying of csv.gz files from S3 into HDFS
    • creation of Hive tables
    • running of Hive queries
    • saving of Hive query results to a local directory on the Hadoop master
  6. Retrieves the Hive output files by scp-ing them back from the Hadoop master.
  7. Terminates the EMR cluster.
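The control flow of steps 2 and 4 to 7 can be sketched in Python as follows. This is a minimal sketch, not the script from the gist: the remote script name, directories and key path are hypothetical parameters, and the jobflow ID and master DNS name are assumed to come from steps 1 and 3. The try/finally makes sure the cluster is terminated even when a step fails, so it doesn't keep billing.

```python
import subprocess
import time

EMR = "./elastic-mapreduce"  # run from /opt/emr, as in this post
TERMINAL = ("COMPLETED", "FAILED", "TERMINATED", "SHUTTING_DOWN")


def list_state(jobflow_id, run=subprocess.check_output):
    """Jobflow state: second field of the first line of --list output."""
    out = run([EMR, "--list", "--jobflow", jobflow_id], universal_newlines=True)
    return out.split()[1]


def wait_until_waiting(get_state, interval=10, timeout=3600,
                       sleep=time.sleep, clock=time.monotonic):
    """Step 2: poll get_state() every `interval` seconds until it returns WAITING."""
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state == "WAITING":
            return
        if state in TERMINAL:
            raise RuntimeError("jobflow ended up in state " + state)
        if clock() >= deadline:
            raise TimeoutError("jobflow still %s after %ds" % (state, timeout))
        sleep(interval)


def nightly(jobflow_id, master_dns, key, local_dir, remote_script, out_dir,
            run=subprocess.check_call):
    """Steps 4-7: copy scripts up, run them, fetch results, always terminate."""
    target = "hadoop@" + master_dns
    try:
        run(["scp", "-i", key, "-r", local_dir, target + ":"])
        run(["ssh", "-i", key, target, remote_script])
        run(["scp", "-i", key, "-r", target + ":" + out_dir, "."])
    finally:
        # Terminate even if a step failed.
        run([EMR, "--terminate", "--jobflow", jobflow_id])
```

Because each night's master is a new host, ssh and scp may prompt to accept its host key; handle that (for example with ssh's StrictHostKeyChecking option) in whatever way your security policy allows.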
At this point, the Hive output files are processed and the data is inserted into a MySQL instance for further analysis and visualization.
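The post doesn't show how that insert is done. One possible approach, sketched under stated assumptions: the Hive results were written in Hive's default text format (fields separated by Ctrl-A, NULL written as \N), and the target table already exists. The loader works with any DB-API connection; MySQLdb uses %s placeholders, which is the default here. The table name is interpolated into the SQL, so it must come from trusted code, never from user input.

```python
from pathlib import Path

FIELD_SEP = "\x01"  # Hive's default field delimiter (Ctrl-A)
NULL = "\\N"        # how Hive's default text format writes NULL


def read_hive_output(directory):
    """Yield one tuple per row from the result files Hive wrote into `directory`."""
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file() or path.name.startswith((".", "_")):
            continue  # skip hidden or marker files such as .crc files
        with open(path, encoding="utf-8") as f:
            for line in f:
                fields = line.rstrip("\n").split(FIELD_SEP)
                yield tuple(None if v == NULL else v for v in fields)


def insert_rows(conn, table, rows, ncols, placeholder="%s", batch_size=1000):
    """Insert rows in batches via a DB-API connection; returns the row count."""
    sql = "INSERT INTO %s VALUES (%s)" % (table, ", ".join([placeholder] * ncols))
    cur = conn.cursor()
    batch, total = [], 0
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            cur.executemany(sql, batch)
            total += len(batch)
            batch = []
    if batch:
        cur.executemany(sql, batch)
        total += len(batch)
    conn.commit()
    return total
```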

That's about it for now. We have a lot more work to do before we declare ourselves satisfied with the state of our data analytics platform, and I'll blog more about it as soon as we cross more things off our todo list.


Anonymous said...

How much data are you typically processing, and how long does it take to process it? We have some similar use cases as well, but I *think* our needs might be more real-time oriented.

I had forgotten all about EMR since it was released, so thank you for the simple example.

Grig Gheorghiu said...

Every night we process around 100 GB of data. It takes around 4 hours, including the download from S3 to HDFS, then the Hive queries (which are fairly complex and include joins).

Jie Li said...

Interesting! What is your current workflow now? If processing only takes 4 hours a day, then the "elastic" workflow might save money. But it seems reserved instances might be cheaper in the long run, say, over 3 years?

Grig Gheorghiu said...

Hi Jie Li, we still run our EMR cluster 24x7 because our data analyst needs to run ad-hoc reports during the day too. It still makes economic sense for us; it's cheaper than investing in our own Hadoop cluster.

Patrice said...

Great post, thanks for sharing your experience!

Can you please give us more details on how you insert data into a MySQL instance? Thank you.
