When I added a new nullable field into my avro schema, Avro reported an AvroTypeException.
SchemaEvolutionSpec:
Avro
can read the file using the old schema
- when adding a new field *** FAILED ***
org.apache.avro.AvroTypeException: Found TestRecord, expecting TestRecord
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:169)
The writer schema is:
{ "type": "record",
"name": "TestRecord",
"version" : 1,
"fields": [
{ "name": "A", "type": ["null", "string"] },
{ "name": "B", "type": ["null", "int" ] }
] }
The reader schema is:
{ "type": "record",
"name": "TestRecord",
"version" : 1,
"fields": [
{ "name": "A", "type": ["null", "string"] },
{ "name": "B", "type": ["null", "int" ] },
{ "name": "C", "type": ["null", "double" ] }
] }
The error message "Found ..., expecting ..." is misleading. It is
ResolvingGrammarGenerator who emits this message, not
ResolvingDecoder. Here is the code:
221 for (Field rf : rfields) {
222 String fname = rf.name();
223 if (writer.getField(fname) == null) {
224 if (rf.defaultValue() == null) {
225 result = Symbol.error("Found " + writer.getFullName()
226 + ", expecting " + reader.getFullName());
227 seen.put(wsc, result);
228 return result;
229 } else {
230 reordered[ridx++] = rf;
231 count += 3;
232 }
233 }
234 }
The message actually means that the reader tries to read a field that does not exist in the writer schema and does not have a default value. But wait: field "C" is already nullable. I think Avro should not force users to explicitly provide "null" as the default for a nullable field.
If you cannot wait for this issue to be fixed, here is a workaround: declare null as the default value for field "C". The following reader schema works:
{ "type": "record",
"name": "TestRecord",
"version" : 1,
"fields": [
{ "name": "A", "type": ["null", "string"] },
{ "name": "B", "type": ["null", "int" ] },
{ "name": "C", "type": ["null", "double" ], "default": null }
] }
Here is my ScalaTest code:
import org.scalatest.WordSpec
import org.scalatest.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.io.File
import org.apache.avro.{ Schema => AvroSchema }
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.avro.generic.{ GenericDatumReader, GenericDatumWriter }
import org.apache.avro.file.{ DataFileReader, DataFileWriter }
import org.apache.avro.util.Utf8
import collection.JavaConversions._
/**
 * Exercises Avro schema evolution: a record is written to a data file with one
 * schema and read back with another.
 *
 * The "adding a new field" case reads with a schema whose extra field "C" is a
 * nullable union WITHOUT an explicit default. It is kept that way on purpose so
 * the spec reproduces the reported `AvroTypeException`; declaring
 * `"default": null` on "C" is the known workaround.
 */
class SchemaEvolutionSpec extends WordSpec with ShouldMatchers {
  "Avro" can {
    // Data file shared by the write and read helpers below.
    val file = new File("users.avro")

    /** Parses a JSON schema definition into an Avro schema. */
    def avroSchema(json: String): AvroSchema =
      (new AvroSchema.Parser).parse(json)

    /**
     * Writes a single record to `file` using `schema`.
     *
     * The writer is closed in a `finally` so the file handle is released even
     * when `create` or `append` throws.
     */
    def writeAs(schema: AvroSchema)(rec: GenericRecord): Unit = {
      val dataFileWriter = new DataFileWriter[GenericRecord](
        new GenericDatumWriter[GenericRecord](schema))
      try {
        dataFileWriter.create(schema, file)
        dataFileWriter.append(rec)
      } finally {
        dataFileWriter.close()
      }
    }

    /**
     * Reads the first record from `file`, resolving data written with
     * `writeSchema` against `readSchema`.
     *
     * The reader is closed in a `finally`; previously it was never closed, so
     * the file handle leaked -- in particular when schema resolution failed
     * and `next` threw.
     */
    def readAs(writeSchema: AvroSchema, readSchema: AvroSchema): GenericRecord = {
      val dataFileReader = new DataFileReader[GenericRecord](
        file, new GenericDatumReader[GenericRecord](writeSchema, readSchema))
      try {
        dataFileReader.next(null)
      } finally {
        dataFileReader.close()
      }
    }

    /** Builds a record of `schema` from field-name -> value pairs. */
    def record(schema: AvroSchema, data: Map[String, Any]): GenericRecord = {
      val builder = new GenericRecordBuilder(schema)
      data.foreach { case (name, fieldValue) => builder.set(name, fieldValue) }
      builder.build()
    }

    "read the file using the old schema" when {
      "adding a new field" in {
        val oldSchema = avroSchema(
          """
            |{ "type": "record",
            |"name": "TestRecord",
            |"version" : 1,
            |"fields": [
            |{ "name": "A", "type": ["null", "string"] },
            |{ "name": "B", "type": ["null", "int" ] }
            |] }
            |""".stripMargin)
        // Field "C" intentionally has no "default": this is the case under test.
        val newSchema = avroSchema(
          """
            |{ "type": "record",
            |"name": "TestRecord",
            |"version": 2,
            |"fields": [
            |{ "name": "A", "type": ["null", "string"] },
            |{ "name": "B", "type": ["null", "int" ] },
            |{ "name": "C", "type": ["null", "double"] }
            |] }
            |""".stripMargin)

        val rec = record(oldSchema, Map("A" -> "Hello", "B" -> 5))
        writeAs(oldSchema)(rec)
        val newRec = readAs(oldSchema, newSchema)

        // Typed accessor for a field of the record that was read back.
        def value[T](field: String): T =
          newRec.get(field).asInstanceOf[T]

        // Compared via toString because the string field is read back as Utf8.
        value[Utf8]("A").toString should be("Hello")
        value[Int]("B") should be(5)
        newRec.get("C") should be(null)
      }

      "deleting a field" in {
        pending
      }

      "renaming a field" in {
        pending
      }

      "changing data type" in {
        pending
      }
    }
  }
}