ScalaでHadoopのジョブを書く

HadoopのジョブをScalaで書くためのこんなものがある。

- http://blog.jonhnnyweslley.net/2008/05/shadoop.html

試したところ、サンプルがHadoop 0.20.2では動かないっぽい。
サンプルを動くように改変してみた。

WordCount.scala

import shadoop.SHadoop._

import java.util.Iterator
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.util.GenericOptionsParser

import scala.collection.JavaConversions._

object WordCount {

  class Map extends Mapper[AnyRef, Text, Text, IntWritable] {

    val one: IntWritable = 1

    override def map(key: AnyRef, value: Text, output: Mapper[AnyRef, Text, Text, IntWritable]#Context): Unit = {

      for (x <- value.split(" ")) {
        output.write(x, one)
      }
    }
  }

  class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] {

    override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {

      val sum = values.reduceLeft( (x, y) => x + y)
      context.write(key, sum)
    }
  }

  def main(args: Array[String]) = {

    val conf = new Configuration()
    val otherArgs: Array[String] = (new GenericOptionsParser(conf, args)).getRemainingArgs()
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>")
      exit(2)
    }
    val job = new Job(conf, "word count")
    job.setJarByClass(WordCount.getClass)
    job.setMapperClass(classOf[Map]);
    job.setCombinerClass(classOf[Reduce]);
    job.setReducerClass(classOf[Reduce]);
    job.setOutputKeyClass(classOf[Text]);
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(otherArgs(0)))
    FileOutputFormat.setOutputPath(job, new Path(otherArgs(1)))

    exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}

overrideしたメソッドの引数のContextが書くのが激しくだるいので、SHadoopで工夫の余地する余地があるけど、他力本願的に何もしない。