netty 默認(rèn)支持protobuf 的封裝與解碼,如果通信雙方都使用netty則沒有什么障礙,但如果客戶端是其它語言(C#)則需要自己仿寫與netty一致的方式(解碼+封裝),提前是必須很了解netty是如何進(jìn)行封裝與解碼的。這里主要通過讀源碼主要類ProtobufVarint32FrameDecoder(解碼)+ProtobufVarint32LengthFieldPrepender(封裝) 來解析其原理與實(shí)現(xiàn)。


一,支持protobuf 協(xié)議的默認(rèn)實(shí)現(xiàn)

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

//配置服務(wù)端NIO線程組  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try{  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup)  
                .channel(NioServerSocketChannel.class)  
                .option(ChannelOption.SO_BACKLOG, 1024)  
                .handler(new LoggingHandler(LogLevel.INFO))  
                .childHandler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ch.pipeline()  
                        .addLast(new ProtobufVarint32FrameDecoder())                          
                        .addLast(new ProtobufDecoder(  
                                SubscribeReqProto.SubscribeReq.getDefaultInstance()))                         
                        .addLast(new ProtobufVarint32LengthFieldPrepender())                          
                        .addLast(new ProtobufEncoder())                       
                        .addLast(new SubReqServerHandler());                          
                    }  
                      
                });  
            //綁定端口,同步等待成功  
            ChannelFuture f = b.bind(port).sync();  
            //等待服務(wù)端監(jiān)聽端口關(guān)閉              f.channel().closeFuture().sync();  
              
        }finally{  
            //退出時(shí)釋放資源              bossGroup.shutdownGracefully();  
            workerGroup.shutdownGracefully();  
        }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

以上是提供的默認(rèn)實(shí)現(xiàn)。關(guān)鍵在于ProtobufVarint32FrameDecoder,ProtobufVarint32LengthFieldPrepender類。

二,ProtobufVarint32LengthFieldPrepender 編碼類

An encoder that prepends the the Google Protocol Buffers 128 Varints integer length field.

* BEFORE DECODE (300 bytes)       AFTER DECODE (302 bytes)* +---------------+               +--------+---------------+* | Protobuf Data |-------------->| Length | Protobuf Data |* |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |* +---------------+               +--------+---------------+

從類的說明來看, proto 消息格式如:Length + Protobuf Data (消息頭+消息數(shù)據(jù)) 方式,這里特別需要注意的是頭長使用的是varints方式不是int ,消息頭描述消息數(shù)據(jù)體的長度。為了更減少傳輸量,消息頭采用的是varint 格式。

什么是varint?

Varint 是一種緊湊的表示數(shù)字的方法。它用一個(gè)或多個(gè)字節(jié)來表示一個(gè)數(shù)字,值越小的數(shù)字使用越少的字節(jié)數(shù)。這能減少用來表示數(shù)字的字節(jié)數(shù)。 Varint 中的每個(gè) byte 的最高位 bit 有特殊的含義,如果該位為 1,表示后續(xù)的 byte 也是該數(shù)字的一部分,如果該位為 0,則結(jié)束。其他的 7 個(gè) bit 都用來表示數(shù)字。因此小于 128 的數(shù)字都可以用一個(gè) byte 表示。大于 128 的數(shù)字,會(huì)用兩個(gè)字節(jié)。

更多可參見我上篇文章

最大的區(qū)別是消息頭它不是固定長度(常見是的使用INT 4個(gè)字節(jié)固定長度),Varint它用一個(gè)或多個(gè)字節(jié)來表示一個(gè)數(shù)字決定它不是固定長度!

ProtobufVarint32LengthFieldPrepender 類的主要方法如下:

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

@Override    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {        int bodyLen = msg.readableBytes();        int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);

        CodedOutputStream headerOut =
                CodedOutputStream.newInstance(new ByteBufOutputStream(out), headerLen);
        headerOut.writeRawVarint32(bodyLen);
        headerOut.flush();

        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

CodedOutputStream 主要是針對(duì)與varints相關(guān)操作類。 先看是如何寫消息頭的,得到bodyLen 消息體長度然后調(diào)用computeRawVarint32Size()計(jì)算需要多少個(gè)字節(jié),

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

public static int computeRawVarint32Size(final int value) {    if ((value & (0xffffffff <<  7)) == 0) return 1;    if ((value & (0xffffffff << 14)) == 0) return 2;    if ((value & (0xffffffff << 21)) == 0) return 3;    if ((value & (0xffffffff << 28)) == 0) return 4;    return 5;
  }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

0xffffffff << 7 二進(jìn)制表示11111111111111111111111110000000 ,當(dāng)與value &計(jì)算=0則表示value最大只會(huì)是000000000000000000000001111111,一個(gè)字節(jié)足以。

