Thursday, June 13, 2013

Avro

It took me a while to figure out how to write Avro files from a MapReduce job that can be imported into Hive and Impala.

  • There are several OutputFormat classes in Avro 1.7.3: AvroOutputFormat, AvroKeyOutputFormat, AvroKeyValueOutputFormat and AvroSequenceFileOutputFormat. Which one can be imported into Hive? Use AvroKeyOutputFormat in your MapReduce job to write Avro container files.
  • You cannot specify any of the above output formats in Hive's CREATE TABLE "stored as" clause because they don't implement HiveOutputFormat (see the CREATE TABLE sketch at the end of this list).
  • You need to set the output schema using AvroJob.setOutputKeySchema.
  • There are three ways to set the Avro compression codec, as shown below:
            AvroJob.setOutputKeySchema(job, getAvroSchema(schemaFile))
            job.setOutputFormatClass(classOf[AvroKeyOutputFormat[GenericRecord]])
            job.setOutputKeyClass(classOf[AvroKey[GenericRecord]])
            job.setOutputValueClass(classOf[NullWritable])
            FileOutputFormat.setCompressOutput(job, true)
            // You can use any of the following three ways to set compression
            // FileOutputFormat.setOutputCompressorClass(job, classOf[SnappyCodec])
            // job.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
            // job.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, CodecFactory.snappyCodec().toString());
    
            FileOutputFormat.setOutputPath(job, new Path(outputPath))
    
    Be careful to check the Cloudera Manager compression settings, because they may override yours. If you find that your Avro files are not compressed even with the code above, you can verify whether compression is enabled from inside your mapper or reducer like this:
    class AvroGenericRecordMapper extends TableMapper[AvroKey[GenericRecord], NullWritable] {
    
      type Context = Mapper[ImmutableBytesWritable, Result, AvroKey[GenericRecord], NullWritable]#Context
      private var converter: AvroConverter = _
    
      // val outKey = new LongWritable(0L)
      val outVal = NullWritable.get()
      val outKey = new AvroKey[GenericRecord]()
    
      override def setup(context: Context) {
        converter = new AvroConverter(AvroJob.getOutputKeySchema(context.getConfiguration()))
        
        // sanity check: print whether output compression is enabled and which codec is configured
        import org.apache.hadoop.mapreduce.TaskAttemptContext
        println("compress: " + FileOutputFormat.getCompressOutput(context.asInstanceOf[TaskAttemptContext]))
        println("codec: " + context.getConfiguration().get(AvroJob.CONF_OUTPUT_CODEC))
      }
    
      override def map(key: ImmutableBytesWritable, value: Result, context: Context) {
        outKey.datum(converter.convert(value))
        context.write(outKey, outVal)
      }
    }
    
    This mapper dumps an HBase table into Avro files. AvroConverter is a custom class that converts an HBase Result into an Avro GenericRecord.
  • Follow the example on this page: https://cwiki.apache.org/confluence/display/Hive/AvroSerDe. Unfortunately, if you search for "Avro Hive", Google shows you this page https://cwiki.apache.org/Hive/avroserde-working-with-avro-from-hive.html, which has a wrong example, and you will get error messages like:
    FAILED: Error in metadata: Cannot validate serde: org.apache.hadoop.hive.serde2.AvroSerDe
    FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
    
    What's wrong? The SerDe class name should be org.apache.hadoop.hive.serde2.avro.AvroSerDe (note the .avro package that the wrong example omits).
  • You don't have to define the columns because Hive can derive them from the Avro schema.
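    
    As a reference, here is a minimal sketch of the CREATE TABLE statement, along the lines of the AvroSerDe wiki example linked above. The table name, the LOCATION path and the avro.schema.url value are placeholders, so substitute your own; the schema file should be the same one you pass to AvroJob.setOutputKeySchema, and LOCATION should point at the MapReduce output directory.
    
    -- no column list: Hive derives the columns from the Avro schema
    CREATE EXTERNAL TABLE my_avro_table
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/path/to/mapreduce/output'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schema.avsc');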
