Netty学习(三)—Codec编解码基础

Netty学习(三)—Codec编解码基础

Codec框架无论是decoder还是encoder本质上都是ChannelHandler处理器,用来将字节转换成基本数据类型或者将基本数据类型转换成字节;

个人主页:tuzhenyu’s page
原文地址:Netty学习(三)—Codec编解码基础

解码器

  • 解码器用来将输入数据流按照特定的格式转换成目标程序格式,解码器的基础类包括ByteToMessageDecoder和MessageToMessageDecoder两种,这两个类都继承了ChannelInboundHandlerAdapter类,实现了自定义的ChannelRead()方法用于对输入字节流进行处理;

  • 用户自定义特定作用的解码器都需继承ByteToMessageDecoder或MessageToMessageDecoder,实现其自定的解码方法decode()即可;

  • ByteToMessageDecoder和MessageToMessageDecoder区别:

    • ByteToMessageDecoder解码基类用于将特定数量的字节转换成特定格式的消息,比如LineBasedFrameDecoder,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder等都是继承ByteToMessageDecoder基类,实现其decode()方法;

    • MessageToMessageDecoder解码基类是用来将一个消息类型转换成另外一种消息格式,例如StringDecoder继承MessageToMessageDecoder将字节类型转换成字符类型;

ByteToMessageDecoder解码基类

  • ByteToMessageDecoder继承了ChannelInboundHandlerAdapter,实现了ChannelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
  • 在ChannelRead()中将数据流读入到ByteBuf中,调用自定义实现的decode()方法对ByteBuf进行处理;
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            int outSize = out.size();

            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            decode(ctx, in, out);    //调用子类自定义decode()方法进行自定义解码

            if (ctx.isRemoved()) {
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

LineBasedFrameDecoder换行符解码器

  • 查找ByteBuf中的换行符”\n”,根据换行符对ByteBuf进行分割
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    final int eol = findEndOfLine(buffer);
    if (!discarding) {
        if (eol >= 0) {
            final ByteBuf frame;
            final int length = eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

            if (length > maxLength) {
                buffer.readerIndex(eol + delimLength);
                fail(ctx, length);
                return null;
            }

            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(length);
                buffer.skipBytes(delimLength);
            } else {
                frame = buffer.readRetainedSlice(length + delimLength);
            }

            return frame;
        } else {
            final int length = buffer.readableBytes();
            if (length > maxLength) {
                discardedBytes = length;
                buffer.readerIndex(buffer.writerIndex());
                discarding = true;
                if (failFast) {
                    fail(ctx, "over " + discardedBytes);
                }
            }
            return null;
        }
    } else {
        if (eol >= 0) {
            final int length = discardedBytes + eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            buffer.readerIndex(eol + delimLength);
            discardedBytes = 0;
            discarding = false;
            if (!failFast) {
                fail(ctx, length);
            }
        } else {
            discardedBytes += buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
        }
        return null;
    }
}

FixedLengthFrameDecoder换行符解码器

  • 将ByteBuf中的数据按照特定长度进行分割
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

protected Object decode(
        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
        return in.readRetainedSlice(frameLength);
    }
}

MessageToMessageDecoder解码基类

  • MessageToMeassageDecoder继承ChannelInboundHandlerAdapter,实现了其ChannelRead()方法并调用decode()方法进行自定义解码
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    CodecOutputList out = CodecOutputList.newInstance();
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            try {
                decode(ctx, cast, out);
            } finally {
                ReferenceCountUtil.release(cast);
            }
        } else {
            out.add(msg);
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        int size = out.size();
        for (int i = 0; i < size; i ++) {
            ctx.fireChannelRead(out.getUnsafe(i));
        }
        out.recycle();
    }
}

StringDecoder字符解码器

  • StringDecoder解码器将字节转转换成字符
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    out.add(msg.toString(charset));
}

编码器

  • 解码器用来将程序的输出数据按照特定的格式转换成字节流,编码器基类包括MessageToByteEncoder和MessageToMessageEncoder,这两个基类都继承了ChannelOutboundHandlerAdapter,都实现了write()方法用于将特定格式的数据转换成字节流;

  • 用户自定义的编码器都需继承MessageToMessageEncoder或MessageToByteEncoder这两种基类,实现其encode()方法完成自定义的解码规则;

  • MessageToMessageEncoder和MessageToByteEncoder的区别:

    • MessageToByteEncoder用来输入的特定格式的消息转换成字节流

    • MessageToMessageEncoder用来将一种特定格式的消息转换成另外一种特定格式的消息;

MessageToByteEncoder

  • MessageToByteEncoder调用ecode()自定义编码方法,用于将特定类型的消息转换成字节流输出到Socket通道中;
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

MessageToMessageEncoder

  • MessageToMessageEncoder调用encode()方法,用来将特定格式的消息转换成另外格式的消息输出到Socket通道中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    CodecOutputList out = null;
    try {
        if (acceptOutboundMessage(msg)) {
            out = CodecOutputList.newInstance();
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            try {
                encode(ctx, cast, out);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (out.isEmpty()) {
                out.recycle();
                out = null;

                throw new EncoderException(
                        StringUtil.simpleClassName(this) + " must produce at least one message.");
            }
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new EncoderException(t);
    }
}

StringEncoder编码器

  • StringEncoder编码器继承MessageToMessageEncoder,将字符串格式消息转换成字节格式消息输出到Socket通道中;
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() == 0) {
        return;
    }

    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}

总结

  • Codec编解码的基础是四个基础类MessageToByteDecoder,MessageToMessageDecoder,MessageToByteEncoder和MessageToMessageEncoder,自定义数据处理的编解码类继承这几个基础类自定义实现decode()方法或encoder()方法;

版权声明:本文为u013967175原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。