Apache Thrift 序列化机制精讲

前言

Thrift 支持二进制,压缩格式,以及 json 格式数据的序列化和反序列化。开发人员可以更加灵活的选择协议的具体形式。协议是可自由扩展的,新版本的协议,完全兼容老的版本。

数据交换格式简介

当前流行的数据交换格式可以分为如下几类:

(一) 自解析型

序列化的数据包含完整的结构, 包含了 field 名称和 value 值。比如 xml/json/java serizable,百度的 mcpack/compack,都属于此类。即调整不同属性的顺序对序列化/反序列化不造成影响。

(二) 半解析型

序列化的数据,丢弃了部分信息, 比如 field 名称, 但引入了 index(常常是 id+type 的方式)来对应具体属性和值。这方面的代表有 google protobuf/thrift 也属于此类。

(三) 无解析型

百度的 infpack 实现,就是借助该种方式来实现,丢弃了很多有效信息,性能/压缩比最好,不过向后兼容需要开发做一定的工作, 详情不知。

Thrift的序列化协议

(a). 首先编写一个简单的 thrift文件 pair.thrift:

struct Pair {
    1:required string key
    2:required string value
}

这里标识了 required 的字段,要求在使用时必须正确赋值,否则运行时会抛出 TProtocolException 异常。缺省和指定为 optional 时,则运行时不做字段非空校验。

(b). 编译并生成 java源代码:

thrift -gen java pair.thrift

(c). 编写序列化和反序列化的测试代码:

序列化测试,将 Pair对象写入文件中

private static void writeData() throws IOException, TException {
    Pair pair = new Pair();
    pair.setKey("key1").setValue("value1");
    FileOutputStream fos = new FileOutputStream(new File("pair.txt"));
    pair.write(new TBinaryProtocol(new TIOStreamTransport(fos)));
    fos.close();
}

反序列化测试,从文件中解析生成 Pair对象

private static void readData() throws TException, IOException {
    Pair pair = new Pair();
    FileInputStream fis = new FileInputStream(new File("pair.txt"));
    pair.read(new TBinaryProtocol(new TIOStreamTransport(fis)));
    System.out.println("key => " + pair.getKey());
    System.out.println("value => " + pair.getValue());
    fis.close();
}

(d) 观察运行结果,正常输出表明序列化和反序列化过程正常完成。

uYpDz8CRyZmr974N5gsL_r1.jpg

Thrift协议源码

(一) writeData()分析

首先查看 thrift 的序列化机制,即数据写入实现,这里采用二进制协议 TBinaryProtocol,切入点为 pair.write(TProtocol):

y9Tv8eknSpma6wYg2t0q_r2.jpg

查看 scheme()方法,决定采用元组计划(TupleScheme)还是标准计划(StandardScheme)来实现序列化,默认采用的是标准计划 StandardScheme。

7QqdwyD4H0W9PUZjO5cB_r3.jpg

标准计划(StandardScheme)下的 write()方法:

CTOGeFqMdHv97g2mNWoY_r4.jpg

这里完成了几步操作:

(a). 根据 ThriftIDL 文件中定义了 required 的字段验证字段是否正确赋值。

public void validate() throws org.apache.thrift.TException {
    // check for required fields
    if (key == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'key' was not present! Struct: " + toString());
    }
    if (value == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'value' was not present! Struct: " + toString());
    }
}

(b). 通过 writeStructBegin()记录写入结构的开始标记。

public void writeStructBegin(TStruct struct) {}

(c). 逐一写入 Pair 对象的各个字段,包括字段字段开始标记、字段的值和字段结束标记。

if (struct.key != null) {
    oprot.writeFieldBegin(KEY_FIELD_DESC);
    oprot.writeString(struct.key);
    oprot.writeFieldEnd();
}
// 省略...

(1). 首先是字段开始标记,包括 type 和 field-id。 type 是字段的数据类型的标识号, field-id 是 ThriftIDL 定义的字段次序,比如说 key 为1, value 为2。

public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
}

Thrift 提供了 TType,对不同的数据类型(type)提供了唯一标识的 typeID。

public final class TType {
    public static final byte STOP = 0; // 数据读写完成
    public static final byte VOID = 1; // 空值
    public static final byte BOOL = 2; // 布尔值
    public static final byte BYTE = 3; // 字节
    public static final byte DOUBLE = 4; // 双精度浮点型
    public static final byte I16 = 6; // 短整型
    public static final byte I32 = 8; // 整型
    public static final byte I64 = 10; // 长整型
    public static final byte STRING = 11; // 字符串类型
    public static final byte STRUCT = 12; // 引用类型
    public static final byte MAP = 13; // Map
    public static final byte SET = 14; // 集合
    public static final byte LIST = 15; // 列表
    public static final byte ENUM = 16; // 枚举
}

(2). 然后是写入字段的值,根据字段的数据类型又归纳为以下实现: writeByte()、 writeBool()、 writeI32()、 writeI64()、 writeDouble()、 writeString()和 writeBinary()方法。

