ScalaでHadoopのジョブを書く
HadoopのジョブをScalaで書くためのこんなものがある。
- http://blog.jonhnnyweslley.net/2008/05/shadoop.html
試したところ、サンプルがHadoop 0.20.2では動かないっぽい。
サンプルを動くように改変してみた。
WordCount.scala
import shadoop.SHadoop._ import java.util.Iterator import org.apache.hadoop.fs._ import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.util.GenericOptionsParser import scala.collection.JavaConversions._ object WordCount { class Map extends Mapper[AnyRef, Text, Text, IntWritable] { val one: IntWritable = 1 override def map(key: AnyRef, value: Text, output: Mapper[AnyRef, Text, Text, IntWritable]#Context): Unit = { for (x <- value.split(" ")) { output.write(x, one) } } } class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] { override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = { val sum = values.reduceLeft( (x, y) => x + y) context.write(key, sum) } } def main(args: Array[String]) = { val conf = new Configuration() val otherArgs: Array[String] = (new GenericOptionsParser(conf, args)).getRemainingArgs() if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>") exit(2) } val job = new Job(conf, "word count") job.setJarByClass(WordCount.getClass) job.setMapperClass(classOf[Map]); job.setCombinerClass(classOf[Reduce]); job.setReducerClass(classOf[Reduce]); job.setOutputKeyClass(classOf[Text]); job.setOutputValueClass(classOf[IntWritable]) FileInputFormat.addInputPath(job, new Path(otherArgs(0))) FileOutputFormat.setOutputPath(job, new Path(otherArgs(1))) exit(if (job.waitForCompletion(true)) 0 else 1) } }
overrideしたメソッドの引数のContextが書くのが激しくだるいので、SHadoopで工夫の余地する余地があるけど、他力本願的に何もしない。