Thursday, November 17, 2011

Seven years of blogging in less than 500 words


In a couple of days, this blog will be 7 years old. Hard to believe so much time has passed since my first Hello World post.

I keep reading that blogging is on the wane, and it's true, mainly because of the popularity of Twitter. But I strongly believe that blogging is still important, and that more people should do it. For me, it's a way to give back to the community. I can't even remember how many times I found solutions to my technical problems by reading a blog post. I personally try to post something on my blog every single time I solve an issue that I've struggled with. If you post documentation to a company wiki (assuming it's not confidential), I urge you to try to also blog publicly about it – think of it as a public documentation that can help both you and others.

Blogging is also a great way to further your career. Back in September 2008 I blogged about my experiences with EC2. I had launched an m1.small instance and I had started to play with it. Lo and behold, I got a job offer based on that post. I accepted the offer, and that job allowed me to greatly enhance my experience with cloud computing. This brings me to a more general point: if you want to work on something you know nothing about, start small on your own, persevere, and yes, blog about it! In my experience, you'll invariably find yourself in a position to be paid to work on it.

It's also pretty interesting for me to look at my blog posts and to recognize in them the evolution of my own career. I started to blog when I was working as a tester. I was passionate about automated testing, Python and agile processes and tools. This led to digging more into automation tools, then into automated deployments and configuration management in the context of system architecture. This led into cloud computing, and these days this is leading into Big Data analytics. Who knows what's next? Whatever it is, I'm sure it will be exciting, because I make an effort to make it so!

I think it's also important to recognize that learning a tool or a technique is necessary but not sufficient: at the end of the day it's what you do with it that matters. For me the progression has been testing->automation->cloud computing->data analytics->??? (I actually started my career as a programmer, but here I include only the period coinciding with my blogging years.)

This may be self-help-kind-of-corny, but a quote which is attributed (maybe wrongly) to Goethe really applies to everybody:

“Whatever you can do, or dream you can do, begin it. Boldness has genius, power, and magic in it. Begin it now."

Wednesday, November 09, 2011

Troubleshooting memory allocation errors in Elastic MapReduce

Yesterday we ran into an issue with some Hive scripts running within an Amazon Elastic MapReduce cluster. Here's the error we got:


Caused by: java.io.IOException: Spill failed
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:877)
 at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:474)
 at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:289)
 ... 11 more
Caused by: java.io.IOException: Cannot run program "bash": java.io.IOException: error=12, Cannot allocate memory
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:176)
 at org.apache.hadoop.util.Shell.run(Shell.java:161)
 at org.apache.hadoop.fs.DF.getAvailable(DF.java:73)
 at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:329)
 at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:124)
 at org.apache.hadoop.mapred.MapOutputFile.getSpillFileForWrite(MapOutputFile.java:107)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1238)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:703)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1190)
Caused by: java.io.IOException: java.io.IOException: error=12, Cannot allocate memory
 at java.lang.UNIXProcess.(UNIXProcess.java:148)
 at java.lang.ProcessImpl.start(ProcessImpl.java:65)
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:453)



Googling around for java.io.IOException: java.io.IOException: error=12, Cannot allocate memory, it seems it's a common problem. See this AWS Developer Forums thread, this Hadoop core-user mailing list thread, and this explanation by Ken Krugler from Bixo Labs.

Basically, it boils down to the fact that when Java tries to fork a new process (in this case a bash shell), Linux will try to allocate as much memory as the current Java process, even though not all that memory will be required. There are several workarounds (read in particular the AWS Forum thread), but a solution that worked for us was to simply add swap space to the Elastic MapReduce slave nodes.

You can ssh into a slave node from the EMR master node by using the same private key you used when launching the EMR cluster, and by targeting the internal IP address of the slave node. In our case, the slaves are m1.xlarge instances, and they have 4 local disks (/dev/sdb through /dev/sde) mounted as /mnt, /mnt1, /mnt2 and /mnt3, with 414 GB available on each file system. I ran this simple script via sudo on each slave to add 4 swap files of 1 GB each, one on each of the 4 local disks.