TBinaryProtocol 通过一个长度为8的 byte 字节数组缓存写入或读取的临时字节数据。

private final byte[] inoutTemp = new byte[8];

常识1:16进制的介绍。以0x开始的数据表示16进制,0xff换成十进制为255。在16进制中,A、B、C、D、E、F这五个字母来分别表示10、11、12、13、14、15。

16进制变十进制:f表示15。第n位的权值为16的n次方,由右到左从0位起:0xff = 1516^1 + 1516^0 = 255 16进制变二进制再变十进制:0xff = 1111 1111 = 2^8 - 1 = 255

常识2:位运算符的使用。>>表示代表右移符号,如:int i=15; i>>2的结果是3,移出的部分将被抛弃。而<<表示左移符号,与>>刚好相反。

转为二进制的形式可能更好理解,0000 1111(15)右移2位的结果是0000 0011(3),0001 1010(18)右移3位的结果是0000 0011(3)。

  • writeByte():写入单个字节数据。
public void writeByte(byte b) throws TException {
    inoutTemp[0] = b;
    trans_.write(inoutTemp, 0, 1);
}
  • writeBool():写入布尔值数据。
public void writeBool(boolean b) throws TException {
    writeByte(b ? (byte)1 : (byte)0);
}
  • writeI16():写入短整型 short类型数据。
public void writeI16(short i16) throws TException {
    inoutTemp[0] = (byte)(0xff & (i16 >> 8));
    inoutTemp[1] = (byte)(0xff & (i16));
    trans_.write(inoutTemp, 0, 2);
}
  • writeI32():写入整型 int类型数据。
public void writeI32(int i32) throws TException {
    inoutTemp[0] = (byte)(0xff & (i32 >> 24));
    inoutTemp[1] = (byte)(0xff & (i32 >> 16));
    inoutTemp[2] = (byte)(0xff & (i32 >> 8));
    inoutTemp[3] = (byte)(0xff & (i32));
    trans_.write(inoutTemp, 0, 4);
}
  • writeI64():写入长整型 long类型数据。
public void writeI64(long i64) throws TException {
    inoutTemp[0] = (byte)(0xff & (i64 >> 56));
    inoutTemp[1] = (byte)(0xff & (i64 >> 48));
    inoutTemp[2] = (byte)(0xff & (i64 >> 40));
    inoutTemp[3] = (byte)(0xff & (i64 >> 32));
    inoutTemp[4] = (byte)(0xff & (i64 >> 24));
    inoutTemp[5] = (byte)(0xff & (i64 >> 16));
    inoutTemp[6] = (byte)(0xff & (i64 >> 8));
    inoutTemp[7] = (byte)(0xff & (i64));
    trans_.write(inoutTemp, 0, 8);
}
  • writeDouble():写入双浮点型 double类型数据。
public void writeDouble(double dub) throws TException {
    writeI64(Double.doubleToLongBits(dub));
}
  • writeString():写入字符串类型,这里先写入字符串长度,再写入字符串内容。
public void writeString(String str) throws TException {
    try {
        byte[] dat = str.getBytes("UTF-8");
        writeI32(dat.length);
        trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
}
  • writeBinary:写入二进制数组类型数据,这里数据输入是 NIO中的 ByteBuffer类型。
public void writeBinary(ByteBuffer bin) throws TException {
    int length = bin.limit() - bin.position();
    writeI32(length);
    trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
}

(3). 每个字段写入完成后,都需要记录字段结束标记。

public void writeFieldEnd() {}

(d). 当所有的字段都写入以后,需要记录字段停止标记。

public void writeFieldStop() throws TException {
    writeByte(TType.STOP);
}

(e). 当所有数据写入完成后,通过 writeStructEnd()记录写入结构的完成标记。

public void writeStructEnd() {}

(二) readData()分析

查看 thrift 的反序列化机制,即数据读取实现,同样采用二进制协议 TBinaryProtocol,切入点为 pair.read(TProtocol):

BGhCHr4wSaosmgJDRyuZ_r5.jpg

数据读取和数据写入一样,也是采用的标准计划 StandardScheme。标准计划(StandardScheme)下的 read()方法:

nrbjQ25AZ7Wwqa0tLYRg_r6.jpg

这里完成的几步操作:

(a). 通过 readStructBegin读取结构的开始标记。

iprot.readStructBegin()

(b). 循环读取结构中的所有字段数据到 Pair 对象中,直到读取到 org.apache.thrift.protocol.TType.STOP 为止。 iprot.readFieldBegin() 指明开始读取下一个字段的前需要读取字段开始标记。

while (true) {
    schemeField = iprot.readFieldBegin();
    if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
        break;
    }
    // 字段的读取,省略...
}

