使用Apache Avro

Avro[1]是最近加入到Apache的Hadoop家族的项目之一。为支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。

Avro提供的功能类似于其他编组系统,如Thrift、Protocol Buffers等。而Avro的主要不同之处在于[2]:

  • “动态类型:Avro无需生成代码。数据总是伴以模式定义,这样就可以在不生成代码、静态数据类型的情况下对数据进行所有处理。这样有利于构建通用的数据处理系统和语言。
  • 无标记数据:由于在读取数据时有模式定义,这就大大减少了数据编辑所需的类型信息,从而减少序列化空间。
  • 不用手动分配的字段ID:当数据模式发生变化,处理数据时总是同时提供新旧模式,差异就可以用字段名来做符号化的分析。”

由于性能高、基本代码少和产出数据量精简等特点,Avro周围展开了众多活动——许多NoSQL实现,包括Hadoop、Cssandra等,都把Avro整合到它们的客户端API和储存功能中;已经有人对Avro与其他流行序列化框架做了Benchmark测试并得到结果[3],但是,目前尚无可供人们学习使用Avro的代码示例[4]


如何建立组件化Avro模式,使用组件搭建整体模式,分别保存在多个文件中在这篇文章中我将试着描述我使用Avro的经验,特别是:

  • 在Avro中实现继承
  • 在Avro中实现多态
  • Avro文档的向后兼容性。

组件化Apache Avro模式

如Avro规范所述[5]Avro文档模式定义成JSON文件。在当前Avro实现中,模式类需要一个文件(或字符串)来表示内部模式。同XML模式不一样,Avro当前版本不支持向模式文档中导入(一个或多个)子模式,这往往迫使开发者编写非常复杂的模式定义[6],并大大复杂化了模式的重用。下面的代码示例给出了一个有趣的拆分和组合模式文件的例子。它基于模式类提供的一个toString()方法,该方法返回一个JSON字符串以表示给定的模式定义。用这种办法,我提供了一个简单AvroUtils,能够自动完成上述功能:

package com.navteq.avro.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

public class AvroUtils {

        private static Map<String, Schema> schemas = new HashMap<String, Schema>();
	
        private AvroUtils(){}
	
        public static void addSchema(String name, Schema schema){
               schemas.put(name, schema);

        }

        public static Schema getSchema(String name){
               return schemas.get(name);

        }

        public static String resolveSchema(String sc){

                String result = sc;
                for(Map.Entry<String, Schema> entry : schemas.entrySet())
                      result = replace(result, entry.getKey(),
                                        entry.getValue().toString());
                return result;

        }

        static String replace(String str, String pattern, String replace) {

                int s = 0;
                int e = 0;
                StringBuffer result = new StringBuffer();
                while ((e = str.indexOf(pattern, s)) >= 0) {
                result.append(str.substring(s, e));
                result.append(replace);
                s = e+pattern.length();

        }
        result.append(str.substring(s));
        return result.toString();

}

public static Schema parseSchema(String schemaString){

        String completeSchema = resolveSchema(schemaString);
        Schema schema = Schema.parse(completeSchema);
        String name = schema.getFullName();
        schemas.put(name, schema);
        return schema;

}

public static Schema parseSchema(InputStream in)throws IOException {

    StringBuffer out = new StringBuffer();
    byte[] b = new byte[4096];
    for (int n; (n = in.read(b)) != -1;) {
     out.append(new String(b, 0, n));
    }
    return parseSchema(out.toString());

}

public static Schema parseSchema(File file)throws IOException {

        FileInputStream fis = new FileInputStream(file);
        return parseSchema(fis);
    }
}

清单1 AvroUtils类

这个简单实现基于全局(静态)模式注册表,它由完全限定的模式名及与之对应的对象构成。对于每一个要解析的新模式,该实现在注册表中搜索已保存的完全限定模式名,并且在给定的模式中做字符串替换。模式字符串被解析之后,它的全名和模式名都存储在注册表中。

下面是一个简单的测试,展示如何使用这个类:

package com.navteq.avro.common;

import java.io.File;

import org.junit.Test;

public class AvroUtilsTest {

       private static final String schemaDescription =
         "{ \n" +
            " \"namespace\": \"com.navteq.avro\", \n" +
            " \"name\": \"FacebookUser\", \n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [\n" +
                     " {\"name\": \"name\", \"type\": [\"string\", \"null\"] },\n" +
                     " {\"name\": \"num_likes\", \"type\": \"int\"},\n" +
                     " {\"name\": \"num_photos\", \"type\": \"int\"},\n" +
            " {\"name\": \"num_groups\", \"type\": \"int\"} ]\n" +
         "}";

