Netty学习(三)—Codec编解码基础
Codec框架无论是decoder还是encoder本质上都是ChannelHandler处理器,用来将字节转换成基本数据类型或者将基本数据类型转换成字节;
个人主页:tuzhenyu’s page
原文地址:Netty学习(三)—Codec编解码基础
解码器
解码器用来将输入数据流按照特定的格式转换成目标程序格式,解码器的基础类包括ByteToMessageDecoder和MessageToMessageDecoder两种,这两个类都继承了ChannelInboundHandlerAdapter类,实现了自定义的ChannelRead()方法用于对输入字节流进行处理;
用户自定义特定作用的解码器都需继承ByteToMessageDecoder或MessageToMessageDecoder,实现其自定的解码方法decode()即可;
ByteToMessageDecoder和MessageToMessageDecoder区别:
ByteToMessageDecoder解码基类用于将特定数量的字节转换成特定格式的消息,比如LineBasedFrameDecoder,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder等都是继承ByteToMessageDecoder基类,实现其decode()方法;
MessageToMessageDecoder解码基类是用来将一个消息类型转换成另外一种消息格式,例如StringDecoder继承MessageToMessageDecoder将字节类型转换成字符类型;
ByteToMessageDecoder解码基类
- ByteToMessageDecoder继承了ChannelInboundHandlerAdapter,实现了ChannelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
- 在ChannelRead()中将数据流读入到ByteBuf中,调用自定义实现的decode()方法对ByteBuf进行处理;
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out); //调用子类自定义decode()方法进行自定义解码
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
LineBasedFrameDecoder换行符解码器
- 查找ByteBuf中的换行符”\n”,根据换行符对ByteBuf进行分割
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer);
if (!discarding) {
if (eol >= 0) {
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
FixedLengthFrameDecoder换行符解码器
- 将ByteBuf中的数据按照特定长度进行分割
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
MessageToMessageDecoder解码基类
- MessageToMeassageDecoder继承ChannelInboundHandlerAdapter,实现了其ChannelRead()方法并调用decode()方法进行自定义解码
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}
StringDecoder字符解码器
- StringDecoder解码器将字节转转换成字符
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
编码器
解码器用来将程序的输出数据按照特定的格式转换成字节流,编码器基类包括MessageToByteEncoder和MessageToMessageEncoder,这两个基类都继承了ChannelOutboundHandlerAdapter,都实现了write()方法用于将特定格式的数据转换成字节流;
用户自定义的编码器都需继承MessageToMessageEncoder或MessageToByteEncoder这两种基类,实现其encode()方法完成自定义的解码规则;
MessageToMessageEncoder和MessageToByteEncoder的区别:
MessageToByteEncoder用来输入的特定格式的消息转换成字节流
MessageToMessageEncoder用来将一种特定格式的消息转换成另外一种特定格式的消息;
MessageToByteEncoder
- MessageToByteEncoder调用ecode()自定义编码方法,用于将特定类型的消息转换成字节流输出到Socket通道中;
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
MessageToMessageEncoder
- MessageToMessageEncoder调用encode()方法,用来将特定格式的消息转换成另外格式的消息输出到Socket通道中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
out.recycle();
out = null;
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
}
}
StringEncoder编码器
- StringEncoder编码器继承MessageToMessageEncoder,将字符串格式消息转换成字节格式消息输出到Socket通道中;
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
总结
- Codec编解码的基础是四个基础类MessageToByteDecoder,MessageToMessageDecoder,MessageToByteEncoder和MessageToMessageEncoder,自定义数据处理的编解码类继承这几个基础类自定义实现decode()方法或encoder()方法;