A Deep Dive into Writing MapReduce Programs in Java
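
The classic WordCount program is the simplest complete example of the MapReduce model: a Mapper that turns input into key/value pairs, a Reducer that aggregates values by key, and a driver that configures and submits the Job. The listing below uses the org.apache.hadoop.mapreduce API; we walk through it piece by piece, starting with the imports.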

  import java.io.IOException;
  import java.util.StringTokenizer;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
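
The Mapper tokenizes each input line and emits a (word, 1) pair for every token. The generic parameters on Mapper matter: with the raw type, map would merely overload rather than override the framework's method, and Hadoop would silently run the identity mapper instead.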

  public class WordCount {

    // Mapper: emit (word, 1) for every token in each input line.
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, one);
        }
      }
    }
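
The Reducer receives each word together with all of the counts emitted for it and writes out their sum: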

    // Reducer: sum the counts collected for each word.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        context.write(key, new IntWritable(sum));
      }
    }
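
Finally, the driver wires everything together. Because summation is associative and commutative, the same Reduce class can double as a combiner, shrinking the volume of data shuffled between the map and reduce phases: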

    // Driver: configure the job and submit it, exiting nonzero on failure.
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "wordcount");
      job.setJarByClass(WordCount.class);
      job.setMapperClass(Map.class);
      job.setCombinerClass(Reduce.class); // safe: Reduce's output types match its input types
      job.setReducerClass(Reduce.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
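
To run the job, compile against the Hadoop client libraries, package the class into a jar, and submit it with the hadoop command. The jar name and HDFS paths below are only examples:

  hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output

Note that the output directory must not already exist: FileOutputFormat refuses to overwrite an existing path, and the job fails at submission time.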