- N mail servers (running sendmail) send their mail logs to a central server running syslog-ng.
- A process running on the central logging server tails the aggregated mail log at 5-minute intervals, parses the lines it finds, extracts the relevant information from each line, and saves the output in JSON format to a local file. It actually generates 2 types of files, one for sender information and one for recipient information, corresponding to the 'from' and 'to' lines in the mail log (see below).
- Another process compresses the generated files in bzip2 format and uploads them to S3.
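The compression half of that last step can be sketched in a few lines of Python. This is only an illustration, not the actual process I run: the function name `compress_bz2` and the `.bz2` suffix are assumptions made for the example, and the upload to S3 is left out.

```python
import bz2
import shutil


def compress_bz2(src_path, dest_path=None):
    """Write a bzip2-compressed copy of src_path and return its path.

    The ".bz2" suffix is an assumption for this sketch; the original
    file is left in place.
    """
    if dest_path is None:
        dest_path = src_path + ".bz2"
    # Stream the file through the compressor instead of reading it
    # into memory all at once.
    with open(src_path, "rb") as src, bz2.open(dest_path, "wb") as dest:
        shutil.copyfileobj(src, dest)
    return dest_path
```

Calling `compress_bz2("to-2011-07-12-20-58")` would leave a `to-2011-07-12-20-58.bz2` file next to the original, ready to be uploaded.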
I have 2 sets of files, one set with names similar to "from-2011-07-12-20-58" and containing JSON records of the following form, one per line:
{"nrcpts": "1", "src": "info@example.com", "sendmailid": "p6D0r0u1006229", "relay": "app03.example.com", "classnumber": "0", "msgid": "WARQZCXAEMSSVWPPOOYZXRLQIKMFUY.155763@example.com", "pid": "6229", "month": "Jul", "time": "20:53:00", "day": "12", "mailserver": "mail5", "size": "57395"}
The second set contains files with names similar to "to-2011-07-12-20-58" and containing JSON records of the following form, one per line:
{"sendmailid": "p6D0qwvm006395", "relay": "gmail-smtp-in.l.google.com.", "dest": "somebody@gmail.com", "pid": "6406", "stat": "Sent (OK 1310518380 pd12si6025606vcb.162)", "month": "Jul", "delay": "00:00:02", "time": "20:53:00", "xdelay": "00:00:02", "day": "12", "mailserver": "mail2"}
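To make the parsing step more concrete, here is a minimal Python sketch that turns one 'to' line into a record with the same fields as the one above. It is not the parser I actually use. The sample log line, the regular expression, and the names `LINE_RE`, `SAMPLE_LINE` and `parse_to_line` are all assumptions made for the example, based on the usual layout of sendmail delivery lines forwarded through syslog.

```python
import json
import re

# Assumed layout: "Mon DD HH:MM:SS host sendmail[pid]: queueid: key=value, ..."
LINE_RE = re.compile(
    r"^(?P<month>\w{3})\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})\s+"
    r"(?P<mailserver>\S+)\s+sendmail\[(?P<pid>\d+)\]:\s+"
    r"(?P<sendmailid>\w+):\s+(?P<rest>.*)$"
)

# Hypothetical sample line, made up to match the record shown above.
SAMPLE_LINE = (
    "Jul 12 20:53:00 mail2 sendmail[6406]: p6D0qwvm006395: "
    "to=<somebody@gmail.com>, delay=00:00:02, xdelay=00:00:02, "
    "mailer=esmtp, pri=177395, relay=gmail-smtp-in.l.google.com. "
    "[192.0.2.10], dsn=2.0.0, stat=Sent (OK 1310518380 pd12si6025606vcb.162)"
)


def parse_to_line(line):
    """Return a dict for a 'to' line, or None if the line is not one."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    record = match.groupdict()
    rest = record.pop("rest")

    # Split on ", " only where a new key= follows, so that values
    # containing spaces (such as stat) stay in one piece.
    pairs = {}
    for chunk in re.split(r",\s+(?=\w+=)", rest):
        key, sep, value = chunk.partition("=")
        if sep:
            pairs[key] = value

    if "to" not in pairs:
        return None  # a 'from' line or some other message
    record["dest"] = pairs["to"].strip("<>")
    # Keep only the relay host name and drop the bracketed IP address.
    record["relay"] = pairs.get("relay", "").split(" ")[0]
    for key in ("delay", "xdelay", "stat"):
        record[key] = pairs.get(key, "")
    return record


if __name__ == "__main__":
    print(json.dumps(parse_to_line(SAMPLE_LINE)))
```

Every value is kept as a string, which matches the records above and means any numeric handling is left to the analysis step.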
For the initial EMR/Pig setup, I followed "Parsing Logs with Apache Pig and Elastic MapReduce". It's fairly simple to end up with an EC2 instance running Hadoop and Pig that you can play with.
I then ssh-ed into the EMR master instance (note that it was still shown in 'Waiting' state in the EMR console, but once it got assigned an IP and internal name I was able to ssh into it).
In order for Pig to be able to process input in JSON format, you need to use Kevin Weil's elephant-bird library. I followed Eric Lubow's post to get that set up:
$ mkdir git && mkdir pig-jars
$ cd git && wget --no-check-certificate https://github.com/kevinweil/elephant-bird/tarball/eb1.2.1_with_jsonloader
$ tar xvfz eb1.2.1_with_jsonloader
$ cd kevinweil-elephant-bird-ecf8356/
$ cp lib/google-collect-1.0.jar ~/pig-jars/
$ cp lib/json-simple-1.1.jar ~/pig-jars/
$ ant nonothing
$ cd build/classes/
$ jar -cf ../elephant-bird-1.2.1-SNAPSHOT.jar com
$ cp ../elephant-bird-1.2.1-SNAPSHOT.jar ~/pig-jars/
I then copied the 3 jar files (elephant-bird itself plus the two libraries from its lib directory) to S3 so I can register them every time I run Pig. I did that via the grunt command prompt:
$ pig -x local
grunt> cp file:///home/hadoop/pig-jars/google-collect-1.0.jar s3://MY_S3_BUCKET/jars/pig/
grunt> cp file:///home/hadoop/pig-jars/json-simple-1.1.jar s3://MY_S3_BUCKET/jars/pig/
grunt> cp file:///home/hadoop/pig-jars/elephant-bird-1.2.1-SNAPSHOT.jar s3://MY_S3_BUCKET/jars/pig/
At this point, I was ready to process some of the files I uploaded to S3.
I first tried processing a single file, using Pig's local mode (which doesn't involve HDFS). It turns out that Pig doesn't load compressed files correctly via elephant-bird when you run in local mode, so I tested this on an uncompressed file previously uploaded to S3:
$ pig -x local
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/google-collect-1.0.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/json-simple-1.1.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/elephant-bird-1.2.1-SNAPSHOT.jar;
grunt> json = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-12/to-2011-07-12-16-49' USING com.twitter.elephantbird.pig.load.JsonLoader();
Note that I used the JSON loader from the elephant-bird JAR file.
I wanted to know the top 3 mail servers from the file I loaded (this is again heavily inspired by Eric Lubow's example in his blog post):
grunt> mailservers = FOREACH json GENERATE $0#'mailserver' AS mailserver;
grunt> mailserver_count = FOREACH (GROUP mailservers BY $0) GENERATE $0, COUNT($1) AS cnt;
grunt> mailserver_sorted_count = LIMIT(ORDER mailserver_count BY cnt DESC) 3;
grunt> DUMP mailserver_sorted_count;
I won't go into detail about the actual Pig operations I ran -- I recommend going through some Pig Latin tutorials or buying the O'Reilly 'Programming Pig' book. Suffice it to say that I extracted the 'mailserver' JSON field, then grouped the records by mail server and counted how many there were in each group. Finally, I dumped the top 3 mail servers found.
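If Pig Latin is new to you, the same extract, group, count, sort and limit logic can be sketched in plain Python on a single machine. This is only an analogy to show what the statements compute, not how Pig executes them; the function name `top_values` is made up for the example.

```python
import json
from collections import Counter


def top_values(lines, field, n):
    """Return the n most frequent values of `field` as (value, count) pairs.

    `lines` is any iterable of JSON records, one per line, such as an
    open to-* or from-* file.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        # Extract the field (FOREACH ... GENERATE) and count per
        # distinct value (GROUP ... COUNT).
        if field in record:
            counts[record[field]] += 1
    # Sort by count, highest first, and keep the first n
    # (ORDER ... DESC, then LIMIT).
    return counts.most_common(n)
```

With an uncompressed file opened as `f`, `top_values(f, "mailserver", 3)` corresponds to the top 3 mail servers query, and `top_values(f, "dest", 10)` to a top 10 recipients query.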
Here's a slightly more interesting exercise: finding out the top 10 mail recipients by looking at all the to-* files uploaded to S3 (still uncompressed in this case):
grunt> to = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-13/to*' USING com.twitter.elephantbird.pig.load.JsonLoader();
grunt> to_emails = FOREACH to GENERATE $0#'dest' AS dest;
grunt> to_count = FOREACH (GROUP to_emails BY $0) GENERATE $0, COUNT($1) AS cnt;
grunt> to_sorted_count = LIMIT(ORDER to_count BY cnt DESC) 10;
grunt> DUMP to_sorted_count;
I tried the same processing steps on bzip2-compressed files using Pig's Hadoop mode (which you invoke by just running 'pig' and not 'pig -x local'). The files were loaded correctly this time, but the MapReduce phase failed with messages similar to this in /mnt/var/log/apps/pig.log:
Pig Stack Trace
---------------
ERROR 6015: During execution, encountered a Hadoop error.
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias to_sorted_count
at org.apache.pig.PigServer.openIterator(PigServer.java:482)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:546)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:241)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:141)
at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:75)
at org.apache.pig.Main.main(Main.java:374)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 6015: During execution, encountered a Hadoop error.
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:474)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:109)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:255)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:244)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:94)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:363)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:312)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.pig.impl.io.NullableBytesWritable, recieved org.apache.pig.impl.io.NullableText
... 9 more
A quick Google search turned up Pig JIRA ticket #919, which offered a workaround. Basically, this happens when a value pulled out of a map is used in a group/cogroup/join. By default the type of that value is bytearray, and you need to cast it to chararray to make things work (I confess I haven't dug into the nitty-gritty of this issue yet; I was just happy I made it work).
So what I had to do was to modify a single line and cast the value used in the GROUP BY clause to chararray:
grunt> to_count = FOREACH (GROUP to_emails BY (chararray)$0) GENERATE $0, COUNT($1) AS cnt;
At this point, I was able to watch Elastic MapReduce in action. It was slower than local mode because I only had 1 m1.small instance. I'll try it next with several instances and hopefully see a near-linear improvement.
That's it for now. This was just a toy example, but it got me started with EMR and Pig. Hopefully I'll follow up with more interesting log processing and analysis.