通過&運(yùn)算得出使用多少個(gè)字節(jié)就可以表示當(dāng)前數(shù)字。左移7位是與Varint定義相關(guān),第一位需要保留給標(biāo)識(shí)(1表示后續(xù)的 byte 也是該數(shù)字的一部分,0則結(jié)束)。要表示 int 32位 和多加的每個(gè)字節(jié)第一個(gè)標(biāo)識(shí)位,多出了4位,所以就最大會(huì)有5個(gè)字節(jié)。

得到了varints值,然后如何寫入out? 再看關(guān)鍵方法writeRawVarint32()。

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

public void writeRawVarint32(int value) throws IOException {    while (true) {      //0x7F為127
      if ((value & ~0x7F) == 0) {//是否小于127,小于則一個(gè)字節(jié)就可以表示了        writeRawByte(value);        return;
      } else {
        writeRawByte((value & 0x7F) | 0x80);//因不于小127,加一高位標(biāo)識(shí)
        value >>>= 7;//右移7位,再遞歸      }
    }
  }    /** Write a single byte. */
  public void writeRawByte(final byte value) throws IOException {    if (position == limit) {
      refreshBuffer();
    }

    buffer[position++] = value;
  }  
  private void refreshBuffer() throws IOException {    if (output == null) {      // We're writing to a single buffer.
      throw new OutOfSpaceException();
    }    // Since we have an output stream, this is our buffer    // and buffer offset == 0
    output.write(buffer, 0, position);
    position = 0;
  }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

byte 的取值(-128~127) , 0x7F為127 , 0x80為128

循環(huán)取后7位,如果小于127則結(jié)束,不小于第一位加標(biāo)識(shí)位1。 因?yàn)檠h(huán)右移所以,實(shí)際位置顛倒了,解碼時(shí)需要倒過來再拼接。

消息頭因?yàn)槭莢arint32可變字節(jié),所以比較復(fù)雜些,消息體簡單直接writeBytes即可。

二,ProtobufVarint32FrameDecoder 解碼類

同樣對(duì)應(yīng)CodedOutputStream有CodedInputStream類,操作解碼時(shí)的varints。

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

@Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();        final byte[] buf = new byte[5];        for (int i = 0; i < buf.length; i ++) {            if (!in.isReadable()) {
                in.resetReaderIndex();                return;
            }

            buf[i] = in.readByte();            if (buf[i] >= 0) {                int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();                if (length < 0) {                    throw new CorruptedFrameException("negative length: " + length);
                }                if (in.readableBytes() < length) {
                    in.resetReaderIndex();                    return;
                } else {
                    out.add(in.readBytes(length));                    return;
                }
            }
        }        // Couldn't find the byte whose MSB is off.
        throw new CorruptedFrameException("length wider than 32-bit");
    }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

前面說明了最大長度為5個(gè)字節(jié)所以這里聲明了5個(gè)長度的字節(jié)來讀取消息頭。

buf[i] >= 0 這里為什么是>0然后就可以解碼了呢?

還是這句話:varints第一位表示后續(xù)的byte是否是該數(shù)字的一部分!

如果字節(jié)第一位為1則表示后續(xù)還有字節(jié)是表示消息頭,當(dāng)這個(gè)字節(jié)的第一位為1則這個(gè)字節(jié)肯定是負(fù)數(shù)(字節(jié)最高位表示正負(fù)),大于等于0表示描述消息體長度的數(shù)字已經(jīng)讀完了。

然后調(diào)用readRawVarint32() 還原成int ,與之前 writeRawVarint32()反其道而行。

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

public int readRawVarint32() throws IOException {   byte tmp = readRawByte();   if (tmp >= 0) {     return tmp;
   }   int result = tmp & 0x7f;   if ((tmp = readRawByte()) >= 0) {
     result |= tmp << 7;
   } else {
     result |= (tmp & 0x7f) << 7;     if ((tmp = readRawByte()) >= 0) {
       result |= tmp << 14;
     } else {
       result |= (tmp & 0x7f) << 14;       if ((tmp = readRawByte()) >= 0) {
         result |= tmp << 21;
       } else {
         result |= (tmp & 0x7f) << 21;
         result |= (tmp = readRawByte()) << 28;         if (tmp < 0) {           // Discard upper 32 bits.
           for (int i = 0; i < 5; i++) {             if (readRawByte() >= 0) {               return result;
             }
           }           throw InvalidProtocolBufferException.malformedVarint();
         }
       }
     }
   }   return result;
 }

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動(dòng)開發(fā)培訓(xùn)

 

取第N字節(jié)左移7*N位或|第一個(gè)字節(jié)拼接,實(shí)現(xiàn)了倒序拼接,最后得到了消息體長度。然后根據(jù)得到的消息體長度讀取數(shù)據(jù),如果消息體長度不夠則回滾到markReaderIndex,等待數(shù)據(jù)。

四,總結(jié)

本文主要詳細(xì)介紹了netty 對(duì) protobuf 協(xié)議的解碼與包裝。重點(diǎn)在消息頭 varint32的 算法表示上進(jìn)行了說明。了解了varint32在協(xié)議中的實(shí)現(xiàn),方便應(yīng)用在其語言對(duì)接。