       private static final String schemaDescriptionExt =
         " { \n" +
             " \"namespace\": \"com.navteq.avro\", \n" +
             " \"name\": \"FacebookSpecialUser\", \n" +
             " \"type\": \"record\",\n" +
             " \"fields\": [\n" +
                      " {\"name\": \"user\", \"type\": com.navteq.avro.FacebookUser },\n" +
                      " {\"name\": \"specialData\", \"type\": \"int\"} ]\n" +
          "}";

       @Test
       public void testParseSchema() throws Exception{

               AvroUtils.parseSchema(schemaDescription1);
               Schema extended = AvroUtils.parseSchema(schemaDescriptionExt);
               System.out.println(extended.toString(true));
       }
}

清单2 AvroUtils测试

在这个测试中,第一个模式的完全限定名是com.navteq.avro.FacebookUser,替换正常运行并打印出以下结果:

{
  "type" : "record",
  "name" : "FacebookSpecialUser",
  "namespace" : "com.navteq.avro",
  "fields" : [ {
    "name" : "user",
    "type" : {
      "type" : "record",
      "name" : "FacebookUser",
      "fields" : [ {
        "name" : "name",
        "type" : [ "string", "null" ]
      }, {
        "name" : "num_likes",
        "type" : "int"
      }, {
        "name" : "num_photos",
        "type" : "int"
      }, {
        "name" : "num_groups",
        "type" : "int"
      } ]
    }
  }, {
    "name" : "specialData",
    "type" : "int"
  } ]
}

清单3 AvroUtilsTest的执行结果

使用Apache Avro实现继承

一种常见的定义数据的方法是通过继承——使用现有的数据定义并添加参数。虽然技术上Avro不支持继承[7],但要是实现一个类继承的结构非常简单。

如果我们有一个基类的定义——FacebookUser,如下:

{
"namespace": "com.navteq.avro",
"name": "FacebookUser",
"type": "record",
"fields": [
  {"name": "name", "type": ["string", "null"] },
  {"name": "num_likes", "type": "int"},
  {"name": "num_photos", "type": "int"},
  {"name": "num_groups", "type": "int"} ]
} 

清单4 Facebook用户记录的定义

要创建一个FacebookSpecialUser定义非常简单,它大概是这样的:

{
    "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser",
  "type": "record",
  "fields": [
    {"name": "user", "type": com.navteq.avro.FacebookUser },
      {"name": "specialData", "type": "int"}
    ]
}

清单5 Facebook特殊的用户记录的定义

一个特殊的用户定义包含两个字段——Facebook的用户类型的用户和一个int类型的数据字段。

特殊Facebook用户的简单测试类如下:

package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestSimpleInheritance {

        private Schema schema;
        private Schema subSchema;

        @Before
        public void setUp() throws Exception {

                subSchema = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
                schema = AvroUtils.parseSchema(new File("resources/FacebookSpecialUser.avro"));

        }

        @Test
        public void testSimpleInheritance() throws Exception{
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                GenericDatumWriter writer =
new GenericDatumWriter(schema);
Encoder encoder = new BinaryEncoder(outputStream);

GenericRecord subRecord1 = new GenericData.Record(subSchema);
subRecord1.put("name", new Utf8("Doctor Who"));
subRecord1.put("num_likes", 1);
subRecord1.put("num_photos", 0);
subRecord1.put("num_groups", 423);
GenericRecord record1 = new GenericData.Record(schema);
record1.put("user", subRecord1);
record1.put("specialData", 1);

writer.write(record1, encoder);

GenericRecord subRecord2 = new GenericData.Record(subSchema);
subRecord2.put("name", new org.apache.avro.util.Utf8("Doctor WhoWho"));
subRecord2.put("num_likes", 2);
subRecord2.put("num_photos", 0);
subRecord2.put("num_groups", 424);
GenericRecord record2 = new GenericData.Record(schema);
record2.put("user", subRecord2);
record2.put("specialData", 2);

writer.write(record2, encoder);

encoder.flush();

ByteArrayInputStream inputStream =
new ByteArrayInputStream(outputStream.toByteArray());
Decoder decoder = DecoderFactory.defaultFactory().
createBinaryDecoder(inputStream, null);
GenericDatumReader reader =
new GenericDatumReader(schema);
while(true){
try{
GenericRecord result = reader.read(null, decoder);
System.out.println(result);
}
catch(EOFException eof){
break;
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
}[8]

清单6 一个特殊的Facebook用户的测试类

运行这个测试类产生预期的结果:

{"user": {"name": "Doctor Who", "num_likes": 1, "num_photos": 0,
"num_groups": 423}, "specialData": 1}
{"user": {"name": "Doctor WhoWho", "num_likes": 2, "num_photos": 0,
"num_groups": 424}, "specialData": 2}

清单7 Facebook特殊用户的测试结果

如果唯一需要的是有包含基础数据和其他参数的记录,此代码工作正常,但它不提供多态性——读取相同记录时,没办法知道到底读的是哪个类型的记录。

使用ApacheAvro实现多态性

与谷歌protocol buffers不同[9],Avro不支持可选参数[10],上述继承的实现不适应于多态性的实现——这是由于必须具备特殊的数据参数。幸运的是,Avro支持联合体,允许省略某些记录的参数。下面的定义可用于创建一个多态的纪录。对于基准纪录,我将使用清单4中描述的例子。为了扩展我们将使用以下两个定义:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension1",
   "type": "record",
   "fields": [
      {"name": "specialData1", "type": "int"}
     ]
}

清单8 首条扩展记录的定义

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension2",
   "type": "record",
   "fields": [
      {"name": "specialData2", "type": "int"}
     ]
}

清单9 第二条扩展记录的定义

有了以上两个定义一个多态记录可以定义如下:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser",
   "type": "record",
   "fields": [
      {"name": "type", "type": "string" },
      {"name": "user", "type": com.navteq.avro.FacebookUser },
        {"name": "extension1", "type":
            [com.navteq.avro.FacebookSpecialUserExtension1, "null"]},
        {"name": "extension2", "type":
            [com.navteq.avro.FacebookSpecialUserExtension2, "null"]}
      ]
}