(c). 根据 ThriftIDL 定义的 field-id 读取对应的字段,并赋值到 Pair 对象中,并设置 Pair 对象相应的字段为已读状态(前提:字段在 IDL 中被定义为 required)。

switch (schemeField.id) {
    case 1: // KEY
        if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
            struct.key = iprot.readString();
            struct.setKeyIsSet(true);
        } else {
            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
        }
        break;
    case 2: // VALUE
        if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
            struct.value = iprot.readString();
            struct.setValueIsSet(true);
        } else {
            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
        }
        break;
    default:
        org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}

关于读取字段的值,根据字段的数据类型也分为以下实现: readByte()、 readBool()、 readI32()、 readI64()、 readDouble()、 readString()和 readBinary()方法。

  • readByte():读取单个字节数据。
public byte readByte() throws TException {
    if (trans_.getBytesRemainingInBuffer() >= 1) {
        byte b = trans_.getBuffer()[trans_.getBufferPosition()];
        trans_.consumeBuffer(1);
        return b;
    }
    readAll(inoutTemp, 0, 1);
    return inoutTemp[0];
}
  • readBool():读取布尔值数据。
public boolean readBool() throws TException {
    return (readByte() == 1);
}
  • readI16():读取短整型 short类型数据。
public short readI16() throws TException {
    byte[] buf = inoutTemp;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 2) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(2);
    } else {
        readAll(inoutTemp, 0, 2);
    }
    return (short) (((buf[off] & 0xff) << 8) |
         ((buf[off+1] & 0xff)));
}
  • readI32():读取整型 int类型数据。
public int readI32() throws TException {
    byte[] buf = inoutTemp;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 4) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(4);
    } else {
        readAll(inoutTemp, 0, 4);
    }
    return ((buf[off] & 0xff) << 24) |
        ((buf[off+1] & 0xff) << 16) |
        ((buf[off+2] & 0xff) << 8) |
        ((buf[off+3] & 0xff));
}
  • readI64():读取长整型 long类型数据。
public long readI64() throws TException {
    byte[] buf = inoutTemp;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 8) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(8);
    } else {
        readAll(inoutTemp, 0, 8);
    }
    return ((long)(buf[off] & 0xff) << 56) |
        ((long)(buf[off+1] & 0xff) << 48) |
        ((long)(buf[off+2] & 0xff) << 40) |
        ((long)(buf[off+3] & 0xff) << 32) |
        ((long)(buf[off+4] & 0xff) << 24) |
        ((long)(buf[off+5] & 0xff) << 16) |
        ((long)(buf[off+6] & 0xff) << 8) |
        ((long)(buf[off+7] & 0xff));
}
  • readDouble():读取双精度浮点 double类型数据。
public double readDouble() throws TException {
    return Double.longBitsToDouble(readI64());
}
  • readString():读取字符串类型的数据,首先读取并校验 4字节的字符串长度,然后检查 NIO缓冲区中是否有对应长度的字节未消费。如果有,直接从缓冲区中读取;否则,从传输通道中读取数据。
public String readString() throws TException {
    int size = readI32();
    checkStringReadLength(size);
    if (trans_.getBytesRemainingInBuffer() >= size) {
        try {
            String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
            trans_.consumeBuffer(size);
            return s;
        } catch (UnsupportedEncodingException e) {
            throw new TException("JVM DOES NOT SUPPORT UTF-8");
        }
    }
    return readStringBody(size);
}

如果是从传输通道中读取数据,查看 readStringBody()方法:

public String readStringBody(int size) throws TException {
    try {
        byte[] buf = new byte[size];
        trans_.readAll(buf, 0, size);
        return new String(buf, "UTF-8");
    } catch (UnsupportedEncodingException uex) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
}
  • readBinary():读取二进制数组类型数据,和字符串读取类似,返回一个 ByteBuffer字节缓存对象。
public ByteBuffer readBinary() throws TException {
    int size = readI32();
    checkStringReadLength(size);
    if (trans_.getBytesRemainingInBuffer() >= size) {
        ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
        trans_.consumeBuffer(size);
        return bb;
    }
    byte[] buf = new byte[size];
    trans_.readAll(buf, 0, size);
    return ByteBuffer.wrap(buf);
}

(d). 每个字段数据读取完成后,都需要再读取一个字段结束标记。

public void readFieldEnd() {}

(e). 当所有字段读取完成后,需要通过 readStructEnd() 再读入一个结构完成标记。

public void readStructEnd() {}

(f). 读取结束后,同样需要校验在 ThriftIDL中定义为 required 的字段是否为空,是否合法。

public void validate() throws org.apache.thrift.TException {
    // check for required fields
    if (key == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'key' was not present! Struct: " + toString());
    }
    if (value == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'value' was not present! Struct: " + toString());
    }
}

总结

其实到这里,对于 Thrift 的序列化机制和反序列化机制的具体实现和高效性,相信各位已经有了比较深入的认识!

如果觉得这对你有用,请随意赞赏,给与作者支持
评论 0
最新评论