$ cat make_swap.sh 
#!/bin/bash


SWAPFILES='
/mnt/swapfile1
/mnt1/swapfile1
/mnt2/swapfile1
/mnt3/swapfile1
'
for SWAPFILE in $SWAPFILES; do
dd if=/dev/zero of=$SWAPFILE bs=1024 count=1048576
mkswap $SWAPFILE
swapon $SWAPFILE
echo "$SWAPFILE swap swap defaults 0 0" >> /etc/fstab
done

This solved our issue. No more failed Map tasks, no more failed Reduce tasks. Maybe this will be of use to some other frantic admins out there (like I was yesterday) who are not sure how to troubleshoot the intimidating Hadoop errors they're facing.



Friday, November 04, 2011

Experiences with Amazon Elastic MapReduce


We started to use AWS Elastic MapReduce (EMR) in earnest a short time ago, with the help of Bradford Stephens from Drawn to Scale. We needed somebody to jumpstart our data analytics processes and workflow, and Bradford's help was invaluable. At some point we'll probably build our own Hadoop cluster either in EC2 or in-house, but for now EMR is doing the job just fine.

We started with an EMR cluster containing the master + 5 slave nodes, all m1.xlarge. We still have that cluster up and running, but in the mean time I've also experimented with launching clusters, running our data analytics processes on them, then shutting them down -- which is the 'elastic' type of workflow that takes full advantage of the pay-per-hour model of EMR.

Before I go into the details of launching and managing EMR clusters, here's the general workflow that we follow for our data analytics processes on a nightly basis:

  1. We gather data from various sources such as production databases, ad impression reports, mail server logs, and other 3rd party sources. All this data is in CSV format, mostly comma-separated or tab-separated. Each CSV file is timestamped with YYYY-MM-DD and corresponds to a 'table' or 'entity' that we want to analyse later.
  2. We gzip all CSV files and upload them to various S3 buckets.
  3. On the EMR Hadoop master node, we copy the csv.gz files we need from S3 into HDFS.
  4. We create Hive tables, one table for each type of 'entity'.
  5. We run Hive queries against these tables and save the results in HDFS.
  6. We export the results of the Hive queries to MySQL so we can further analyse them when we need to, and so we can visualize them in a dashboard.
I think this is a fairly common workflow for doing data analytics with Hadoop. It can definitely be optimized. Currently we create the Hive tables from scratch in step 4. Ideally we'll want to save them to S3, and only append new data to them, but the append operation seems to only exist in Hive 0.8 which is not yet available in EMR. But as suboptimal as it is, even if it takes a few hours each night, this process allows us to run queries that were simply impossible to execute outside of Hadoop.

Here are some experiments I've done with using EMR in its truly 'elastic' mode.

Installing the EMR Ruby CLI

Eventually we'll use something like boto's EMR bindings to manage our EMR clusters, but for quick experimentation I preferred a purely command-line tool, and the Ruby-based elastic-mapreduce tool seemed to be the only one available. To install, download the zip file from here, then unzip it somewhere on an EC2 instance where you can store your AWS credentials (a management-type instance usually). I installed in /opt/emr on one of our EC2 instances. At this point it's also a good idea to become familiar with the EMR Developer Guide, which has examples of various elastic-mapreduce use cases. I also found a good README on GitHub.

Next, create a credentials.json file containing some information about your AWS credentials and the keypair that will be used when launching the EMR cluster. The format of this JSON file is:

{
  "access-id": "YOUR_AWS_ACCESS_ID",
  "private-key": "YOUR_AWS_SECRET_KEY",
  "key-pair": "YOUR_EC2_KEYPAIR_NAME",
  "key-pair-file": "PATH_TO_PRIVATE_SSH_KEY_CORRESPONDING_TO_KEYPAIR",
  "region": "us-east-1",
  "log-uri": "s3://somebucket.yourcompany.com/logs"
}

