Wednesday, July 27, 2011

Processing mail logs with Elastic MapReduce and Pig

These are some notes I took while trying out Elastic MapReduce (EMR), and more specifically its Pig functionality, by processing sendmail mail logs. A big help was Eric Lubow's blog post on EMR and Pig. Before I go into the details, here's my general processing flow:

  • N mail servers (running sendmail) send their mail logs to a central server running syslog-ng.
  • A process on the central logging server tails the aggregated mail log at 5-minute intervals, parses the lines it finds, extracts the relevant information from each line, and saves the output in JSON format to local files. There are actually 2 types of files generated, one for sender information and one for recipient information, corresponding to the 'from' and 'to' lines in the mail log -- see below.
  • Another process compresses the generated files with bzip2 and uploads them to S3 (see the sketch just below this list).
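
As a rough illustration of that last step, here is a minimal sketch of the compress-and-upload part; the exact file names and the use of s3cmd are my assumptions, not necessarily what runs on the logging server:

$ bzip2 from-2011-07-12-20-58 to-2011-07-12-20-58
$ s3cmd put from-2011-07-12-20-58.bz2 s3://MY_S3_BUCKET/mail_logs/2011-07-12/
$ s3cmd put to-2011-07-12-20-58.bz2 s3://MY_S3_BUCKET/mail_logs/2011-07-12/
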
I have 2 sets of files, one set with names similar to "from-2011-07-12-20-58" and containing JSON records of the following form, one per line:

{"nrcpts": "1", "src": "info@example.com", "sendmailid": "p6D0r0u1006229", "relay": "app03.example.com", "classnumber": "0", "msgid": "WARQZCXAEMSSVWPPOOYZXRLQIKMFUY.155763@example.com", "
pid": "6229", "month": "Jul", "time": "20:53:00", "day": "12", "mailserver": "mail5", "size": "57395"}

The second set contains files with names similar to "to-2011-07-12-20-58" and containing JSON records of the following form, one per line:

{"sendmailid": "p6D0qwvm006395", "relay": "gmail-smtp-in.l.google.com.", "dest": "somebody@gmail.com", "pid": "6406", "stat": "Sent (OK 1310518380 pd12si6025606vcb.162)", "month": "Jul", "delay": "00:00:02", "time": "20:53:00", "xdelay": "00:00:02", "day": "12", "mailserver": "mail2"}

For the initial EMR/Pig setup, I followed "Parsing Logs with Apache Pig and Elastic MapReduce". It's fairly simple to end up with an EC2 instance running Hadoop and Pig that you can play with.
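
If you prefer the command line to the AWS console, Amazon's elastic-mapreduce Ruby CLI can launch an interactive Pig job flow. This is a sketch from memory, so double-check the flags against the EMR documentation:

$ elastic-mapreduce --create --alive --name "pig-test" --pig-interactive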

I then ssh-ed into the EMR master instance (note that it was still shown in the 'Waiting' state in the EMR console, but once it was assigned an IP address and an internal name, I was able to ssh into it).

In order for Pig to be able to process input in JSON format, you need to use Kevin Weil's elephant-bird library. I followed Eric Lubow's post to get that set up:

$ mkdir git && mkdir pig-jars
$ cd git && wget --no-check-certificate https://github.com/kevinweil/elephant-bird/tarball/eb1.2.1_with_jsonloader
$ tar xvfz eb1.2.1_with_jsonloader
$ cd kevinweil-elephant-bird-ecf8356/
$ cp lib/google-collect-1.0.jar ~/pig-jars/
$ cp lib/json-simple-1.1.jar ~/pig-jars/
$ ant nonothing
$ cd build/classes/
$ jar -cf ../elephant-bird-1.2.1-SNAPSHOT.jar com
$ cp ../elephant-bird-1.2.1-SNAPSHOT.jar ~/pig-jars/

I then copied the 3 elephant-bird JAR files to S3 so I could register them every time I run Pig. I did that via the grunt command prompt:

$ pig -x local

grunt> cp file:///home/hadoop/pig-jars/google-collect-1.0.jar s3://MY_S3_BUCKET/jars/pig/
grunt> cp file:///home/hadoop/pig-jars/json-simple-1.1.jar s3://MY_S3_BUCKET/jars/pig/                                               
grunt> cp file:///home/hadoop/pig-jars/elephant-bird-1.2.1-SNAPSHOT.jar s3://MY_S3_BUCKET/jars/pig/         
At this point, I was ready to process some of the files I had uploaded to S3.

I first tried processing a single file, using Pig's local mode (which doesn't involve HDFS). It turns out that Pig doesn't load compressed files correctly via elephant-bird when running in local mode, so I tested this on an uncompressed file previously uploaded to S3:

$ pig -x local

grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/google-collect-1.0.jar;                                                                   
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/json-simple-1.1.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/elephant-bird-1.2.1-SNAPSHOT.jar;
grunt> json = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-12/to-2011-07-12-16-49' USING com.twitter.elephantbird.pig.load.JsonLoader();
Note that I used the JSON loader from the elephant-bird JAR file.

I then wanted to find the top 3 mail servers in the file I had loaded (this is again heavily inspired by Eric Lubow's example in his blog post):
grunt> mailservers = FOREACH json GENERATE $0#'mailserver' AS mailserver;
grunt> mailserver_count = FOREACH (GROUP mailservers BY $0) GENERATE $0, COUNT($1) AS cnt;
grunt> mailserver_sorted_count = LIMIT(ORDER mailserver_count BY cnt DESC) 3;
grunt> DUMP mailserver_sorted_count;

I won't go into detail about the actual Pig operations I ran -- I recommend going through some Pig Latin tutorials or buying the O'Reilly 'Programming Pig' book. Suffice it to say that I extracted the 'mailserver' JSON field, grouped the records by mail server, and counted how many records there were in each group. Finally, I dumped the top 3 mail servers found.
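
One aside: while building up a pipeline like this step by step, Pig's DESCRIBE and ILLUSTRATE commands are handy for inspecting an alias before running the whole job. A quick sketch (output varies with your data, and ILLUSTRATE doesn't play well with every loader):

grunt> DESCRIBE mailserver_count;
grunt> ILLUSTRATE mailserver_count;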

Here's a slightly more interesting exercise: finding out the top 10 mail recipients by looking at all the to-* files uploaded to S3 (still uncompressed in this case):

grunt> to = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-13/to*' USING com.twitter.elephantbird.pig.load.JsonLoader();
grunt> to_emails = FOREACH to GENERATE $0#'dest' AS dest;                                                                      
grunt> to_count = FOREACH (GROUP to_emails BY $0) GENERATE $0, COUNT($1) AS cnt;                                              
grunt> to_sorted_count = LIMIT(ORDER to_count BY cnt DESC) 10;                                                                                  
grunt> DUMP to_sorted_count;
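
DUMP only prints the results to the console; to persist them, Pig's STORE statement can write them back to S3. A minimal sketch, with a hypothetical output path:

grunt> STORE to_sorted_count INTO 's3://MY_S3_BUCKET/mail_logs/output/top10_recipients' USING PigStorage('\t');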


I tried the same processing steps on bzip2-compressed files using Pig's Hadoop mode (which you invoke by simply running 'pig' instead of 'pig -x local'). The files were loaded correctly this time, but the MapReduce phase failed with messages similar to the following in /mnt/var/log/apps/pig.log:

Pig Stack Trace
---------------
ERROR 6015: During execution, encountered a Hadoop error.
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias to_sorted_count
at org.apache.pig.PigServer.openIterator(PigServer.java:482)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:546)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:241)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:141)
at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:75)
at org.apache.pig.Main.main(Main.java:374)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 6015: During execution, encountered a Hadoop error.
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:474)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:109)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:255)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:244)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:94)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:363)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:312)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.pig.impl.io.NullableBytesWritable, recieved org.apache.pig.impl.io.NullableText
... 9 more

A quick Google search revealed Pig JIRA ticket PIG-919, which offered a workaround. Basically, this happens when a value coming out of a map is used in a GROUP/COGROUP/JOIN. By default the type of that value is bytearray, and you need to cast it to chararray to make things work (I confess I didn't dig too much into the nitty-gritty of this issue yet; I was just happy I got it working).

So all I had to do was modify a single line, casting the value used in the GROUP BY clause to chararray:

grunt> to_count = FOREACH (GROUP to_emails BY (chararray)$0) GENERATE $0, COUNT($1) AS cnt;  
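
With that change in place, the full Hadoop-mode sequence over the compressed files looks like this (just the statements from above, consolidated; the to* glob matching the .bz2 files is my assumption about the upload layout):

grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/google-collect-1.0.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/json-simple-1.1.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/elephant-bird-1.2.1-SNAPSHOT.jar;
grunt> to = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-13/to*' USING com.twitter.elephantbird.pig.load.JsonLoader();
grunt> to_emails = FOREACH to GENERATE $0#'dest' AS dest;
grunt> to_count = FOREACH (GROUP to_emails BY (chararray)$0) GENERATE $0, COUNT($1) AS cnt;
grunt> to_sorted_count = LIMIT(ORDER to_count BY cnt DESC) 10;
grunt> DUMP to_sorted_count;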

At this point, I was able to watch Elastic MapReduce in action. It was slower than local mode, because I only had 1 m1.small instance. I'll try it next with several instances, and hopefully see a near-linear improvement.
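
The instance count and type can be specified when creating the job flow with the same Ruby CLI; again a sketch from memory, so verify the flags before relying on them:

$ elastic-mapreduce --create --alive --name "pig-cluster" --pig-interactive \
    --num-instances 5 --instance-type m1.small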

That's it for now. This was just a toy example, but it got me started with EMR and Pig. Hopefully I'll follow up with more interesting log processing and analysis.

3 comments:

Karna said...

I'm trying the same example as yours on a Hadoop cluster, but mine is throwing an error.
Code:
json1 = load '/user/hdetl/funnel/uetsample.dat' using com.twitter.elephantbird.pig.load.JsonLoader();
dat = FOREACH json1 GENERATE $0#'mailserver' AS mailserver;
dump dat;


Input(s):
Failed to read data from "/user/hdetl/funnel/uetsample.dat"

Anonymous said...

I'm running Hadoop local and get this error:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2998: Unhandled internal error. org/slf4j/LoggerFactory

Any idea what the problem is ?

Thanks in advance.

venkat said...

For slf4j, register the libraries, e.g.:

register '/home/biadmin/pigScripts/pig_lib/slf4j-api-1.5.10.jar'
register '/home/biadmin/pigScripts/pig_lib/slf4j-log4j12-1.5.10.jar'
register '/home/biadmin/pigScripts/pig_lib/log4j-1.2.15.jar'