清单10 Facebook特殊用户的多态定义

这里扩展1和扩展2都是可选的且二者皆可。为了使处理更简单,我添加了一个类型字段,可以用来明确定义的记录类型。

下面给出一个更好的多态记录的定义:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser1",
   "type": "record",
   "fields": [
      {"name": "type", "type": "string" },
      {"name": "user", "type": com.navteq.avro.FacebookUser },
        {"name": "extension", "type":
            [com.navteq.avro.FacebookSpecialUserExtension1,
            com.navteq.avro.FacebookSpecialUserExtension2,
            "null"]}
      ]
}

清单11 Facebook特殊用户的改进多态定义

下面给出一个多态Facebook特殊用户的简单测试类:

package com.navteq.avro.inheritance;

package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestInheritance {

private Schema FBUser;
private Schema base;
private Schema ext1;
private Schema ext2;

@Before
public void setUp() throws Exception {

base = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
ext1 = AvroUtils.parseSchema(
new File("resources/FacebookSpecialUserExtension1.avro"));
ext2 = AvroUtils.parseSchema(
new File("resources/FacebookSpecialUserExtension2.avro"));
FBUser = AvroUtils.parseSchema(new File("resources/FacebooklUserInheritance.avro"));
}

@Test
public void testInheritanceBinary() throws Exception{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
GenericDatumWriter writer =
new GenericDatumWriter(FBUser);
Encoder encoder = new BinaryEncoder(outputStream);

GenericRecord baseRecord = new GenericData.Record(base);
baseRecord.put("name", new Utf8("Doctor Who"));
baseRecord.put("num_likes", 1);
baseRecord.put("num_photos", 0);
baseRecord.put("num_groups", 423);
GenericRecord FBrecord = new GenericData.Record(FBUser);
FBrecord.put("type", "base");
FBrecord.put("user", baseRecord);

writer.write(FBrecord, encoder);

baseRecord = new GenericData.Record(base);
baseRecord.put("name", new Utf8("Doctor WhoWho"));
baseRecord.put("num_likes", 1);
baseRecord.put("num_photos", 0);
baseRecord.put("num_groups", 423);
GenericRecord extRecord = new GenericData.Record(ext1);
extRecord.put("specialData1", 1);
FBrecord = new GenericData.Record(FBUser);
FBrecord.put("type", "extension1");
FBrecord.put("user", baseRecord);
FBrecord.put("extension", extRecord);

writer.write(FBrecord, encoder);

baseRecord = new GenericData.Record(base);
baseRecord.put("name", new org.apache.avro.util.Utf8("Doctor WhoWhoWho"));
baseRecord.put("num_likes", 2);
baseRecord.put("num_photos", 0);
baseRecord.put("num_groups", 424);
extRecord = new GenericData.Record(ext2);
extRecord.put("specialData2", 2);
FBrecord = new GenericData.Record(FBUser);
FBrecord.put("type", "extension2");
FBrecord.put("user", baseRecord);
FBrecord.put("extension", extRecord);

writer.write(FBrecord, encoder);

encoder.flush();

byte[] data = outputStream.toByteArray();
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
Decoder decoder =
DecoderFactory.defaultFactory().createBinaryDecoder(inputStream, null);
GenericDatumReader reader =
new GenericDatumReader(FBUser);
while(true){
try{
GenericRecord result = reader.read(null, decoder);
System.out.println(result);
}
catch(EOFException eof){
break;
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
}

清单12 一条多态Facebook用户记录的测试类

运行这个测试类产生的预期结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}} 

清单13 多态Facebook用户记录测试的执行结果

使用ApacheAvro的向后兼容性

XML的优势之一就是当模式定义使用可选参数扩展时具备向后兼容性。我们介绍一个第三扩展记录的定义来测试Avro的这个特性:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension3",
   "type": "record",
   "fields": [
      {"name": "specialData3", "type": "int"}
   ]
}

