A Simple Hadoop MapReduce tutorial/example
When you first start working with Hadoop -- or try to start working with it -- you'll find that it's hard to find a good Hadoop MapReduce tutorial. In fact, the only one I've found that properly explains Hadoop is this this Yahoo tutorial. Even then, they don't give you quite enough information, so in an effort to help the situation, I thought I'd share a modified version of the standard "word count" source code example every book and tutorial seems to use.
To make the code much more understandable, I've added a lot of comments to the Java source code shown below, primarily before each class or method. I've also added some println statements that you can enable and disable to see all the magic that the Hadoop framework is doing for you. The Hadoop tutorial I linked to above gives a fairly good description of all the magic that happens in the code, but I've found that once you see everything printed to an output log file, the lights go on, and you can enjoy that "Aha!" experience much more.
(Hadoop reminds me of the Spring Framework, because when you first start using it, a lot of magic happens behind the scenes, and it's very helpful/necessary to understand that magic.)
Given that introduction, here's the source code for my modified Hadoop Word Count example:
package hadoopwordcount; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class WordTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /** * map() gets a key, value, and context (which we'll ignore for the moment). * key - seems to be "bytes from the beginning of the file" * value - the current line; we are being fed one line at a time from the * input file * * here's what the key and value look like if i print them out with the first * println statement below: * * [map] key: (0), value: ([Weekly Compilation of Presidential Documents]) * [map] key: (47), value: (From the 2002 Presidential Documents Online via GPO Access [frwais.access.gpo.gov]) * [map] key: (130), value: ([DOCID:pd04fe02_txt-11] ) * [map] key: (179), value: () * [map] key: (180), value: ([Page 133-139]) * * in the tokenizer loop, each token is a "word" from the current line, so the first token from * the first line is "Weekly", then "Compilation", and so on. as a result, the output from the loop * over the first line looks like this: * * [map] key: (0), value: ([Weekly Compilation of Presidential Documents]) * [map, in loop] token: ([Weekly) * [map, in loop] token: (Compilation) * [map, in loop] token: (of) * [map, in loop] token: (Presidential) * [map, in loop] token: (Documents]) * */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //System.err.println(String.format("[map] key: (%s), value: (%s)", key, value)); // break each sentence into words, using the punctuation characters shown StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f,.:;?![]'"); while (tokenizer.hasMoreTokens()) { // make the words lowercase so words like "an" and "An" are counted as one word String s = tokenizer.nextToken().toLowerCase().trim(); System.err.println(String.format("[map, in loop] token: (%s)", s)); word.set(s); context.write(word, one); } } } /** * this is the reducer class. * some magic happens before the data gets to us. the key and values data looks like this: * * [reduce] key: (Afghan), value: (1) * [reduce] key: (Afghanistan), value: (1, 1, 1, 1, 1, 1, 1) * [reduce] key: (Afghanistan,), value: (1, 1, 1) * [reduce] key: (Africa), value: (1, 1) * [reduce] key: (Al), value: (1) * * there are also many '0' values in the data: * * [reduce] key: (while), value: (0) * [reduce] key: (who), value: (0) * ... * * note that the input to this function is sorted, so it begins with numbers, * like "000", then starts with "a", "about", and so on, after the numbers are printed. * */ public static class WordOccurrenceReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable occurrencesOfWord = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // debug output //printKeyAndValues(key, values); // the actual reducer work int sum = 0; for (IntWritable val : values) { sum += val.get(); } occurrencesOfWord.set(sum); // this writes the word and the count, like this: ("Africa", 2) context.write(key, occurrencesOfWord); // my debug output System.err.println(String.format("[reduce] word: (%s), count: (%d)", key, occurrencesOfWord.get())); } // a little method to print debug output private void printKeyAndValues(Text key, Iterable<IntWritable> values) { StringBuilder sb = new StringBuilder(); for (IntWritable val : values) { sb.append(val.get() + ", "); } System.err.println(String.format("[reduce] key: (%s), value: (%s)", key, sb.toString())); } } /** * the "driver" class. it sets everything up, then gets it started. */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <inputFile> <outputDir>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordTokenizerMapper.class); job.setCombinerClass(WordOccurrenceReducer.class); job.setReducerClass(WordOccurrenceReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
An example "big data" data set
One thing I need to note about this example is that I've used an example "big data" data set. I don't really recommend the book Hadoop in Action, but one thing it turned me onto was using an old State of the Union address with the example Hadoop word count program. Here's a link to that State of the Union data. You can use any data set you want, but in trying to follow the book, I downloaded the 2002 address.
A shell script to run the word count program
To help you along even more, here's the source code for a shell script that I use to run this WordCount example:
# 'wcoutput' is the name of my wordcount output directory. # you need to delete it before running the wordcount program. rm -rf wcoutput 2> /dev/null # this is how i run the wordcount program. # my System.err.println statements are directed to the # wordcount.log file. /Users/Al/bin/hadoop/bin/hadoop jar \ wordcount.jar \ wordcount.WordCount \ wcinput \ wcoutput \ 2> wordcount.log
That shell script shows how I redirect the System.err.println output to a separate file, where I can see all the output I've printed from within the WordCount classes.
Summary
Reporting live from sunny Boulder, Colorado, I hope this extra Hadoop tutorial/example information has been helpful.
Recent blog posts
- Free: Introduction To Functional Programming video training course
- The #1 functional programming (and computer programming) book
- The User Story Mapping Workshop process
- Alvin Alexander, Certified Scrum Product Owner (CSPO)
- Alvin Alexander is now a Certified ScrumMaster (CSM)
- Our “Back To Then” app (for iOS and Android)
- A Docker cheat sheet
- Pushing a Scala 3 JAR/Docker file to Google Cloud Run
- Reading a CSV File Into a Spark RDD (Scala Cookbook recipe)
- Reading a File Into a Spark RDD (Scala Cookbook recipe)