Launching an EMR cluster

# ./elastic-mapreduce -c /opt/emr/credentials.json --create --name "test1" --alive --num-instances 3 --master-instance-type m1.small --slave-instance-type m1.small --hadoop-version 0.20 --hive-interactive --hive-versions 0.7.1

The command line above launches an EMR cluster called test1 with 3  instances (1 Hadoop master and 2 slave nodes), installs Hadoop 0.20 and Hive 0.71 on it, then keeps the cluster up and running (because we specified --alive). For experimentation purposes I recommend using m1.small instances.

This command returns a jobflow ID, which you'll need for all other commands that reference this specific cluster.

Getting information about a specific jobflow

# ./elastic-mapreduce --describe --jobflow JOBFLOWID

This command returns a JSON document containing a wealth of information about the EMR cluster. Here's an example output.

Listing the state of a jobflow

# ./elastic-mapreduce --list --jobflow JOBFLOWID
JOBFLOWID     WAITING        ec2-A-B-C-D.compute-1.amazonaws.com         test1
   COMPLETED      Setup Hive                    

This command is useful when you're trying to ascertain whether the initial configuration of the cluster is done. The state of the cluster immediately after you launch it will change from STARTING to RUNNING (during which step Hadoop and Hive are installed) and finally to WAITING.

Enabling and disabling termination protection for a jobflow

If you want to make sure that the jobflow won't be terminated, you can turn the termination protection on (it's off by default):

# ./elastic-mapreduce --set-termination-protection true --jobflow JOBFLOWID

To disable the termination protection, set it to false.

Adding core nodes to the EMR cluster

There are 2 types of Hadoop slave nodes: core (which contribute to the HDFS cluster) and task (which run Hadoop tasks but are not part of the HDFS cluster). If you want to add core nodes, you can use this command:

# ./elastic-mapreduce --modify-instance-group CORE --instance-count NEW_COUNT

where NEW_COUNT is the new overall count of core nodes that you are targetting (so for example if you had 5 core nodes and you wanted to add 2 more, the NEW_COUNT will be 7).

Note that you can only request an increased NEW_COUNT for core nodes, never a decreased count.

Also note that if one or more slave nodes are misbehaving (more on that in a bit), it's better to terminate them (via ElasticFox or the AWS console for example) than to add new core nodes. When you terminate them, the EMR jobflow will automatically launch extra nodes so that the core node count is kept the same.

Accessing the Hadoop master node and working with the HDFS cluster

You can ssh into the Hadoop master by using the private key you specified in credentials.json. 

# ssh -i /path/to/private_ssh_jey hadoop@public_ip_or_dns_of_master

Or you can use:

# ./elastic-mapreduce --jobflow JOBFLOWID --ssh