清单14 第三扩展记录的定义

多态记录的变更定义如下:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser11",
   "type": "record",
   "fields": [
     {"name": "type", "type": "string" },
     {"name": "user", "type": com.navteq.avro.FacebookUser },
       {"name": "extension", "type":
          [com.navteq.avro.FacebookSpecialUserExtension1,
          com.navteq.avro.FacebookSpecialUserExtension2,
          com.navteq.avro.FacebookSpecialUserExtension3,
          "null"]}
     ]
}

清单15 Facebook特殊用户的改进多态定义

为了能读取清单15中记录定义中的记录,清单12中的代码在修改后(但仍然用清单11中的记录定义来写数据)生成下列结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": {"specialData3": 10}}
java.lang.ArrayIndexOutOfBoundsException
 at java.lang.System.arraycopy(Native Method)
 at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:331)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:265)
at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:99)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:318)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:312)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:120)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:142)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:114)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)
at com.navteq.avro.inheritance.TestInheritance.testInheritanceBinary(TestInheritance.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)

清单16 多态Facebook用户记录对扩展定义测试的执行结果

虽然Avro提供了一个能够解决这个问题的API——GenericDatumReader<GenericRecord>构造函数可以使用两个参数——分别用来写记录与读记录的模式,但这不总是解决向后兼容问题的一定可行的方法,因为它必须要记住用来写每条记录的所有模式。

一个更合适的解决方案是:从二进制编码器/解码器(它建立记录的二进制表象)切换到JSON编码器/解码器。在这种情况下代码有效,并产生以下结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}}

清单17 应用JSON编码多态Facebook用户记录对扩展定义测试的执行结果

通过JSON的编码器,实际的数据转换成JSON:

{"type":"base","user":{"name":{"string":"Doctor
Who"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":null}
{"type":"extension1","user":{"name":{"string":"Doctor
WhoWho"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":{"FacebookSpecialUserExtension1":{"specialData1":1}}}
{"type":"extension2","user":{"name":{"string":"Doctor
WhoWhoWho"},"num_likes":2,"num_photos":0,"num_groups":424},"extension":{"FacebookSpecialUserExtension2":{"specialData2":2}}}

清单18 JSON编码下所转换的数据

还有一个需要考虑的问题,在我的测试中,同样的数据在二进制编码下产生的Avro记录的大小为89字节,而在JSON编码下产生了473字节。

结论

当前实现的Avro不直接支持模式的组件化或模式组件重用,但像本文中描述的一个简单的框架能够为这些特性提供支持。尽管Avro不直接支持多态性,文中利用适当的模式设计可以简单地实现多态数据模式。至于真正意义上向后兼容性问题,只有使用JSON编码的时候Avro才支持[11]。最后一点和Avro的特性没有多大关系,更多的是来自JSON。最后一点严重限制了Avro适用性(如果向后兼容性是必须的),使其使用范围局限为一种高级的JSON编组和处理API。

除了一般的(这里所用到的)Avro方法,也可以使用一个特定的Avro。这时候,可通过(Avro)生产特定的记录而非普通的记录。尽管有些说法指出[12]Avro的特定应用能够获得性能提升,以我使用当前Avro版本(1.4.1)的经验来看,两者有着同样的性能表现。


[1]http://hadoop.apache.org/avro/

[2]http://avro.apache.org/docs/1.4.1/

[3]http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking

[4]我在Avro编组Avro Map Reduce发现的几篇

[5]http://avro.apache.org/docs/current/spec.html

[6]很有趣,Avro IDL支持子IDL

[7]与明确支持类型定义中的基类型的XML不同

[8]关于上面的代码需要指出的一点是,模式解析是在构造函数中完成的,原因在于构造解析是Avro实现中最昂贵的操作。

[9]http://code.google.com/p/protobuf/

[10]Avro支持“Null”,这不同于可选参数,在Avro中“Null”表示某个属性没有值

[11]或者如果有旧版本的模式

[12]http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking

查看英文原文:Using Apache Avro



转载原文:http://www.infoq.com/cn/articles/ApacheAvro