Monday, April 29, 2013

Run MapReduce in Play development mode

When you want to invoke a MapReduce job in Play development mode, the first problem you will hit is that the application jar has not been generated yet, so job.setJarByClass(classOf[MyMapper]) won't work. You can overcome this issue by using the Hadoop utility class JarFinder, like this:
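    job.setJarByClass(classOf[MyMapper])
    // In dev mode the classes are not packaged yet, so no jar is found.
    if (job.getJar() == null) {
      // Load JarFinder reflectively. Class.forName throws ClassNotFoundException
      // (it never returns null) when the class is not on the classpath.
      val jarFinder = Class.forName("org.apache.hadoop.util.JarFinder")
      val m = jarFinder.getMethod("getJar", classOf[Class[_]])

      // getJar is a static method, so the receiver passed to invoke is null.
      job.getConfiguration().asInstanceOf[org.apache.hadoop.mapred.JobConf]
        .setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String])
    }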
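
To see why setJarByClass finds nothing in dev mode, it can help to check where a class was actually loaded from. The sketch below is only an illustration, not Hadoop code: the object ClassOrigin and its methods are hypothetical names, and it assumes a class counts as "packaged" when its .class resource is served from a jar: URL. In Play dev mode the application classes typically come from a classes directory (a file: URL), so there is no jar to report.

```scala
import java.net.URL

// Hypothetical helper for illustration; not part of Hadoop or Play.
object ClassOrigin {

  /** URL of the .class resource for klass, if a class loader can locate it. */
  def location(klass: Class[_]): Option[URL] = {
    val resource = klass.getName.replace('.', '/') + ".class"
    // getClassLoader can return null (bootstrap loader), so fall back
    // to the thread's context class loader in that case.
    val loader = Option(klass.getClassLoader)
      .orElse(Option(Thread.currentThread().getContextClassLoader))
    loader.flatMap(l => Option(l.getResource(resource)))
  }

  /** True when the class file is served from inside a jar. */
  def isPackaged(klass: Class[_]): Boolean =
    location(klass).exists(_.getProtocol == "jar")

  def main(args: Array[String]): Unit = {
    val klass = ClassOrigin.getClass
    println("loaded from: " + location(klass).getOrElse("unknown"))
    println("packaged in a jar: " + isPackaged(klass))
  }
}
```

Calling ClassOrigin.isPackaged(classOf[MyMapper]) from the Play app in dev mode and again in prod mode should show the difference between the two setups.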
Here is what you need to pay attention to:
  • org.apache.hadoop.util.JarFinder is a Hadoop utility class for testing that creates a jar on the fly. You can find it in the hadoop-common package with the classifier "tests" (hadoop-common-2.0.0-cdh4.2.1-tests.jar in the Cloudera distribution). Unfortunately, simply adding the dependency to Build.scala won't work:
       "org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.2.1" classifier "tests", 
    
    You will get a lot of compilation errors. If you add "test" like this, compilation passes, but you cannot use JarFinder in development mode because the jar is not on the classpath (it IS on the test classpath):
       "org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.2.1" % "test" classifier "tests", 
    
    I simply took JarFinder.java from the source package and put it into the Play app/org/apache/hadoop/util directory, alongside the Scala files. Play will compile it.
  • Depending on which Hadoop version you are using, you may be able to call setJar directly on the job, like this:
      job.setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String])
    
    I'm using "org.apache.hadoop" % "hadoop-client" % "2.0.0-mr1-cdh4.2.0", where org.apache.hadoop.mapreduce.Job doesn't have setJar.
  • The property mapred.jar is created when setJar is called; you can verify that it exists in the job file on the JobTracker web page. An easy mistake is to call setJar on another Configuration object that is not the one used by the job, for example:
        val config = HBaseConfiguration.create()
        val job = new Job(config, "My MR job")
        ...
        job.setJarByClass(classOf[MyMapper])
        if (job.getJar() == null) {
          val jarFinder = Class.forName("org.apache.hadoop.util.JarFinder")
          if (jarFinder != null) {
            ...
            // This won't work because jc is a copy of the configuration, not the object used by the Job.
            // Setting the jar on val config fails for the same reason.
            val jc = new JobConf(job.getConfiguration())
            jc.setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String])
          }
        }
    
  • Another problem you may encounter is that JarFinder.getJar still returns null. I had this problem when I ran an HBase sbt project, but I don't remember whether it happened in the Play project. If you have this problem, you can add the following code to JarFinder to fix it:
      public static String getJar(Class klass) {
        Preconditions.checkNotNull(klass, "klass");
        ClassLoader loader = klass.getClassLoader();
    
        // Try to use context class loader 
        if (loader == null) {
         loader = Thread.currentThread().getContextClassLoader();
        }
    
        if (loader != null) {
    
    My HBase project starts an MR job that needs scala-library.jar to be distributed. Here is the snippet:
    
        TableMapReduceUtil.initTableMapperJob(
          tableName,
          getScan(siteKey, collDefId),
          classOf[TextMapper],
          classOf[ImmutableBytesWritable],
          classOf[Text],
          job,
          // don't add dependency jars
          false)
        job.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text, Text]])
        job.setOutputKeyClass(classOf[Text])
        job.setOutputValueClass(classOf[Text])
        FileOutputFormat.setCompressOutput(job, true)
        FileOutputFormat.setOutputPath(job, new Path(outDir))
        job.setNumReduceTasks(0);
    
        // Add dependency jars 
        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          classOf[org.apache.zookeeper.ZooKeeper],
          classOf[com.google.protobuf.Message],
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass(),
    
          // include scala-library.jar
          classOf[ScalaObject],
    
          // joda jar used by mapper/reducer
          classOf[org.joda.time.DateTime]);
    
    HBase TableMapReduceUtil (0.94.2-cdh4.2.0) will use JarFinder.getJar if it is present. Unfortunately, the classloader used by sbt to launch the application doesn't have ScalaObject on its classpath, and klass.getClassLoader() returns null. When you use the context classloader, scala-library is found without any problem.
  • The code works in both dev and prod mode. In prod mode, the jar containing the mappers and reducers is already on the classpath, so setJarByClass works without entering the JarFinder block.
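
The cloned-configuration pitfall from the list above can be reproduced in isolation. This is a minimal sketch, assuming only that hadoop-common is on the classpath; the object name ConfigCopyDemo and the path /tmp/example.jar are made up for the illustration. It uses the plain Configuration copy constructor, which is what new JobConf(conf) relies on, to show that a value set on the copy never reaches the original.

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical demo object; not part of the application code above.
object ConfigCopyDemo {

  /** Returns the value of mapred.jar as seen by (original, copy). */
  def run(): (String, String) = {
    // false = do not load the default resources; keeps the demo small
    val original = new Configuration(false)
    // The copy constructor clones the properties of the original
    val copy = new Configuration(original)

    // Setting the property on the copy leaves the original untouched
    copy.set("mapred.jar", "/tmp/example.jar") // hypothetical path

    (original.get("mapred.jar"), copy.get("mapred.jar"))
  }

  def main(args: Array[String]): Unit = {
    val (inOriginal, inCopy) = run()
    println("original: " + inOriginal)
    println("copy: " + inCopy)
  }
}
```

The same kind of check, job.getConfiguration().get("mapred.jar"), can be used in the application (with the Hadoop version mentioned above) to confirm that the jar was set on the configuration the job actually uses before submitting it.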
