Wednesday, April 25, 2012

Using DynamoDB BatchWriteItem with boto

This is just a quick note about the advantage of using DynamoDB's newly introduced BatchWriteItem functionality, which allows you to write multiple items at the same time to a table, with the write operation parallelized behind the scenes by DynamoDB. Currently there is a limit of 25 items that can be batch-written or batch-deleted to/from a DynamoDB table.

I was glad to see that the boto library already supports this new feature -- the fact that Mitch Garnaat is now an employee of Amazon probably helps too ;-) You do have to git pull the latest boto code from GitHub, since BatchWriteItem is not available in the latest boto release 2.3.0.

I tested this feature inside a script which was parsing mail logs and uploading lines corresponding to certain regular expressions as items to a DynamoDB table. When I used the standard item-at-a-time method, it took 7 hours to write 2 million items into the table. When using BatchWriteItem, it only took 26 minutes -- so a 16x improvement.

Here's how I used this new functionality with boto:

1) I created a DynamoDB connection object and a table object:

import boto
dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)

mytable = dynamodb_conn.get_table('mytable')

2) I created a batch_list object:

batch_list = dynamodb_conn.new_batch_write_list()

3) I populated this object with a list of DynamoDB items:

batch_list.add_batch(mytable, puts=items)

where items is a Python list containing item objects obtained via


mytable.new_item(attrs=item_attributes)

4) I used the batch_write_item method of boto's Layer2 class to write the batch list:

dynamodb_conn.batch_write_item(batch_list)
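Because of the 25-item limit mentioned above, a longer list of items has to be split into batches before it can be written. Here is a minimal sketch of that splitting in plain Python. The helper name chunks and the constant MAX_BATCH_SIZE are illustrative names and not part of boto, and the sample list holds stand-in values rather than real item objects:

```python
MAX_BATCH_SIZE = 25  # per-request item limit stated earlier in this post


def chunks(items, size=MAX_BATCH_SIZE):
    """Yield successive slices of `items`, each holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Stand-in values; in the real script each element would be an item object
# created with mytable.new_item(attrs=...).
sample_items = list(range(60))
batch_sizes = [len(batch) for batch in chunks(sample_items)]
# batch_sizes is [25, 25, 10]
```

Each batch produced this way would then go through steps 2 to 4 above: a fresh batch list, add_batch(mytable, puts=batch), and batch_write_item.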

That was about it. I definitely recommend using BatchWriteItem whenever you can, for the speedup it provides.

Thursday, April 12, 2012

Initial experiences with Amazon DynamoDB

I've been experimenting a bit with Amazon DynamoDB -- the "fully managed NoSQL database service that provides fast and predictable performance with seamless scalability" according to Amazon -- in order to see how easy it is to use, and what kind of performance you can get out of it. My initial impressions are favorable, with some caveats.

Defining tables

To get started with DynamoDB, you can use the AWS Console web interface. You need to define a table by giving it a name. Then you need to define a hash key, which enables DynamoDB to build an unordered hash index for partitioning and querying purposes. You can also define a range key, in which case DynamoDB will build an unordered hash index on the hash key, and a sorted range index on the range key. For most intents and purposes, the range key will be some sort of timestamp-related attribute of your data. You can find out more details in the DynamoDB Data Model documentation.

The most confusing part when defining a table is coming up with Read Throughput and Write Throughput values for the table. This revolves around estimating the capacity units you will need when reading from and writing to that table. Here is an excerpt from the "Capacity Unit Calculations" documentation:

For each request you send, Amazon DynamoDB computes the capacity units consumed by that operation. Item size is one of the factors it uses in computing the capacity units consumed. The size of an item is sum of lengths of its attribute names and values. This section describes how Amazon DynamoDB determines the size of item(s) involved in an operation.

The get, put, and delete operations involve one item. However, batch get, query and scan operations can return multiple items.
For operations that involve only one item, Amazon DynamoDB rounds the item size up to the next 1 KB. For example, if you get, put, or delete an item of 1.6 KB in size, Amazon DynamoDB rounds the items size to 2 KB. This rounding also applies to batch get operation, which operates on several items. Amazon DynamoDB rounds the size of each individual item returned in the batch. For example, if you use the batch get operation to retrieve 2 items of 1.2 KB and 3.6 KB, Amazon DynamoDB rounds these items sizes to 2 KB and 4 KB respectively, resulting a total size for the operation of 6 KB.
A query or scan can return multiple items. By default Amazon DynamoDB returns up to 1 MB of items for a query or scan. In this case Amazon DynamoDB computes the total item size for the request by computing the sum of all items sizes and then rounding to the next KB. For example, suppose your query returns 10 items whose combined size is 10.2 KB. Amazon DynamoDB rounds the item size for the operation to 11 KB, for the purpose of computing capacity units consumed by that operation. Note that unlike for single item operations, this size is not necessarily proportional to the number of items. Instead it is the cumulative size of processed items, rounded up to the next KB increment. For example, if your query returns 1,500 items of 64 bytes each, the cumulative size is 94 KB, not 1,500 KB.
In case of a scan operation, it is not the size of items returned by scan, rather it is the size of items evaluated by Amazon DynamoDB. That is, for a scan requests, Amazon DynamoDB evaluates up to 1 MB of items and returns only the items that satisfy the scan condition.

So what this means in practice is that you need to estimate your read and write operations per second and multiply them by N, where N is the item size in KB, rounded up. Of course, you need to err on the conservative side and overestimate, at least initially, which works in Amazon's favor, since they bill you for the capacity you specify and not for the capacity you actually consume. This is one caveat I have when using DynamoDB. I find it a bit odd that the billing process is not more dynamic and usage-based, as is the case with pretty much all the other AWS offerings.
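The rounding rules in the excerpt can be written down as a short sketch. The function names below are illustrative and not part of any AWS or boto API, and the 50 writes/second figure in the last example is a made-up number:

```python
import math

KB = 1024.0  # bytes per KB


def single_item_kb(item_sizes_bytes):
    """Get/put/delete/batch get: round each item up to the next KB, then sum."""
    return sum(int(math.ceil(size / KB)) for size in item_sizes_bytes)


def query_kb(item_sizes_bytes):
    """Query/scan: sum the item sizes first, then round the total up to the next KB."""
    return int(math.ceil(sum(item_sizes_bytes) / KB))


def throughput_estimate(ops_per_second, item_size_bytes):
    """Rough value to specify: operations/second times item size in KB, rounded up."""
    return ops_per_second * int(math.ceil(item_size_bytes / KB))


# Batch get of a ~1.2 KB item and a ~3.6 KB item: 2 KB + 4 KB
batch_get_total = single_item_kb([1229, 3686])
# Query returning 1,500 items of 64 bytes each: 93.75 KB rounded up
query_total = query_kb([64] * 1500)
# Hypothetical workload: 50 writes/second of 1.5 KB items
write_estimate = throughput_estimate(50, 1536)
```

The first two results, 6 and 94, match the examples in the excerpt. The third comes out at 100 because each 1.5 KB item is counted as 2 KB.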

However, once you start using the service, you can see the actual usage by looking at the Monitoring tab in the AWS console. Both Consumed Read Capacity Units and Consumed Write Capacity Units are shown, which does enable you to tweak your throughput values on the fly and bring them closer to the actual usage. You can also define monitoring alerts that will notify you if your read or write throughput is greater than a certain percentage (by default 80%) of the read or write throughput you specified for a given table.

Once you define the table, the keys and the throughput values, you're ready to use the service. If you like Python, you'll be glad to know that the latest version of boto supports DynamoDB very well. I used boto-2.3.0 which I downloaded from here. I used the DynamoDB boto API reference pretty heavily, and also the DynamoDB boto tutorial. There were some slight discrepancies though in the API reference compared to the actual boto 2.3.0 usage, and I'll highlight them below.

My scenario for using DynamoDB was to write items that represent mail log entries. I have a script that tails the mail log every 10 seconds and sends new entries packaged as items to DynamoDB. For this purpose, I created a table called 'maillog', with a hash key called 'email' (which is the destination email) and a range key called 'timestamp_sent' (which represents the time when that mail message was sent to its destination; if the message is deferred and the mail server retries, there will be multiple items sent to DynamoDB, each with their respective timestamp).

Before I go on, I want to mention that boto offers two ways of interacting with DynamoDB: a 'Layer1' class which deals with the low-level format of the DynamoDB API (things such as properly formatting the requests and interpreting the replies), and a 'Layer2' class which encapsulates the Layer1 functionality, and enables you to use higher-level constructs in your code. The DynamoDB boto tutorial deals exclusively with Layer2 functionality and that's what I am going to cover in my code examples too.

Writing items into a table

The first thing you need to do when interacting with DynamoDB via boto is to open a connection:

import boto
dynamodb_conn = boto.connect_dynamodb(aws_access_key_id='YOUR_AWS_ACCESS_KEY_ID', aws_secret_access_key='YOUR_AWS_SECRET_ACCESS_KEY')

Then you create a table object via the get_table method, called with the desired table name as a parameter:

maillog_table = dynamodb_conn.get_table('maillog')

You write items into DynamoDB, so you need to construct the proper item object, which is based on a Python dictionary. In my case, I use some complicated regular expressions to parse the mail log lines and extract the elements I am interested in, such as source mail server, relay, message id, source email, destination email, received timestamp, sent timestamp, sent status. So my item dictionary looks something like this:

item_dict = {
'email': dest_email, 
'timestamp_sent': timestamp_sent,
'msgid': msgid,
'timestamp_rcvd': timestamp_rcvd,
'source_email': src_email,
'size': size,
'delay': delay, 
'xdelay': xdelay, 
'relay_sent': relay, 
'status': stat,
}

Note that the hash key name ('email') and range key name ('timestamp_sent') are part of the keys of the dictionary. 
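Since the range key is kept in sorted order, the format of the timestamp matters. The LastEvaluatedKey shown later in this post contains a range key value of '20120405053639', which looks like a fixed-width YYYYMMDDHHMMSS string. Here is a sketch of producing such a value, assuming that format; the helper name to_range_key is illustrative:

```python
from datetime import datetime


def to_range_key(dt):
    """Format a datetime as a fixed-width YYYYMMDDHHMMSS string."""
    return dt.strftime('%Y%m%d%H%M%S')


earlier = to_range_key(datetime(2012, 4, 5, 5, 36, 39))
later = to_range_key(datetime(2012, 4, 12, 20, 6, 50))
# earlier is '20120405053639'
```

With a fixed-width format like this, comparing the strings gives the same order as comparing the times, so earlier < later holds here.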

To actually create a DynamoDB item and write it into the database, I used the new_item and put methods on the table object:

item = maillog_table.new_item(attrs=item_dict)
item.put()

I could have also passed hash_key and range_key as parameters to new_item, like this:

item = maillog_table.new_item(hash_key=dest_email, range_key=timestamp_sent, attrs=item_dict)


But as long as those 2 keys are part of the item_dict dictionary, the new_item method is happy.

Note that the item.put() method returns a dictionary of the form:

{'ConsumedCapacityUnits': 1.0}

which is very useful in determining your actual write throughput.
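One way to turn those return values into a throughput figure is to add them up and divide by the elapsed time. The sketch below does that; the CapacityMeter class is a hypothetical helper and not part of boto, and the responses fed to it are stand-ins for real item.put() results:

```python
import time


class CapacityMeter(object):
    """Accumulate the ConsumedCapacityUnits values returned by item.put()."""

    def __init__(self):
        self.started = time.time()
        self.total_units = 0.0
        self.calls = 0

    def record(self, response):
        # `response` is a dict such as {'ConsumedCapacityUnits': 1.0}
        self.total_units += response.get('ConsumedCapacityUnits', 0.0)
        self.calls += 1

    def units_per_second(self, now=None):
        """Average consumed units per second since the meter was created."""
        elapsed = (now if now is not None else time.time()) - self.started
        return self.total_units / elapsed if elapsed > 0 else 0.0


meter = CapacityMeter()
for response in [{'ConsumedCapacityUnits': 1.0}] * 3:  # stand-ins for item.put()
    meter.record(response)
# meter.total_units is now 3.0
```

In the real script the call would be meter.record(item.put()), and units_per_second() can then be compared with the Write Throughput specified for the table.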

Reading items, querying and scanning tables

The easiest way to read an item is to use the get_item method on the table object. This requires that you pass the hash key, and also the range key if you have one defined for that table. In my example (where table is the table object returned by get_table), I do it like this:

item = table.get_item(hash_key=email, range_key=timestamp)
print item['timestamp_sent'], item['email'], item['status']

For my purposes though, after I save the mail log entries in DynamoDB, I want to query the table for a specific destination email, based on a certain time range. To do that, I use the query method on the table object.

items = table.query(hash_key=email, range_key_condition=BETWEEN(timestamp1, timestamp2))
for item in items:
    print item['timestamp_sent'], item['email'], item['status']

Note that I used a class named BETWEEN, which I needed to import:


from boto.dynamodb.condition import LE, EQ, GE, BETWEEN

This is where the online documentation for boto strays from the actual code. The documentation says that range_key_condition is a dict, when in fact it needs to be a condition object such as LE, GE, EQ etc.

The query method is very fast. A slower way to retrieve items, which is not recommended, is to scan the table based on some attributes of the items saved in the table. For example, if I wanted to scan all mail log entries sent within a time range, I could do:

LIMIT = 20
items = table.scan(scan_filter={'timestamp_sent': BETWEEN(timestamp1, timestamp2)}, request_limit=LIMIT)
for item in items:
    print item['timestamp_sent'], item['email'], item['status']

I needed to specify a scan_filter parameter, which is a dictionary with the key being the attribute you want to scan on, and with the value for that key being the condition that defines your scan. 

Whenever you run a scan, it is a very good idea to specify a request_limit parameter in the scan method. This tells DynamoDB to retrieve at most that many items per request. If you don't specify a limit, the scan operation will attempt to evaluate 1 MB worth of items at a time, which is roughly 1024 items if each item is close to 1 KB. This would normally result in 1024 Consumed Read Capacity Units, but the number is halved if you stay with the default 'eventually consistent' read type instead of asking for a consistent read. In my case, the scan operation with no request_limit resulted in 512.5 Consumed Read Capacity Units, far above the Read Throughput of 20 I had specified for my maillog table. As a consequence, I got a monitoring alert of the type:

You are receiving this email because your Amazon CloudWatch Alarm "maillog-ReadCapacityUnitsLimit-xbzrl3i5zunh" in the US - N. Virginia region has entered the ALARM state, because "Threshold Crossed: 1 datapoint (512.5) was greater than or equal to the threshold (16.0)." at "Thursday 12 April, 2012 20:06:50 UTC".

(the threshold which was crossed is 16.0, which is 80% of 20.0, which is the Read Throughput I specified for my table)
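The arithmetic behind these numbers can be sketched as follows. The function names are illustrative, and the halving for eventually consistent reads is taken from the description above rather than from any API:

```python
import math


def scan_read_units(evaluated_kb, consistent_read=False):
    """Read capacity units for a scan that evaluated `evaluated_kb` KB of items."""
    units = float(math.ceil(evaluated_kb))
    # Eventually consistent reads cost half as much as consistent reads.
    return units if consistent_read else units / 2


def alarm_threshold(specified_units, fraction=0.8):
    """Consumed-capacity level at which the monitoring alert fires."""
    return specified_units * fraction


full_page = scan_read_units(1024)  # 1 MB evaluated, eventually consistent
threshold = alarm_threshold(20)    # Read Throughput of 20, default 80% alert level
```

This gives 512.0 units for a full 1 MB page, close to the 512.5 reported in the alarm, and a threshold of 16.0.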

When the request_limit is specified, the scan operation will retrieve that many items at a time, and will keep issuing requests until the whole table has been evaluated against the scan filter. This continuation is handled in boto's Layer2 via a data structure called LastEvaluatedKey, which is returned with each batch of items and tells the next request where to resume. The LastEvaluatedKey looks something like this in my example:

{'RangeKeyElement': '20120405053639', 'HashKeyElement': 'email@example.com'} 
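To make the continuation mechanism more concrete, here is a small simulation of it against an in-memory list. This is not boto's code: fetch_page is a hypothetical stand-in for a single scan request, and it uses a plain integer index where DynamoDB uses a key dictionary like the one above:

```python
def fetch_page(rows, limit, exclusive_start_key=None):
    """Hypothetical stand-in for one scan request against an in-memory 'table'.

    Returns (items, last_evaluated_key); the key is None once the end is reached.
    """
    start = 0 if exclusive_start_key is None else exclusive_start_key + 1
    page = rows[start:start + limit]
    end = start + len(page)
    last_key = end - 1 if end < len(rows) else None
    return page, last_key


def scan_all(rows, limit):
    """Keep requesting pages, feeding each LastEvaluatedKey into the next request."""
    last_key = None
    while True:
        page, last_key = fetch_page(rows, limit, last_key)
        for item in page:
            yield item
        if last_key is None:
            break


rows = list(range(45))
retrieved = list(scan_all(rows, limit=20))  # three requests: 20 + 20 + 5 items
```

The caller just iterates over the generator and never sees the page boundaries, which is how the Layer2 scan and query methods behave from the outside.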

One piece of functionality I wish were exposed is the exact number of Capacity Units consumed during a scan or query operation. That information is available to the Layer2 query or scan method, in a response object which is not however exposed via the boto API. I raised this issue with Mitch Garnaat, the author of boto, and he kindly opened a GitHub issue which hopefully will make it into a new feature.

In order to glean that information myself during a scan operation, I hacked the boto/dynamodb/layer2.py module and changed the yield statement in the scan and query methods from

yield item_class(table, attrs=item)

to

yield response['ConsumedCapacityUnits'], item_class(table, attrs=item)

This enabled me to troubleshoot the mysterious (at first) boto exception of the form

boto.exception.BotoServerError: BotoServerError: 400 Bad Request

It turns out you get this exception during a table scan or query operation when your consumed read capacity units are more than your defined read throughput.

I printed the ConsumedCapacityUnits for various values of request_limit in the scan operation. For request_limit = 100, I was getting 19.0 consumed read capacity units, just slightly below my specified read throughput of 20. This is with the consistent_read parameter set to its default value of False. As a rule of thumb, if you keep consistent_read = False, you should be OK with a limit set to twice your table's read throughput.
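Another way to stay under the limit is to pace the requests. The sketch below assumes that read throughput is counted in capacity units per second and that spreading requests evenly is enough to stay within it. The actual throttling behavior of the service may differ, and the function name is illustrative:

```python
def min_seconds_between_requests(units_per_request, specified_units_per_second):
    """Shortest pause that keeps average consumption at or below the specified rate."""
    return float(units_per_request) / specified_units_per_second


# 19.0 units per scan request against a read throughput of 20
pause = min_seconds_between_requests(19.0, 20)
```

Under those assumptions, a scan request consuming 19.0 units should not be issued more often than about once every 0.95 seconds.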

In my tests, query operations only took 0.5 read capacity units when consistent_read is False, and 1.0 read capacity units when consistent_read is True.

That's about it for now. I haven't really started to use the DynamoDB service in anger, so I don't have any more data points. From what I've seen so far, DynamoDB is good at operations such as getting a specific item, or querying by hash key and range key. It is not good at reporting operations that involve scanning a table. For that, my intention is to use DynamoDB in conjunction with Elastic MapReduce, specifically with Hive. Stay tuned for another blog post on that soon.