Friday, May 31, 2013

Avro Schema Evolution

When I added a new nullable field to my Avro schema, Avro reported an AvroTypeException.
SchemaEvolutionSpec:
Avro
  can read the file using the old schema
  - when adding a new field *** FAILED ***
    org.apache.avro.AvroTypeException: Found TestRecord, expecting TestRecord
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:169)
The writer schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] } 
    ] }
The reader schema is:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ] }, 
    ] }
The error message "Found ..., expecting ..." is misleading. It is ResolvingGrammarGenerator who emits this message, not ResolvingDecoder. Here is the code:
      for (Field rf : rfields) {
        String fname = rf.name();
        if (writer.getField(fname) == null) {
          if (rf.defaultValue() == null) {
            result = Symbol.error("Found " + writer.getFullName()
                                  + ", expecting " + reader.getFullName());
            seen.put(wsc, result);
            return result;
          } else {
            reordered[ridx++] = rf;
            count += 3;
          }
        }
      }
The message actually means that the reader is trying to read a field that does not exist in the writer schema and does not have a default value. What? Isn't field "C" nullable? This matches the Avro schema-resolution rules: if the reader's record schema has a field with no default value and the writer's schema has no field with the same name, an error is signalled. Still, I think Avro should not force users to spell out a "null" default for a nullable field. If you cannot wait for a fix of this issue, the workaround is to declare null as the default value for field "C". This reader schema works:
  { "type": "record",
    "name": "TestRecord",
    "version" : 1,
    "fields": [
       { "name": "A", "type": ["null", "string"] },
       { "name": "B", "type": ["null", "int" ] }, 
       { "name": "C", "type": ["null", "double" ], "default": null }, 
    ] }
Here is my ScalaTest code. Note that newSchema below omits the default for field "C", so running this test reproduces the failure shown at the top:
import org.scalatest.WordSpec
import org.scalatest.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.io.File

import org.apache.avro.{ Schema => AvroSchema }
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.avro.generic.{ GenericDatumReader, GenericDatumWriter }
import org.apache.avro.file.{ DataFileReader, DataFileWriter }
import org.apache.avro.util.Utf8

import collection.JavaConversions._

class SchemaEvolutionSpec extends WordSpec with ShouldMatchers {

  "Avro" can {
    val file = new File("users.avro")

    def avroSchema(json: String): AvroSchema =
      (new AvroSchema.Parser).parse(json)

    def writeAs(schema: AvroSchema)(rec: GenericRecord) = {
      val dataFileWriter = new DataFileWriter[GenericRecord](
        new GenericDatumWriter[GenericRecord](schema))
      dataFileWriter.create(schema, file);
      dataFileWriter.append(rec);
      dataFileWriter.close();
    }

    def readAs(writeSchema: AvroSchema, readSchema: AvroSchema): GenericRecord = {
      val dataFileReader = new DataFileReader[GenericRecord](
        file, new GenericDatumReader[GenericRecord](writeSchema, readSchema));
      dataFileReader.next(null);
    }

    def record(schema: AvroSchema, data: Map[String, Any]): GenericRecord = {
      val builder = new GenericRecordBuilder(schema)
      data.foreach { kv => builder.set(kv._1, kv._2) }
      builder.build()
    }

    "read the file using the old schema" when {
      "adding a new field" in {
        val oldSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version" : 1,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] } 
              ] }
            """)
        val newSchema = avroSchema("""
            { "type": "record",
              "name": "TestRecord",
              "version": 2,
              "fields": [
             { "name": "A", "type": ["null", "string"] },
             { "name": "B", "type": ["null", "int" ] }, 
             { "name": "C", "type": ["null", "double"] } 
              ] }
            """)

        val rec = record(oldSchema, Map("A" -> "Hello", "B" -> 5))
        writeAs(oldSchema)(rec)

        val newRec = readAs(oldSchema, newSchema)

        def value[T](field: String): T =
          newRec.get(field).asInstanceOf[T]

        value[Utf8]("A").toString should be("Hello")
        value[Int]("B") should be(5)
        newRec.get("C") should be(null)
      }

      "deleting a field" in {
        pending
      }

      "renaming a field" in {
        pending
      }

      "changing data type" in {
        pending
      }
    }

  }

}

Thursday, May 16, 2013

Play run in DEV mode and "ClassNotFoundException"

Play "~run" command makes development much easier. You can change the code and test it without building, packaging and deploying. But it also causes annoying "ClassNotFoundException".

Our application has an HBase filter. The filter is packaged and deployed to the HBase region servers, but it is also needed on the client side when you build a Scan. If we run the application in DEV mode, we get a "ClassNotFoundException". The Java code of the filter is definitely compiled and "in the classpath", because I can find it in the output of "show full-classpath". This confusing issue forced us to fall back to stage/dist.

The issue is actually caused by the classloaders set up by the "run" command. If you use a custom filter, HBase loads the class with "Class.forName". Because the filter is NOT on the classpath of the classloader that loads the HBase classes, "ClassNotFoundException" is thrown.

But why is the filter NOT on that classpath? There are several classloaders when Play runs in DEV mode:

  • sbtLoader, which loads sbt;
  • applicationLoader, which loads the jar files on dependencyClasspath in Compile; it is also called the "SBT/Play shared ClassLoader";
  • ReloadableClassLoader(v1), which loads the compiled classes of the project itself.
Because ReloadableClassLoader is a child of applicationLoader, the filter is on the classpath of ReloadableClassLoader but not of applicationLoader. Since the HBase library is loaded by applicationLoader, the filter is indeed invisible to it, as the sketch below illustrates.
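Here is a minimal, self-contained sketch of the visibility rule behind this. The loader names mirror Play's DEV-mode setup, but the URL and the filter class name are made up for illustration:

import java.net.{ URL, URLClassLoader }

object LoaderVisibility extends App {
  // Stand-in for applicationLoader: knows the library jars but not the project classes.
  val applicationLoader = new URLClassLoader(Array.empty[URL], getClass.getClassLoader)

  // Stand-in for ReloadableClassLoader: a child that also sees the project's compiled classes.
  val reloadableLoader =
    new URLClassLoader(Array(new URL("file:target/scala-2.10/classes/")), applicationLoader)

  // HBase code, itself loaded by applicationLoader, effectively does this:
  Class.forName("com.example.MyHBaseFilter", true, applicationLoader)
  // -> ClassNotFoundException: delegation only goes from child to parent, never parent to child,
  //    so a class visible only to reloadableLoader can never be found through applicationLoader.
}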

The simple workaround is to make the filter a separate project and add it as a dependency of the Play application, as in the build sketch below. The only disadvantage is that you have to build, publish, and update it if you are developing the filter and the application code at the same time.
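For Play 2.1.x, the build definition would then look roughly like this (the organization, artifact name, and version below are placeholders, not real coordinates):

import sbt._
import Keys._

object ApplicationBuild extends Build {

  val appName    = "myapp"
  val appVersion = "1.0-SNAPSHOT"

  val appDependencies = Seq(
    // The filter, published as its own artifact. It lands on dependencyClasspath
    // in Compile and is therefore loaded by the SBT/Play shared ClassLoader,
    // the same loader that loads the HBase classes.
    "com.example" % "hbase-filter" % "0.1.0"
  )

  val main = play.Project(appName, appVersion, appDependencies)
}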

You can find the same issue: https://github.com/playframework/Play20/issues/822

Play 2.1.1 Jdbc is not compatible with hive-jdbc-0.10.0-cdh4.2.0

If you want to access a Hive/Impala server from Play 2.1.1, you will encounter a strange error saying "Cannot connect to database [xxx]". Unfortunately, no further information helps you identify what is wrong. It is strange because you can use the same URL to connect to the server directly without any problem:
import java.sql.DriverManager

object ImpalaJdbcClient extends App {

  val impalaUrl = "jdbc:hive2://impala-host:21050/;auth=noSasl"

  val driverClass = Class.forName("org.apache.hive.jdbc.HiveDriver")

  val conn = DriverManager.getConnection(impalaUrl)

  val st = conn.createStatement()
  val rs = st.executeQuery("select count(distinct customer_id) from customers where repeat ='Y'")
  while (rs.next()) {
    println("count=%d".format(rs.getLong(1)))
  }

  conn.close
}
By digging into the Play and hive-jdbc code, I figured out that play-jdbc calls a lot of methods that hive-jdbc doesn't support. For those methods, such as setReadOnly and setCatalog, hive-jdbc simply throws a SQLException saying "Method not supported"; play-jdbc catches it and reports the "Cannot connect to database" error, but unfortunately it doesn't include the "Method not supported" message. You can fix it by removing the throw statements from hive-jdbc's unsupported methods and recompiling. Another way is to create your own BoneCPPlugin: copy the source code ./src/play-jdbc/src/main/scala/play/api/db/DB.scala and remove the offending method calls:
  • setAutoCommit
  • commit
  • rollback
  • setTransactionIsolation
  • setReadOnly
  • setCatalog
and comment out or replace this line
case mode => Logger("play").info("database [" + ds._2 + "] connected at " + dbURL(ds._1.getConnection))
with
case mode => Logger("play").info("database [" + ds._2 + "] connected at " + ds._1)
because dbURL calls conn.getMetaData.getURL and HiveDatabaseMetaData doesn't support getURL. Also change "dbplugin" in app.configuration.getString("dbplugin").filter(_ == "disabled") to something else to avoid a conflict with Play's built-in BoneCPPlugin, and then register your own BoneCPPlugin in conf/play.plugins. If you would rather not delete the offending calls outright, a small guard like the sketch below keeps the structure of the copied code intact.
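Here is a minimal sketch of that guard; the object name and package are made up, and only the "Method not supported" message comes from hive-jdbc:

import java.sql.SQLException

object HiveUnsupported {
  // Runs a JDBC call and swallows hive-jdbc's "Method not supported" SQLException,
  // so the copied DB.scala code can keep its original structure.
  def ignore(block: => Unit): Unit =
    try block catch {
      case e: SQLException if e.getMessage != null && e.getMessage.contains("Method not supported") =>
        // setReadOnly, setCatalog, commit, rollback, etc. are not implemented by
        // hive-jdbc 0.10; skipping them is fine for read-only Hive/Impala queries.
    }
}

// Inside the copied DB.scala, wrap the offending calls, for example:
//   HiveUnsupported.ignore(connection.setReadOnly(false))
//   HiveUnsupported.ignore(connection.setCatalog(catalog))

The conf/play.plugins entry then follows the usual priority:className format, for example 200:myapp.db.HiveBoneCPPlugin (again, a made-up class name).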

Wednesday, May 15, 2013

How does Play set up its Ivy repository?

Play sets its Ivy repository to ${PLAY_HOME}/repository instead of using the default Ivy home, $HOME/.ivy2. Check out the file ${PLAY_HOME}/play and you will find this at the end:
"$JAVA" -Dsbt.ivy.home=$dir/repository -Dplay.home=$dir/framework -Dsbt.boot.properties=$dir/framework/sbt/play.boot.properties -jar $dir/framework/sbt/sbt-launch.jar "$@"
-Dsbt.ivy.home does the trick.

Wednesday, May 1, 2013

Install Impala 1.0 in Cloudera Manager 4.5.0

If you don't want to upgrade to 4.5.2, you can change the Impala parcel URL in Cloudera Manager to http://archive.cloudera.com/impala/parcels/latest/ to get the Impala 1.0 parcel.