Friday, December 20, 2013

Convert a bag of key-value pairs to map in Pig

Here is my input file: data.txt.
id_1    key_1  v_i1_a1
id_1    key_2  v_i1_a2
id_2    key_1  v_i2_a1
id_1    key_3  v_i1_a3
id_2    key_3  v_i2_a3
id_1    key_4  v_i1_a4
I want to get a map of key-value pairs for an ID like this:
(id_1, [key_1#v_i1_a1, key_2#v_i1_a2, key_3#v_i1_a3, key_4#v_i1_a4])
(id_2, [key_1#v_i2_a1, key_3#v_i2_a3])
Does this Pig script work? TOMAP and BagToTuple are built-in UDFs.
data = load 'data.txt' as (id: chararray, key: chararray, value: chararray);

-- Group data by id
g_data = group data by id;
describe g_data;

-- Convert key-value pair bag to tuple, then convert to map
to_tuple = foreach g_data generate group, TOMAP(BagToTuple(data.(key, value)));
dump to_tuple;
describe to_tuple;
Unfortunately, the above script doesn't work. The output looks like this:
g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,)
(id_2,)
to_tuple: {group: chararray,map[]}
What's wrong? Let's remove TOMAP and run it again:
...
to_tuple = foreach g_data generate group, BagToTuple(data.(key, value)) as kv;
...
The results look like this:
g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,(key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1))
(id_2,(key_3,v_i2_a3,key_1,v_i2_a1))
to_tuple: {group: chararray,kv: (key: chararray,value: chararray)}
Did you find anything wrong? BagToTuple converts the bag to a tuple correctly, but the schema of the tuple kv has only two fields. What about flattening the tuple and using project-range? Unfortunately, it still doesn't work: only one key-value pair ends up in the map.
to_tuple = foreach g_data generate group, FLATTEN(BagToTuple(data.(key, value)));
...

to_map = foreach to_tuple generate group, TOMAP($1 ..);
dump to_map;
describe to_map;

g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1)
(id_2,key_3,v_i2_a3,key_1,v_i2_a1)
to_tuple: {group: chararray,org.apache.pig.builtin.bagtotuple_8::key: chararray,org.apache.pig.builtin.bagtotuple_8::value: chararray}
(id_1,[key_4#v_i1_a4])
(id_2,[key_3#v_i2_a3])
to_map: {group: chararray,map[]}

The problem is caused by BagToTuple, which sets its output schema to the schema of the tuples inside the bag. This is not correct because the length of the generated tuple is unknown; BagToTuple should let Pig figure out the schema on its own. The fix is very simple: open BagToTuple.java and remove the outputSchema method entirely.
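Here is a minimal sketch, in Scala, of what such a patched UDF could look like. The real BagToTuple is Java; the package name my_package simply matches the define statement in the script below, and the code is illustrative rather than the actual Pig source.

package my_package

import org.apache.pig.EvalFunc
import org.apache.pig.data.{DataBag, Tuple, TupleFactory}

// A BagToTuple-style UDF with NO outputSchema() override, so Pig cannot pin the
// result to a fixed two-field schema and instead treats it as a tuple of unknown arity.
class BagToTuple extends EvalFunc[Tuple] {
  private val tupleFactory = TupleFactory.getInstance()

  override def exec(input: Tuple): Tuple = {
    if (input == null || input.size() == 0) return null
    val bag = input.get(0).asInstanceOf[DataBag]
    val out = tupleFactory.newTuple()
    val it  = bag.iterator()
    while (it.hasNext) {          // append every field of every tuple in the bag
      val t = it.next()
      var i = 0
      while (i < t.size()) {
        out.append(t.get(i))
        i += 1
      }
    }
    out
  }
}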

You also need to work around another issue related to TOMAP. Using TOMAP(BagToTuple(..)) won't work because the whole tuple is passed as the first field. You have to FLATTEN the tuple and use positional references like this:

define MyBagToTuple my_package.BagToTuple();

data = load 'data.txt' as (id: chararray, key: chararray, value: chararray);

-- Group data by id
g_data = group data by id;
describe g_data;

-- Convert key-value pair bag to tuple, then convert to map
to_tuple = foreach g_data generate group, FLATTEN(MyBagToTuple(data.(key, value)));
dump to_tuple;
describe to_tuple;

to_map = foreach to_tuple generate $0 as id, TOMAP($1 ..) as kv;
dump to_map;
describe to_map;

g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1)
(id_2,key_3,v_i2_a3,key_1,v_i2_a1)
Schema for to_tuple unknown.
(id_1,[key_3#v_i1_a3,key_2#v_i1_a2,key_1#v_i1_a1,key_4#v_i1_a4])
(id_2,[key_3#v_i2_a3,key_1#v_i2_a1])
to_map: {id: bytearray,kv: map[]}

Wednesday, September 11, 2013

Deploy LZO for YARN in CDH4

The page "Using the LZO Parcel" covers only MRv1, not YARN. It took me a while to figure out how to set up LZO for YARN correctly.

You may see various error messages if YARN is not configured correctly:

  • Class com.hadoop.compression.lzo.LzoCodec not found.
  • Class com.hadoop.mapred.DeprecatedLzoTextInputFormat not found.
  • No LZO codec found, cannot run.
  • native-lzo library not available

Here are the steps to set up LZO correctly:
  • Follow the instructions in "Using the LZO Parcel" to install and activate the parcel.
  • Add LzoCodec and LzopCodec. In Cloudera Manager, find the safety-valve field for core-site.xml (hdfs1 -> Configuration -> Service-Wide -> Advanced -> Cluster-wide Configuration Safety Valve for core-site.xml) and add this property:
    
    <property>
      <name>io.compression.codecs</name>
      <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
    </property>
  • Add the classpath and native library. In Cloudera Manager, find the safety-valve field for mapred-site.xml (yarn1 -> Configuration -> Service-Wide -> Advanced -> YARN Service MapReduce Configuration Safety Valve), then add the following two properties:
      
      <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/*</value>
      </property>
      <property>
        <name>mapreduce.admin.user.env</name>
        <value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native</value>
      </property>
  • Restart YARN and deploy the configuration files to the gateways.
  • Don't forget to run "Deploy Client Configuration".

Sunday, September 1, 2013

Make Eclipse Font Smaller in Gnome/Xfce

There are a lot of posts about this on the Internet. Here is what worked for me:

  • To make the menu font smaller
    • create ~/.gtkrc-eclipse
      style "eclipse" {
        font_name = "Sans 9"
      }
      
      class "GtkWidget" style "eclipse"
    • If you want the buttons to be more compact, follow this page. I use the settings below, but there is a problem: the color of the menu items on the menu bar is the same as the background.
      style "eclipse" {
        font_name = "Sans 9"
      }
      
      class "GtkWidget" style "eclipse"
      
      style "compact-toolbar" {
          GtkToolbar::internal-padding = 0
          xthickness = 1
          ythickness = 1
      }
      
      style "compact-button" {
          xthickness = 0
          ythickness = 0
      }
      
      class "GtkToolbar" style "compact-toolbar"
      widget_class "*<GtkToolbar>*<GtkButton>" style "compact-button"
      
    • add this line to ~/.bashrc
      export GTK2_RC_FILES=/usr/share/themes/Clearlooks/gtk-2.0/gtkrc:$HOME/.gtkrc-eclipse 
      
  • To make the tab font smaller (for example, in "Package Explorer"), you need to modify the CSS file (the version number could be different).
    $ vi $ECLIPSE_HOME/plugins/org.eclipse.platform_4.2.2.v201302041200/css/e4_default_gtk.css
    
    .MPartStack {
            font-size: 9;
            swt-simple: false;
            swt-mru-visible: false;
    }
    
    You may try to make the tab height smaller as suggested on this page, but I found it didn't work well because buttons like 'minimize' and 'maximize' become too small and then enlarge when the mouse hovers over them.
    .MPartStack {
      font-size: 9;
      font-family: Liberation Sans;
      swt-tab-renderer: null;
      swt-tab-height: 22px;
      swt-selected-tabs-background: #FFFFFF #ECE9D8 100%;
      swt-simple: false;
      swt-mru-visible: false;
    }
    

Friday, August 16, 2013

Fedora 19 XBMC Autologin in XFCE

  • Create user xbmc and set password
    useradd -g media xbmc
    passwd xbmc 
    
  • Log on as xbmc and choose XBMC as the session instead of the Xfce session.
  • Modify /etc/lightdm/lightdm.conf. I added the following to the [LightDM] section:
    autologin-user=xbmc
    autologin-session=XBMC
    
    and this to the [SeatDefaults] section:
    greeter-show-manual-login=false
    autologin-user=xbmc
    autologin-session=XBMC
    
    Not sure if I need to repeat user/session in both sections, but it works.
  • It seems that you have to give xbmc a password, otherwise autologin doesn't work.

Wednesday, August 14, 2013

Scala and Java collection interoperability in MapReduce job

In Scala, you can import scala.collection.JavaConversions._ to make collections interoperable between Scala and Java, for example:
scala.collection.Iterable <=> java.lang.Iterable
Usually I prefer the Scala collection API because it is concise and powerful. But be careful: this may not work in all cases. I ran into a problem when I wrote a MapReduce job in Scala:
// do something on values(1)
values.drop(1).foreach { v =>
  ...
}
The code tries to handle the first element and the rest differently. It worked perfectly in the combiner but failed in the reducer, even though both use values.drop(1).foreach. The reason, I believe, is that the iterable in the reducer is backed by a file, and the file position cannot go backwards. When you call drop(1) in Scala, the file position has already moved to the next element, so two elements are effectively dropped.
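Here is a minimal sketch of a safer pattern: consume a single iterator explicitly so the first element and the rest come from one pass over the data. The helper name and signature are made up for illustration.

import java.lang.{Iterable => JIterable}

// Process a single-pass Java iterable (such as the reducer's values) without drop():
// the first element and the rest are read from the same iterator exactly once.
def processValues[A](values: JIterable[A])(onFirst: A => Unit)(onRest: A => Unit): Unit = {
  val it = values.iterator()
  if (it.hasNext) {
    onFirst(it.next())                    // do something on values(0)
    while (it.hasNext) onRest(it.next())  // handle the rest
  }
}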

Don't call filesystem.close in Hadoop

Recently the MapReduce jobs in my project suddenly started failing. It turned out that my colleague had added a line of code that closes the filesystem:

val path = new Path("/user/bewang/data")
val fs = path.getFileSystem(conf)
fs.mkdirs(path)
fs.close
Nothing seems wrong. We are told to clean up the mess we create: after using a resource, close it. Is that good practice here? Unfortunately, it doesn't work in the Hadoop world. The Hadoop client actually caches and manages the connections to the cluster, and Path.getFileSystem returns a shared, cached FileSystem instance. If you call fs.close(), the connection to the cluster is broken for everything that shares that instance, and you cannot do anything with it afterwards. Don't call close; let Hadoop handle it.
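If you really need an instance whose lifetime you control, FileSystem.newInstance returns a private instance that bypasses the shared cache. A minimal sketch, reusing the path from the example above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val path = new Path("/user/bewang/data")

// Cached, shared instance: use it, but never close it.
val sharedFs = path.getFileSystem(conf)
sharedFs.mkdirs(path)

// Private instance: not registered in the cache, so closing it affects nobody else.
val privateFs = FileSystem.newInstance(path.toUri, conf)
try {
  privateFs.mkdirs(path)
} finally {
  privateFs.close()
}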

Tuesday, August 13, 2013

Output a stream into multiple files in the specified percentages.

I recently finished a project that writes JDBC results randomly into multiple files in specified percentages. For example, I want to generate three files that hold 10%, 20%, and 70% of the total rows respectively, where the file for each row is picked at random. The data is dumped from Hive/Impala through JDBC, and the result may have millions of rows.

The problem seems easy. The first algorithm that came to mind was: generate a random number between 0 and 1 for each row; if the value falls in [0, 0.1), write the row to the first file; if it falls in [0.1, 0.3), write it to the second file; otherwise write it to the third. This method works but, unfortunately, is not perfect: the output may not strictly satisfy the percentage requirement. The random numbers generated in Java/Scala are uniformly distributed, but that doesn't mean that out of 1,000 draws exactly 100 fall between 0 and 0.1, 200 between 0.1 and 0.3, and 700 between 0.3 and 1.0. There may be errors of 3% to 5%.

The next algorithm I came up with was:

  • Output the rows to a temporary file; then the total number of rows n is known.
  • Generate an array with 0.1*n 1s, 0.2*n 2s and 0.7*n 3s, shuffle them. 
  • Read line by line from the temporary file, and write to the file according to the number in the array.
This method generates rows that satisfy the percentage requirements, but it is clearly worse because it needs a temporary file and possibly a huge array in memory.

I finally figured out a better way that needs neither a temporary file nor a large array: buffer 100 rows; once the buffer is full, shuffle the rows and write them to the files according to the percentages. There are usually still rows left in the buffer when the stream is closed, and you cannot simply write them out according to the percentages, because the rounding error introduced each time you dump the buffer can accumulate into a large number. To make the number of rows in each file satisfy the requirement strictly, you need to handle the last buffered rows carefully. Because the total number of rows is an integer, as are the per-file counts, you cannot hit the specified percentages exactly. Is there a way to make x1 + ... + xn = total while x1/total, ..., xn/total best approximate the specified percentages?
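One standard answer to that last question is the largest-remainder method: give each file floor(percent * total) rows, then hand the leftover rows to the files with the largest fractional parts. Below is a rough Scala sketch of the whole buffered approach; PercentageSplitter and the writer functions are hypothetical names, not code from the project.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Buffer rows, shuffle each full buffer, and track cumulative per-file counts so the
// totals follow the largest-remainder targets even for the final partial buffer.
class PercentageSplitter(percents: Array[Double], writers: Array[String => Unit],
                         bufferSize: Int = 100) {
  require(percents.length == writers.length && math.abs(percents.sum - 1.0) < 1e-9)

  private val buffer  = new ArrayBuffer[String](bufferSize)
  private val written = Array.fill(percents.length)(0L)
  private var total   = 0L

  def write(row: String): Unit = {
    buffer += row
    if (buffer.size >= bufferSize) flush()
  }

  def close(): Unit = if (buffer.nonEmpty) flush()

  // Largest-remainder apportionment: the counts sum to n, and each count/n is as
  // close as possible to the corresponding percentage.
  private def apportion(n: Long): Array[Long] = {
    val exact  = percents.map(_ * n)
    val counts = exact.map(_.toLong)
    val left   = (n - counts.sum).toInt
    exact.zipWithIndex
      .sortBy { case (e, _) => math.floor(e) - e }  // largest fractional part first
      .take(left)
      .foreach { case (_, i) => counts(i) += 1 }
    counts
  }

  private def flush(): Unit = {
    total += buffer.size
    val target = apportion(total)                   // cumulative targets up to now
    val rows   = Random.shuffle(buffer.toList).iterator
    for (i <- percents.indices) {
      var need = target(i) - written(i)             // rows this file still needs
      while (need > 0 && rows.hasNext) {
        writers(i)(rows.next()); written(i) += 1; need -= 1
      }
    }
    buffer.clear()
  }
}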


Tuesday, June 18, 2013

Avro-mapred-1.7.3-hadoop2 for AvroMultipleOutputs

I got the following error message from my MapReduce job when I ran it on a CDH 4.2.0 cluster:

2013-06-18 12:50:11,095 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
 at org.apache.avro.mapreduce.AvroMultipleOutputs.getNamedOutputsList(AvroMultipleOutputs.java:218)
 at org.apache.avro.mapreduce.AvroMultipleOutputs.<init>(AvroMultipleOutputs.java:351)

It turns out that avro-mapred-1.7.3 causes this problem. My sbt project has a dependency on hive-exec, which depends on avro-mapred-1.7.3. To eliminate this error, exclude avro-mapred from hive-exec and add avro-mapred-1.7.3-hadoop2 instead.
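In an sbt build this looks roughly like the following; the coordinates are assumptions inferred from the artifact names above, so verify them against your repository:

libraryDependencies ++= Seq(
  "org.apache.hive" % "hive-exec" % "0.10.0-cdh4.2.0" exclude("org.apache.avro", "avro-mapred"),
  "org.apache.avro" % "avro-mapred" % "1.7.3" classifier "hadoop2"
)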

If you have hive-exec-0.10.0-cdh4.2.0 in your project, you will also have trouble browsing the Avro source code, because this jar includes a copy of all the Avro classes and hive-exec-0.10.0-cdh4.2.0-sources.jar doesn't include the Avro sources.

AvroMultipleOutputs in 1.7.3 doesn't support different named outputs having different schemas. See AVRO-1266.

Thursday, June 13, 2013

Avro

It took me a while to figure out how to write an Avro file in MapReduce that can be imported into Hive and Impala.

  • There are several OutputFormat classes in Avro 1.7.3: AvroOutputFormat, AvroKeyOutputFormat, AvroKeyValueOutputFormat, and AvroSequenceFileOutputFormat. Which one produces files that can be imported into Hive? Use AvroKeyOutputFormat in your MapReduce job to output Avro container files.
  • You cannot specify any of the above output formats in the Hive CREATE TABLE "stored as" clause because they don't implement HiveOutputFormat.
  • You need to set the schema using AvroJob.setOutputKeySchema.
  • There are three ways to set the Avro compression codec, as shown below:
            AvroJob.setOutputKeySchema(job, getAvroSchema(schemaFile))
            job.setOutputFormatClass(classOf[AvroKeyOutputFormat[GenericRecord]])
            job.setOutputKeyClass(classOf[AvroKey[GenericRecord]])
            job.setOutputValueClass(classOf[NullWritable])
            FileOutputFormat.setCompressOutput(job, true)
            // You can use any of the following three ways to set compression
            // FileOutputFormat.setOutputCompressorClass(job, classOf[SnappyCodec])
            // job.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
            // job.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, CodecFactory.snappyCodec().toString());
    
            FileOutputFormat.setOutputPath(job, new Path(outputPath))
    
    Be careful to check the Cloudera Manager compression settings because they may override yours. If you find you cannot compress Avro files with the above code, you can verify whether compression is enabled in your mapper or reducer like this:
    class AvroGenericRecordMapper extends TableMapper[AvroKey[GenericRecord], NullWritable] {
    
      type Context = Mapper[ImmutableBytesWritable, Result, AvroKey[GenericRecord], NullWritable]#Context
      private var converter: AvroConverter = _
    
      // val outKey = new LongWritable(0L)
      val outVal = NullWritable.get()
      val outKey = new AvroKey[GenericRecord]()
    
      override def setup(context: Context) {
        converter = new AvroConverter(AvroJob.getOutputKeySchema(context.getConfiguration()))
        
        import org.apache.hadoop.mapreduce.TaskAttemptContext
        println("compress: " + FileOutputFormat.getCompressOutput(context.asInstanceOf[TaskAttemptContext]))
        println("codec: " + context.getConfiguration().get(AvroJob.CONF_OUTPUT_CODEC))
      }
    
      override def map(key: ImmutableBytesWritable, value: Result, context: Context) {
        outKey.datum(converter.convert(value))
        context.write(outKey, outVal)
      }
    }
    
    This mapper dumps an HBase table into Avro files. AvroConverter is a class that converts an HBase Result into an Avro GenericRecord.
  • Follow the example on this page: https://cwiki.apache.org/confluence/display/Hive/AvroSerDe. Unfortunately, if you search for "Avro Hive", Google shows you this page https://cwiki.apache.org/Hive/avroserde-working-with-avro-from-hive.html, which has a wrong example, and you will get error messages like:
    FAILED: Error in metadata: Cannot validate serde: org.apache.hadoop.hive.serde2.AvroSerDe
    FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
    
    What's wrong? The serde name should be org.apache.hadoop.hive.serde2.avro.AvroSerDe.
  • You don't have to define the columns because Hive can derive them from the Avro schema (a sketch follows this list).
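Here is a rough sketch of a matching CREATE TABLE, issued through Hive JDBC from Scala. The table name, JDBC URL, location, and schema URL are placeholders; the serde and container input/output format class names are the ones documented on the AvroSerDe wiki page.

import java.sql.DriverManager

object CreateAvroTable extends App {
  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val conn = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default")
  val st = conn.createStatement()
  // Columns are omitted on purpose: Hive derives them from the Avro schema.
  st.execute("""
    CREATE EXTERNAL TABLE my_avro_table
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS
      INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/bewang/avro'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///user/bewang/schemas/my_record.avsc')
  """)
  conn.close()
}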

Friday, June 7, 2013

Fix "value parse is not a member of object org.joda.time.DateTime"

When you search for this error message, you will find the answer: joda-convert is missing from your dependencies. The answer is correct.

[error] MySpec.scala:27: value parse is not a member of object org.joda.time.DateTime
[error]       val from = DateTime.parse("2013-03-01")
[error]                           ^

However, our project still reported this error even though we were 100% sure joda-convert was on the classpath. Our project has transitive dependencies on joda-time:joda-time:2.1 and org.joda:joda-convert:1.2 through play:play_2.10:2.1.1. If you type "show test:dependency-classpath" in the Play console, joda-convert is in the output, and the jar in the Ivy repository is fine.

It turns out that we have another dependency that also provides org.joda.time.DateTime: org.jruby:jruby-complete:1.6.5, a transitive dependency through org.apache.hbase:hbase:0.94.2-cdh4.2.0. If joda-time comes first on the classpath, you have no problem; otherwise, you see the error. Because it doesn't always happen, you might be lucky and never experience this problem at all.

The fix is really simple: exclude jruby-complete from the dependency:

"org.apache.hbase" % "hbase" % hbaseVersion exclude ("org.slf4j", "slf4j-log4j12") exclude ("org.slf4j", "slf4j-api") exclude
      ("org.jruby", "jruby-complete")

Wednesday, June 5, 2013

Impala build steps on CentOS 6.3

  • Build boost-1.42.0
  • Before building impala
    • Change be/CMakeLists.txt. I removed all boost RPMs and built the boost libraries from source, so boost_date_time is at /usr/local/lib/libboost_date_time-mt.*. The build failed without this change. If you have the boost 1.41 RPMs installed, you may not need this change, but the build will then fail with other issues.
      diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
      index c14bd31..cd5abac 100644
      --- a/be/CMakeLists.txt
      +++ b/be/CMakeLists.txt
      @@ -224,7 +224,7 @@ set (IMPALA_LINK_LIBS
         ${LIBZ}
         ${LIBBZ2}
         ${AVRO_STATIC_LIB}
      -  -lrt -lboost_date_time
      +  -lrt -lboost_date_time-mt
       )
       
       if ("${CMAKE_BUILD_TYPE}" STREQUAL "CODE_COVERAGE")
      
    • Change build_public.sh to build the release version, so you don't have to pass -build_thirdparty on the command line:
      diff --git a/build_public.sh b/build_public.sh
      index 6ea491b..28b445a 100755
      --- a/build_public.sh
      +++ b/build_public.sh
      @@ -23,8 +23,8 @@ set -e
       # Exit on reference to uninitialized variable
       set -u
       
      -BUILD_THIRDPARTY=0
      -TARGET_BUILD_TYPE=Debug
      +BUILD_THIRDPARTY=1
      +TARGET_BUILD_TYPE=Release
       
       for ARG in $*
       do
      
  • After building, run shell/make_shell_tarball.sh. This generates a shell/build directory containing all the files for impala-shell.
  • Prepare the Hadoop, HBase, and Hive config files; copy them from /var/run/cloudera-scm-agent/process.
  • Change bin/set-classpath.sh like this:
    CLASSPATH=\
    $HOME/hadoop/hadoop-conf:\
    $HOME/hadoop/hbase-conf:\
    $HOME/hadoop/hive-conf:\
    #$IMPALA_HOME/fe/src/test/resources:\
    #$IMPALA_HOME/fe/target/classes:\
    #$IMPALA_HOME/fe/target/dependency:\
    #$IMPALA_HOME/fe/target/test-classes:\
    $IMPALA_HOME/fe/target/impala-frontend-0.1-SNAPSHOT.jar:\
    ${HIVE_HOME}/lib/datanucleus-core-2.0.3.jar:\
    ${HIVE_HOME}/lib/datanucleus-enhancer-2.0.3.jar:\
    ${HIVE_HOME}/lib/datanucleus-rdbms-2.0.3.jar:\
    ${HIVE_HOME}/lib/datanucleus-connectionpool-2.0.3.jar:
    
    for jar in `ls ${IMPALA_HOME}/fe/target/dependency/*.jar`; do
      CLASSPATH=${CLASSPATH}:$jar
    done
    
    export CLASSPATH
    
    Otherwise, you might see this if you don't include impala-frontend.jar:
    Exception in thread "main" java.lang.NoClassDefFoundError: com/cloudera/impala/common/JniUtil
    Caused by: java.lang.ClassNotFoundException: com.cloudera.impala.common.JniUtil
    
    Or this if you don't have hadoop-conf on the classpath:
    E0605 09:32:23.236434  5272 impala-server.cc:377] Unsupported file system. Impala only supports DistributedFileSystem but the LocalFileSystem was found. fs.defaultFS(file:///) might be set incorrectly
    E0605 09:32:23.236655  5272 impala-server.cc:379] Impala is aborted due to improper configurations.
    
  • Impalad_flags
    -beeswax_port=21001
    -fe_port=21001
    -be_port=22001
    -hs2_port=21051
    -enable_webserver=true
    -mem_limit=-1
    -webserver_port=25001
    -state_store_subscriber_port=23001
    -default_query_options
    -log_filename=impalad
    -use_statestore=false
    -nn=5K04.corp.pivotlink.com
    -nn_port=8020
    
  • Create a tarball of the Impala build yourself, since there is no open-source script for this:
    tar zcvf impala.tar.gz impala --exclude="*.class" --exclude="*.o" --exclude="impala/thirdparty" --exclude="impala/.git" --exclude="*.java" --exclude="*.cpp" --exclude="*.h" --exclude="expr-test"
  • start impalad
    cd impala_home
    export IMPALA_HOME=$PWD
    bin/start-impalad.sh -build_type=release --flagfile=impalad_flags_path
    
  • start impala-shell
    cd impala_home
    export IMPALA_HOME=$PWD
    export IMPALA_SHELL_HOME=$PWD/shell/build/impala-shell-1.0.1
    $IMPALA_SHELL_HOME/impala-shell -i impalad-host:21001
    

Tuesday, June 4, 2013

Build boost for Impala in CentOS 6.3

At the time I made the build, CentOS 6.3 only had an RPM for boost 1.41.0, so I had to build boost from source myself.

  • Clean up the old installation. Find all boost installations, then remove all old versions.
  • $ rpm -qa | grep boost
    $ yum remove boost boost-filesystem ...
    
  • Download boost tarball and expand into a directory.
  • Make the build. Using the tagged layout generates /usr/local/lib/libboost_filesystem-mt.so; if you use --layout=system, /usr/local/lib/libboost_filesystem.so is created instead. Don't use --build-type=complete because the build takes too long.
  • cd boost_1.42.0
    ./bootstrap.sh
    sudo ./bjam --layout=tagged install
    

Friday, May 31, 2013

Avro Schema Evolution

When I added a new nullable field to my Avro schema, Avro reported an AvroTypeException.
SchemaEvolutionSpec:
Avro
  can read the file using the old schema
  - when adding a new field *** FAILED ***
    org.apache.avro.AvroTypeException: Found TestRecord, expecting TestRecord
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:169)
The writer schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] } 
    ] }
The reader schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ] }
    ] }
The error message "Found ..., expecting ..." is misleading. It is ResolvingGrammarGenerator that emits this message, not ResolvingDecoder. Here is the code:
221       for (Field rf : rfields) {
222         String fname = rf.name();
223         if (writer.getField(fname) == null) {
224           if (rf.defaultValue() == null) {
225             result = Symbol.error("Found " + writer.getFullName()
226                                   + ", expecting " + reader.getFullName());
227             seen.put(wsc, result);
228             return result;
229           } else {
230             reordered[ridx++] = rf;
231             count += 3;
232           }
233         }
234       }
The message actually means that the reader is trying to read a field that does not exist in the writer schema and has no default value. What? Isn't field "C" nullable? I think Avro should not force users to provide a "null" default for a nullable field. If you cannot wait for a fix for this issue, here is the workaround: use null as the default value for field "C". This reader schema works:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ], "default": null }
    ] }
Here is my ScalaTest code:
import org.scalatest.WordSpec
import org.scalatest.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.io.File

import org.apache.avro.{ Schema => AvroSchema }
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.avro.generic.{ GenericDatumReader, GenericDatumWriter }
import org.apache.avro.file.{ DataFileReader, DataFileWriter }
import org.apache.avro.util.Utf8

import collection.JavaConversions._

class SchemaEvolutionSpec extends WordSpec with ShouldMatchers {

  "Avro" can {
    val file = new File("users.avro")

    def avroSchema(json: String): AvroSchema =
      (new AvroSchema.Parser).parse(json)

    def writeAs(schema: AvroSchema)(rec: GenericRecord) = {
      val dataFileWriter = new DataFileWriter[GenericRecord](
        new GenericDatumWriter[GenericRecord](schema))
      dataFileWriter.create(schema, file);
      dataFileWriter.append(rec);
      dataFileWriter.close();
    }

    def readAs(writeSchema: AvroSchema, readSchema: AvroSchema): GenericRecord = {
      val dataFileReader = new DataFileReader[GenericRecord](
        file, new GenericDatumReader[GenericRecord](writeSchema, readSchema));
      dataFileReader.next(null);
    }

    def record(schema: AvroSchema, data: Map[String, Any]): GenericRecord = {
      val builder = new GenericRecordBuilder(schema)
      data.foreach { kv => builder.set(kv._1, kv._2) }
      builder.build()
    }

    "read the file using the old schema" when {
      "adding a new field" in {
        val oldSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version" : 1,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] } 
              ] }
            """)
        val newSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version": 2,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] }, 
             { "name": "C", "type": ["null", "double"] } 
              ] }
            """)

        val rec = record(oldSchema, Map("A" -> "Hello", "B" -> 5))
        writeAs(oldSchema)(rec)

        val newRec = readAs(oldSchema, newSchema)

        def value[T](field: String): T =
          newRec.get(field).asInstanceOf[T]

        value[Utf8]("A").toString should be("Hello")
        value[Int]("B") should be(5)
        newRec.get("C") should be(null)
      }

      "deleting a field" in {
        pending
      }

      "renaming a field" in {
        pending
      }

      "changing data type" in {
        pending
      }
    }

  }

}

Thursday, May 16, 2013

Play run in DEV mode and "ClassNotFoundException"

Play's "~run" command makes development much easier: you can change the code and test it without building, packaging, and deploying. But it can also cause an annoying "ClassNotFoundException".

Our application has an HBase filter. The filter is packaged and deployed to the HBase region servers, but it is also needed on the client side when you build a Scan. If we run the application in DEV mode, we get a "ClassNotFoundException". The Java code of the filter is definitely compiled and "on the classpath" because I can find it in the output of "show full-classpath". This confusing issue forced us to fall back to stage/dist.

The issue is actually caused by the classloaders set up when you run the "run" command. If you use a customized filter, HBase loads the class with "Class.forName". Because the filter is NOT on the classpath of the classloader that loads the HBase classes, a "ClassNotFoundException" is thrown.

But why is the filter NOT on that classpath? There are several classloaders when Play runs in DEV mode:

  • sbtLoader, which loads sbt;
  • applicationLoader, which loads the jar files in dependencyClasspath in Compile; it is also called the "SBT/Play shared ClassLoader";
  • ReloadableClassLoader(v1), which loads the project's own classes.
Because ReloadableClassLoader is a child of applicationLoader, the filter is only on ReloadableClassLoader's classpath, not applicationLoader's. Since the HBase library is loaded by applicationLoader, the filter is indeed invisible to it.

The simple workaround is to make the filter a separate project and add it as a dependency of the Play application. The only disadvantage is that you have to build, publish, and update whenever you are developing the filter and the application code at the same time.
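In sbt terms the workaround looks roughly like the Play 2.1 Build.scala below. The module names and version are hypothetical, and the filter project is assumed to have been published first with publish-local.

import sbt._
import Keys._

object ApplicationBuild extends Build {
  val appName    = "my-play-app"
  val appVersion = "1.0-SNAPSHOT"

  // The filter lives in its own sbt project and is referenced as a plain library
  // dependency, so it is loaded by applicationLoader instead of ReloadableClassLoader.
  val appDependencies = Seq(
    "com.mycompany" %% "hbase-scan-filter" % "0.1-SNAPSHOT"
  )

  val main = play.Project(appName, appVersion, appDependencies)
}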

You can find the same issue: https://github.com/playframework/Play20/issues/822

Play 2.1.1 Jdbc is not compatible with hive-jdbc-0.10.0-cdh4.2.0

If you want to access a Hive/Impala server from Play 2.1.1, you will encounter a strange error saying "Cannot connect to database [xxx]". Unfortunately, no further information helps you identify what is wrong. It is strange because you can use the same URL to connect to the server directly without any problem:
import java.sql.DriverManager

object ImpalaJdbcClient extends App {

  val impalaUrl = "jdbc:hive2://impala-host:21050/;auth=noSasl"

  val driverClass = Class.forName("org.apache.hive.jdbc.HiveDriver")

  val conn = DriverManager.getConnection(impalaUrl)

  val st = conn.createStatement()
  val rs = st.executeQuery("select count(distinct customer_id) from customers where repeat ='Y'")
  while (rs.next()) {
    println("count=%d".format(rs.getLong(1)))
  }

  conn.close
}
By digging through the Play and Hive-JDBC code, I figured out that play-jdbc calls a lot of methods that hive-jdbc doesn't support. For those methods, such as setReadOnly and setCatalog, hive-jdbc simply throws a SQLException saying "Method not supported"; play-jdbc catches it and reports the "Cannot connect to database" error, but unfortunately doesn't include the "Method not supported" message. You can fix this by removing the throw statements from the unsupported hive-jdbc methods and recompiling. Another way is to create your own BoneCPPlugin: copy the source code of ./src/play-jdbc/src/main/scala/play/api/db/DB.scala and remove the offending method calls:
  • setAutoCommit
  • commit
  • rollback
  • setTransactionIsolation
  • setReadOnly
  • setCatalog
and comment out or change this line
case mode => Logger("play").info("database [" + ds._2 + "] connected at " + dbURL(ds._1.getConnection))
to
case mode => Logger("play").info("database [" + ds._2 + "] connected at " + ds._1)
because dbURL calls conn.getMetaData.getURL, and HiveDatabaseMetaData doesn't support getURL. Change "dbplugin" in app.configuration.getString("dbplugin").filter(_ == "disabled") to something else to avoid a conflict with Play's own BoneCPPlugin, then register your BoneCPPlugin in conf/play.plugins.

Wednesday, May 15, 2013

How does Play set up its Ivy repository?

Play sets its Ivy repository to ${PLAY_HOME}/repository instead of using the default Ivy home $HOME/.ivy2. Check the ${PLAY_HOME}/play script and you will find this at the end:
"$JAVA" -Dsbt.ivy.home=$dir/repository -Dplay.home=$dir/framework -Dsbt.boot.properties=$dir/framework/sbt/play.boot.properties -jar $dir/framework/sbt/sbt-launch.jar "$@"
-Dsbt.ivy.home does the trick.

Wednesday, May 1, 2013

Install Impala 1.0 in Cloudera Manager 4.5.0

If you don't want to upgrade to 4.5.2, you can change the Impala parcel URL to http://archive.cloudera.com/impala/parcels/latest/ to get the Impala parcel.

Monday, April 29, 2013

Run MapReduce in Play development mode

When you want to invoke a MapReduce job from Play in development mode, the first problem you hit is that the jar file has not been generated yet, so job.setJarByClass(classOf[MyMapper]) won't work. You can overcome this by using the Hadoop utility class JarFinder like this:
    job.setJarByClass(classOf[MyMapper])
    if (job.getJar() == null) {
      val jarFinder = Class.forName("org.apache.hadoop.util.JarFinder")
      if (jarFinder != null) {
        val m = jarFinder.getMethod("getJar", classOf[Class[_]]);

        job.getConfiguration().asInstanceOf[org.apache.hadoop.mapred.JobConf]
          .setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String])
      }
    }
Here is what you need to pay attention to:
  • org.apache.hadoop.util.JarFinder is a Hadoop utility class for testing that creates a jar on the fly. You can find it in the hadoop-common package with classifier "tests" (hadoop-common-2.0.0-cdh4.2.1-tests.jar in the Cloudera distribution). Unfortunately, simply putting a dependency into Build.scala won't work:
       "org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.2.1" classifier "tests", 
    
    You will get a lot of compilation errors. If you add "test" like this, compilation passes, but you cannot use JarFinder in development mode because the jar is not on the runtime classpath (it IS on the test classpath):
       "org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.2.1" % "test" classifier "tests", 
    
    I simply took JarFinder.java from the source package and put it into the Play app/org/apache/hadoop/util directory like the other source files, so it gets compiled by Play.
  • Depending on which Hadoop version you are using, you may be able to call setJar directly on the job like this:
      job.setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String])
    
    I'm using "org.apache.hadoop" % "hadoop-client" % "2.0.0-mr1-cdh4.2.0". org.apache.hadoop.mapreduce.Job doesn't have setJar.
  • The property mapred.jar is created when setJar is called; you can verify that it exists in the job file on the JobTracker web page. A common mistake is to call setJar on a Configuration object that is not the one used by the job, for example:
        val config = HBaseConfiguration.create()
        val job = new Job(config, "My MR job")
        ...
        job.setJarByClass(classOf[MyMapper])
        if (job.getJar() == null) {
          val jarFinder = Class.forName("org.apache.hadoop.util.JarFinder")
          if (jarFinder != null) {
            ...
            // This won't work because jc is a cloned object, not used by Job.
            // You cannot use val config too for the same reason.
            val jc = new JobConf(job.getConfiguration())
            jc.setJar(m.invoke(null, classOf[MyMapper]).asInstanceOf[String]);
          }
        }
    
  • Another problem you may encounter is that JarFinder.getJar still returns null. I had this problem when I ran an HBase sbt project, but I don't remember whether it happened in the Play project. If you hit it, you can add the following code to JarFinder to fix it:
      public static String getJar(Class klass) {
        Preconditions.checkNotNull(klass, "klass");
        ClassLoader loader = klass.getClassLoader();
    
        // Try to use context class loader 
        if (loader == null) {
         loader = Thread.currentThread().getContextClassLoader();
        }
    
        if (loader != null) {
    
    My HBase project starts an MR job that needs scala-library.jar distributed. Here is the snippet:
    
        TableMapReduceUtil.initTableMapperJob(
          tableName,
          getScan(siteKey, collDefId),
          classOf[TextMapper],
          classOf[ImmutableBytesWritable],
          classOf[Text],
          job,
          // don't add dependency jars
          false)
        job.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text, Text]])
        job.setOutputKeyClass(classOf[Text])
        job.setOutputValueClass(classOf[Text])
        FileOutputFormat.setCompressOutput(job, true)
        FileOutputFormat.setOutputPath(job, new Path(outDir))
        job.setNumReduceTasks(0);
    
        // Add dependency jars 
        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          classOf[org.apache.zookeeper.ZooKeeper],
          classOf[com.google.protobuf.Message],
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass(),
    
          // include scala-library.jar
          classOf[ScalaObject],
    
          // joda jar used by mapper/reducer
          classOf[org.joda.time.DateTime]);
    
    HBase TableMapReduceUtil (0.94.2-cdh4.2.0) will use JarFinder.getJar if it is present. Unfortunately, the classloader used by sbt to launch the application doesn't have ScalaObject on its classpath, so klass.getClassLoader() returns null. When you use the context classloader instead, scala-library is found without any problem.
  • The code works in both dev and prod mode. In prod mode, the jar containing the mappers and reducers is already on the classpath, so setJarByClass works without entering the JarFinder block.

Monday, April 1, 2013

Extract compressed Tar

  • Extract GZIP compressed TAR file
    tar xzvf file.tar.gz
  • Extract BZ2 compressed TAR file
    tar xvjpf file.tar.bz2

Friday, March 1, 2013

Git-SVN

Here are some notes from when I played with git-svn:
  • Clone a project. You can run the following commands if the project follows the standard layout with trunk/tags/branches:
    root
      |- ProjectA
      |   |- branches
      |   |- tags
      |   \- trunk
      |- OtherProject
    
  • Download trunk, tags and branches. You will find the same directories as in the SVN repository.
    git svn clone -r XXXX https://subversion.mycompany.com/ProjectA
    
  • Init a new project. This will create the directory projectA and initialize it as a git repository.
    git svn init https://subversion.mycompany.com/projectA projectA
    
  • Download trunk only. The directory will contain only the files under trunk.
    git svn clone -s -r XXXX https://subversion.mycompany.com/ProjectA
    • Provide the revision number using "-r xxxx"; otherwise you have to wait a long time while git-svn scans all revisions.
    • Use the revision of the project (96365) instead of the latest revision (96381); otherwise, you will get an empty working directory. You can find the revision number in ViewVC like this:
      Index of /ProjectA
      
      Files shown: 0
      Directory revision: 96365 (of 96381)
      Sticky Revision:    
      
  • Apply git patch to SVN:
    git diff --no-prefix > /tmp/xx.patch
    cd svn-dir/
    patch -p0 < /tmp/xx.patch
    

Friday, February 15, 2013

Scala SBT

Requirements: 


  1. Include scala-library.jar in the zip file because the machine may not have scala installed. 
  2. Include scripts and configure files into the zip just like Maven assembly. 
  3. Include the jar in the zip file. 

 Lessons:

  1. Key. 
    • SettingKey and TaskKey
    • Cannot reference a Key directly; need to use a tuple like this:
      (sbt_key) => { key_val => }
    • projectBin is the jar
    • managed
  2. Must have a Project
  3. libraryDependencies and resolvers defined in Build have no effect for the project
  4. IO.zip doesn't preserve the permissions of shell scripts
  5. Build with java code perfectly.
  6. custom cleanFiles
  7. Define a custom task like dist
  8. Useful commands of sbt
    settings // list settings keys
    tasks // list all tasks
    show clean-files // check value of settings keys
    inspect clean-files // check more information than show
    

HBase LeaseException issue

Excerpt from http://hbase.apache.org/book.html. If you google "hbase LeaseException", this page may not show up on the first page of results.

12.5.2. LeaseException when calling Scanner.next

In some situations clients that fetch data from a RegionServer get a LeaseException instead of the usual Section 12.5.1, “ScannerTimeoutException or UnknownScannerException”. Usually the source of the exception is org.apache.hadoop.hbase.regionserver.Leases.removeLease(Leases.java:230) (line number may vary). It tends to happen in the context of a slow/freezing RegionServer#next call. It can be prevented by having hbase.rpc.timeout > hbase.regionserver.lease.period. Harsh J investigated the issue as part of the mailing list thread HBase, mail # user - Lease does not exist exceptions

Friday, January 11, 2013

Bash single quote escape in running a Hive query

If you want to run this Hive query over SSH, figuring out how to escape it will probably give you a headache:
select * from my_table where date_column = '2013-01-11';
The correct command looks like this:
ssh myserver 'hive -e "select * from my_table where date_column = '"'2013-01-11'"';"'
What a hack! Here is how bash reads this:
1. ssh
2. myserver
3. hive -e "select * from my_table where date_column = 
4. '2013-01-11'
5. ;"
Pieces 3, 4, and 5 are concatenated into one argument:
hive -e "select * from my_table where date_column = '2013-01-11';"