Once you're logged in as user hadoop on the master node, you can run various HDFS commands such as:
  • creating an HDFS directory: $ hadoop fs -mkdir /user/hadoop/mydir
  • copying files from S3 into HDFS: $ hadoop fs -cp s3://somebucket.yourcompany.com/dir1/data*gz /user/hadoop/mydir
  • listing files in an HDFS directory: $ hadoop fs -ls /user/hadoop/mydir
  • deleting files in an HDFS directory: $ hadoop fs -rm /user/hadoop/*.gz
  • copying files from HDFS to a local file system on the master node: $ hadoop fs -copyToLocal /user/hadoop/mydir/*.gz /home/hadoop/somedirTh
There is also a very useful admin-type command which allows you to see the state of the slave nodes, and the HDFS file system usage. Here's an example which I ran on a cluster with 3 slave nodes (I added the 3nd node at a later time, which is why its DFS usage is less than the usage on the first 2 nodes):

$ hadoop dfsadmin -report
Configured Capacity: 5319863697408 (4.84 TB)
Present Capacity: 5052263763968 (4.6 TB)
DFS Remaining: 4952298123264 (4.5 TB)
DFS Used: 99965640704 (93.1 GB)
DFS Used%: 1.98%
Under replicated blocks: 3
Blocks with corrupt replicas: 0
Missing blocks: 0

-------------------------------------------------
Datanodes available: 3 (3 total, 0 dead)

Name: 10.68.125.161:9200
Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 47647641600 (44.38 GB)
Non DFS Used: 89327800320 (83.19 GB)
DFS Remaining: 1636312457216(1.49 TB)
DFS Used%: 2.69%
DFS Remaining%: 92.28%
Last contact: Fri Nov 04 20:31:31 UTC 2011


Name: 10.191.121.76:9200
Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 49191796736 (45.81 GB)
Non DFS Used: 89329020928 (83.19 GB)
DFS Remaining: 1634767081472(1.49 TB)
DFS Used%: 2.77%
DFS Remaining%: 92.19%
Last contact: Fri Nov 04 20:31:29 UTC 2011


Name: 10.68.105.36:9200
Decommission Status : Normal
Configured Capacity: 1773287899136 (1.61 TB)
DFS Used: 3126202368 (2.91 GB)
Non DFS Used: 88943112192 (82.83 GB)
DFS Remaining: 1681218584576(1.53 TB)
DFS Used%: 0.18%
DFS Remaining%: 94.81%
Last contact: Fri Nov 04 20:31:30 UTC 2011

When there's something wrong with any slave data node, e.g. it can't be contacted by the master, then the number of dead nodes will be non-zero, and the 'Last contact' date for that slave will be off compared to the healthy nodes.

Another useful admin command is 

$ hadoop fsck /

which should report HEALTHY:


Status: HEALTHY
 Total size: 99648130222 B
 Total dirs: 76
 Total files: 5044 (Files currently being written: 1)
 Total blocks (validated): 5215 (avg. block size 19107982 B) (Total open file blocks (not validated): 1)
 Minimally replicated blocks: 5215 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 3 (0.057526365 %)
 Mis-replicated blocks: 0 (0.0 %)
 Default replication factor: 1
 Average block replication: 1.0011505
 Corrupt blocks: 0
 Missing replicas: 21 (0.4022218 %)
 Number of data-nodes: 3
 Number of racks: 1




The filesystem under path '/' is HEALTHY

Terminating a jobflow

As you would expect, the command is:

# ./elastic-mapreduce --terminate --jobflow JOBFLOWID

Inspecting Hive logs

After running a Hive query on the Hadoop master node, you can inspect the logs created in this directory (this is for Hive 0.71): /mnt/var/lib/hive_07_1/tmp/history

Putting it all together

I put many of these pieces together in one script (see gist here) that I run every night from the EC2 management instance, and that does the following:
  1. Launches EMR cluster with desired instance types for the master node and the slave nodes, and with the desired instance count (this count includes the master node); the command line also specified that Hadoop 0.20 and Hive 0.71 need to be installed, and it keeps the cluster up and running via the --alive option.
  2. Waits in a loop for the state of the jobflow to be WAITING and sleeps 10 seconds in between checks.
  3. Retrieves the public DNS name of the Hadoop master from the JSON description of the new jobflow.
  4. Copies from the management node to the Hadoop master a local directory containing scripts that will be run on the master.
  5. Runs a script (called run_hive_scripts.sh) on the Hadoop master via ssh; this script does all the further Hadoop and Hive processing, which includes
    • creation of HDFS directories
    • copying of csv.gz files from S3 into HDFS
    • creation of Hive files
    • running of Hive queries
    • saving of Hive queries to a local directory on the Hadoop master
  6. Retrieves the Hive output files by scp-ing them back from the Hadoop master
  7. Terminates the EMR cluster
At this point, the Hive output files are processed and the data is inserted into a MySQL instance for further analysis and visualization.

That's about it for now. We have a lot more work to do before we declare ourselves satisfied with the state of our data analytics platform, and I'll blog more about it as soon as we cross more things off our todo list.