Friday, May 31, 2013

Avro Schema Evolution

When I added a new nullable field into my avro schema, Avro reported an AvroTypeException.
SchemaEvolutionSpec:
Avro
  can read the file using the old schema
  - when adding a new field *** FAILED ***
    org.apache.avro.AvroTypeException: Found TestRecord, expecting TestRecord
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:169)
The writer schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] } 
    ] }
The reader schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ] }
    ] }
The error message "Found ..., expecting ..." is misleading. It is ResolvingGrammarGenerator which emits this message, not ResolvingDecoder. Here is the code:
221       for (Field rf : rfields) {
222         String fname = rf.name();
223         if (writer.getField(fname) == null) {
224           if (rf.defaultValue() == null) {
225             result = Symbol.error("Found " + writer.getFullName()
226                                   + ", expecting " + reader.getFullName());
227             seen.put(wsc, result);
228             return result;
229           } else {
230             reordered[ridx++] = rf;
231             count += 3;
232           }
233         }
234       }
The message actually means that the reader tried to read a field that does not exist in the writer schema and does not have a default value. What? Isn't field "C" nullable? I think Avro should not require users to provide "null" explicitly as the default for a nullable field. If you cannot wait for a fix for this issue, here is the workaround: use null as the default value for field "C". This reader schema works:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ], "default": null }
    ] }
Here is my ScalaTest code:
import org.scalatest.WordSpec
import org.scalatest.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.io.File

import org.apache.avro.{ Schema => AvroSchema }
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.avro.generic.{ GenericDatumReader, GenericDatumWriter }
import org.apache.avro.file.{ DataFileReader, DataFileWriter }
import org.apache.avro.util.Utf8

import collection.JavaConversions._

/**
 * Demonstrates Avro schema evolution with the GenericDatumReader/Writer API.
 *
 * Key point: a field that exists only in the reader schema MUST declare a
 * "default" value (here `"default": null` on the nullable field "C").
 * Without it, schema resolution fails with the misleading
 * `AvroTypeException: Found TestRecord, expecting TestRecord` raised by
 * ResolvingGrammarGenerator.
 */
class SchemaEvolutionSpec extends WordSpec with ShouldMatchers {

  "Avro" can {
    // Scratch data file shared by the write/read helpers below.
    val file = new File("users.avro")

    /** Parses a JSON schema definition into an Avro Schema. */
    def avroSchema(json: String): AvroSchema =
      (new AvroSchema.Parser).parse(json)

    /** Writes a single record to `file` with the given writer schema. */
    def writeAs(schema: AvroSchema)(rec: GenericRecord): Unit = {
      val writer = new DataFileWriter[GenericRecord](
        new GenericDatumWriter[GenericRecord](schema))
      try {
        writer.create(schema, file)
        writer.append(rec)
      } finally {
        writer.close() // always release the underlying stream, even on failure
      }
    }

    /** Reads the first record back, resolving writer schema into reader schema. */
    def readAs(writeSchema: AvroSchema, readSchema: AvroSchema): GenericRecord = {
      val reader = new DataFileReader[GenericRecord](
        file, new GenericDatumReader[GenericRecord](writeSchema, readSchema))
      // null reuse-argument: let Avro allocate a fresh record instance.
      try reader.next(null)
      finally reader.close() // original version leaked the reader
    }

    /** Builds a GenericRecord for `schema` from a fieldName -> value map. */
    def record(schema: AvroSchema, data: Map[String, Any]): GenericRecord = {
      val builder = new GenericRecordBuilder(schema)
      data.foreach { case (name, value) => builder.set(name, value) }
      builder.build()
    }

    "read the file using the old schema" when {
      "adding a new field" in {
        val oldSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version" : 1,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] }
              ] }
            """)
        // The new field "C" must carry `"default": null`; without it,
        // resolution aborts with AvroTypeException because the reader has no
        // value to supply for a field the writer never wrote.
        val newSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version": 2,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] },
             { "name": "C", "type": ["null", "double"], "default": null }
              ] }
            """)

        val rec = record(oldSchema, Map("A" -> "Hello", "B" -> 5))
        writeAs(oldSchema)(rec)

        val newRec = readAs(oldSchema, newSchema)

        // Typed accessor for fields of the decoded record.
        def value[T](field: String): T =
          newRec.get(field).asInstanceOf[T]

        // Avro decodes strings as Utf8, not java.lang.String.
        value[Utf8]("A").toString should be("Hello")
        value[Int]("B") should be(5)
        // Field absent in the writer data resolves to its default: null.
        newRec.get("C") should be(null)
      }

      "deleting a field" in {
        pending
      }

      "renaming a field" in {
        pending
      }

      "changing data type" in {
        pending
      }
    }

  }

}

1 comment:

  1. Any other workarounds found for this issue? Has been a long time.

    ReplyDelete