Netty框架入门(一) Netty框架简介

一.IO与NIO

传送门:https://blog.csdn.net/qq_40772692/article/details/100302360

二.Netty框架简介

1.什么是Netty

Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

操作系统底层是支持异步I/O通信的,但是java传统的IO并没有提供异步IO通信的类库,导致java通信长时间都采用了同步阻塞模式(BIO),因此很多大型服务器大多都采用的c/c++开发,这才导致了java NIO的出现,但是NIO开发复杂,存在漏洞而且是同步非阻塞IO。简而言之:Netty相当于是对NIO的封装和优化而来的框架,是一个完全异步非阻塞IO框架, 通过主从Reactor模型来实现的。可以让我们忽略底层复杂的实现,更快速高效的开发出健壮稳定的通信网络应用。

2.Netty框架开发环境搭建

(1)官网下载netty4的最新版jar包:https://netty.io/

(2)新建普通Java 工程,起名为NettyTest,并新建文件夹lib用于导入jar包

(3)导入下载完毕的jar包,注意,这里下载的jar包分为两类,一类是为了方便学习和使用所提供的netty所有组件单独的jar包;另一类是包含所有组件的综合jar包:在all-in-one文件夹下的netty-all-4.1.39.Final.jar这个jar包,我们导入这个即可。

导入方法:将jar包复制到lib文件夹下,但是此时还不能使用。右击netty-all-4.1.39.Final.jar,点击Build Path->add to Build Path,之后会出现如下目录

环境部署完成,可以书写Netty程序了

三.Netty的核心组件

1.EventLoop和EventLoopGroup

EventLoop可以看作是一个Reactor线程,其内部封装了select,用来对注册事件进行轮询来分离事件,并且充当分发(Dispatcher)的角色,将事件分配给相应的一组handler。而EventLoopGroup是EventLoop的数组,即Reactor线程池,内部维护了一组EventLoop线程。

(1)服务器端:在 Netty 服务器端编程中我们需要 BossEventLoopGroup 和WorkerEventLoopGroup 两个 EventLoopGroup 来进行工作。BossEventLoopGroup 通常是一个单线程的 EventLoop,即里面只有一个EventLoop来处理连接,EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例,EventLoop 的实现涵盖 IO 事件的分离和分发(Dispatcher)

  • BossEventLoopGroup 只负责处理连接,故开销非常小,通常只有一个EventLoop,连接到来,马上按照策略将 SocketChannel 转发给 WorkerEventLoopGroup。
  • WorkerEventLoopGroup 会由 next 随机选择其中一个 EventLoop 来将这 个SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行专门处理。

(2)客户端:通常只有一个EventLoopGroup来发起连接处理IO操作

(3)小结

  • NioEventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop;
  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
  • 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
  • 每一个 EventLoop 负责处理一个或多个 Channel;

(4)创建代码实示例

 EventLoopGroup bossGroup = new NioEventLoopGroup();//接收请求交给worker
 EventLoopGroup workerGroup = new NioEventLoopGroup();//处理事件

2.ServerBootstrap和Bootstrap

ServerBootstrap是服务器端的辅助启动类,Bootstrap是客户端的辅助启动类。一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类,启动netty程序,配置连接参数等作用。

ServerBootstrap serverBootstrap = new ServerBootstrap();
			 serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
			 .childHandler(new MyServerHandlerInitializer());
//服务器端添加线程组,NioServerSocketChannel通过反射机制创建类来监听端口,添加对每个连接channel的事件处理的handler处理器

3.ChannelFuture与Listener监听器

(1)功能:Netty 为异步非阻塞的框架,即所有的 I/O 操作都为异步的即立即返回的,因此,我们不能立刻得知操作的结果和状态。因此Netty 提供了 ChannelFuture ,通过ChannelFuture的 addListener() 方法注册一个 ChannelFutureListener监听器,当操作执行成功或者失败时即操作完成时,监听器就会自动回调返回结果通知线程。

(2)状态:ChannelFuture有两种状态:completed和uncompleted。当一个IO操作开始时,会创建一个新的ChannelFuture,此时他处于一个uncompleted状态,当IO操作完成时,ChannelFuture就会被设置为completed,同时回调ChannelFutureListener中的operationComplete方法,此时他的完成状态有三种,如下图所示:

(3)示例代码

  • ChannelFutureListener提供了一些封装好的Listener,比如ChannelFutureListener.CLOSE该监听器会在操作完成后关闭相应的channel通道:
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);//监听ctx的发送写入情况,写入完成后调用listener自动关闭通道
  • 自定义Listener:
future.addListener(new ChannelFutureListener() {
				
   @Override
   public void operationComplete(ChannelFuture arg0) throws Exception {
					// TODO Auto-generated method stub
		if(arg0.isSuccess()) {//successful
						//do..
		}else if(arg0.cause()!=null) {//fail
						//do..
		}else if(arg0.isCancelled()) {//cancel
						//do..
		}
	}
});

4.ChannelHandler与ChannelPipline

(1)ChannelHandler:Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。负责对到来的IO事件或IO操作选择性的进行拦截和处理,一个事件可以由多个handler处理。handler被分为两大类分别是入站inBoundHandler和出站outBoundHandler分别来处理入站和出站事件

(2)ChannelPipline:一个channel对应一个channelPipline(管道),channelPipline实际上是维护handler的一个双向链表,是handler的容器,一个channel可以有多个handler处理,对应的channelPipline和channelhandler之间有一个ChannelHandlerContext(上下文)类来影射和联系起来。关系图如下:

(3)注意事项

  • ChannelInboundHandler之间的数据传递,需要通过调用 ctx.fireChannelRead(msg) 实现当前数据传递到下一个inboundhandler;
  • 调用ctx.writeAndflush()会从当前handler位置向前寻找下一个outhandler;调用ctx.channel().writeAndflush()会从pipline尾部往前寻找outhandler输出

(4)消息的处理流程

  1. 消息到来时,由select()进行事件通知,触发底层的socketChannel.read()读取ByteBuf,触发ChannelRead事件,由IO线程EventLoop调用相应的ChannelPipline中的fireChannelRead()方法,将消息传输到ChannelPipline中
  2. 消息依次被handler1,handler2...拦截和处理,在这个过程中任何一个handler都可以中断消息的传递
  3. 消息处理完后调用write方法,消息经过多个outhandler处理添加到缓冲区BuyeBuf中,等待socketChannel.write()发送 

(5)使用代码 

/*ChannelHandler初始化器*/
public class MyServerHandlerInitializer extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new MyServerHandler());
		pipeline.addLast(new MyServerHandlerNext());
	}
         
}

5.监听端口和关闭服务

(1)同步退出方式

我们常见的Netty Demo服务器端都是这么写的,其中sync()是同步方法

  • bind(8899).sync();表示服务器端同步阻塞等待服务器端绑定监听端口成功再往下执行
  • closeFuture().sync();表示添加一个连接关闭监听器,同步阻塞等待连接关闭后再执行优雅退出释放资源

Demo这么写是因为,开启服务器的是当前主线程,执行监听端口和处理IO的是子线程,而线程之间是独立的,主线程执行完毕就会结束。为了方便调试,使用sync()来阻塞主线程等待子线程结束,而不是提前结束。这样使用的话,客户端可以主动向服务器发送消息,但是服务器主线程由于阻塞无法执行别的操作。

 ChannelFuture future = serverBootstrap.bind(8899).sync();//同步阻塞直到完成
  future.channel().closeFuture().sync();


bossGroup.shutdownGracefully();//优雅关闭
workerGroup.shutdownGracefully();

(2)异步退出方式

采用异步方式这样可以不阻塞主线程

			//绑定端口 同步等待成功
            ChannelFuture future=b.bind(port).sync();

            //采用非同步方法退出netty 通过异步的方法不会被阻塞
            future.channel().closeFuture().addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					// TODO Auto-generated method stub
					//释放资源退出
					System.out.println(future.channel().toString()+" 链路关闭");
		            bossGroup.shutdownGracefully();
		            workerGroup.shutdownGracefully();
				}
			});
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 

 

6 ByteBuf解析

(1) Netty ByteBuf是对Java NIO的ByteBuffer的一个优化,ByteBuffer有很多缺点,比如:

  • ByteBuffer长度固定,不能动态扩展,容易造成越界错误
  • ByteBuffer只有一个指针,需要通过flip()变换读写,容易出错和混乱
  • ByteBuffer API较少,功能不全

   因此Netty优化封装了ByteBuf,ByteBuf提供双指针readIndex和writeIndex来标记读写位置,同时ByteBuf在写入数据之前会自动进行判断,若剩余空间不足则会进行动态拓展。极大地提高了开发的便捷性。其示意图如下所示:

  • readerIndex,0到readerindex这里就是不读的数据,也就是抛弃的数据;从readerIndex开始读数据。
  • writerIndex,readerIndex到writerIndex这里就是没有读的数据,也就是可读的内容;从writeIndex开始写数据。
  • capacity,writerIndex到capacity这段数据就是我们可以往里面写的空间
  • maxCapacity,其实这里应该还有个maxCapacity(可以看做是在capacity后面),capacity到maxCapacity这里,是这个Byte还可以扩展的空间。
     

 (2)用法介绍(API:https://netty.io/4.1/api/

  •    其常用API如下所示:

(1)基础读写操作(支持随机访问)

  • readXXX()和writeXXX()方法将会推进其对应的索引readerIndex和writerIndex。自动推进
  • getXXX()和setXXX()方法用于访问数据,对writerIndex和readerIndex无影响

(2)slice,slice(int,int)切片操作

  • 返回原始ByteBuf可读字节的一部分, 他们共享这部分内容,修改返回的缓冲区或此缓冲区的内容会影响彼此的内容,他们维护单独的index和makers,此方法不会修改原始缓冲区的readerIndex或writerIndex。

(3)ByteBuf初始化

 Netty提供了一个简单的成为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)

返回一个未池化的基于堆内存存储的ByteBuf 

wrappedBuffer()返回了一个包装了给定数据的ByteBuf
copiedBuffer()返回了一个复制了给定数据的ByteBuf

7.简单服务器和客户端通信实例

(1)MyServer

package com.wx.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
	 public static void main(String []args) {
		 EventLoopGroup bossGroup = new NioEventLoopGroup();
		 EventLoopGroup workerGroup = new NioEventLoopGroup();
		 
		 try {
			 ServerBootstrap serverBootstrap = new ServerBootstrap();
			 serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
			 .childHandler(new MyServerHandlerInitializer());
			 
			 ChannelFuture future = serverBootstrap.bind(8899).sync();
			 future.channel().closeFuture().sync();
			 
		 } catch (InterruptedException e) {
			e.printStackTrace();
		}finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	 }
}

(2)MyServerHandlerInitializer初始化器

package com.wx.netty;



import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyServerHandlerInitializer extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new MyServerHandler());
		pipeline.addLast(new MyServerHandlerNext());
	}
         
}

(3)MyServerHandler处理器1

package com.wx.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyServerHandler extends ChannelInboundHandlerAdapter{
    @Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    	System.out.println("handler1 is added.");
	}


	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler1 make channel active.");
	}
    

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler1 make channel inactive.");
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;//消息都是存入缓冲区中
        byte[] reg = new byte[buf.readableBytes()];//开辟一个buf中可读字节数大小的byte数组
        buf.readBytes(reg);//将buf中的readIndex-writeIndex内的可读内容读入byte数组
        String body = new String(reg, "UTF-8");//解码为字符串
        System.out.println("The server receive  order : " + body);
 
        String respMsg = body.toUpperCase();
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());//创建缓冲区
        //ctx.writeAndFlush(buf);
        ctx.fireChannelRead(respByteBuf);//发送给下一个InboundHandler处理
	}

 
	
	@Override
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler1 make channel registered.");
	}

	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler1 make channel unregistered.");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println(cause.toString());
		ctx.close();
	}


}


(4)MyServerHandlerNext处理器2

package com.wx.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyServerHandlerNext extends ChannelInboundHandlerAdapter{
    @Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    	System.out.println("handler2 is added.");
	}//handler被添加到通道时调用


	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler2 make channel active.");
	}//handler激活时调用
    

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler2 make channel inactive.");
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//有消息传来时调用,ctx是一个联系handler与相应pipline的组件上下文
		ByteBuf buf = (ByteBuf) msg;
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);
        String body = new String(reg, "UTF-8");
        System.out.println("The server sended : " + body);
        ByteBuf respByteBuf = Unpooled.copiedBuffer(body.getBytes());
        
        ctx.writeAndFlush(respByteBuf);//发送回客户端
	}


	@Override
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler2 make channel registered.");
	}

	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("handler2 make channel unregistered.");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

}



(5)MyClient客户端

package com.wx.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) {
		EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
		
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
			.handler(new MyClintInitializer());
			
			ChannelFuture future = bootstrap.connect("127.0.0.1",8899).sync();
			future.channel().closeFuture().sync();
			
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally {
			eventLoopGroup.shutdownGracefully();
		}
	}
}

(6)客户端初始化器MyClintInitializer

package com.wx.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyClintInitializer extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel arg0) throws Exception {
		ChannelPipeline pipeline = arg0.pipeline();
		pipeline.addLast(new MyClientHandler());
	}

}

(7)客户端MyClientHandler处理器

package com.wx.netty;

import java.util.Scanner;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class MyClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// String mString = "a small character";
		Scanner scanner = new Scanner(System.in);
		String mString = scanner.nextLine();
		ByteBuf respByteBuf = Unpooled.copiedBuffer(mString.getBytes());
		ctx.writeAndFlush(respByteBuf);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("client read");
		ByteBuf buf = (ByteBuf) msg;
		byte[] reg = new byte[buf.readableBytes()];
		buf.readBytes(reg);
		String body = new String(reg, "UTF-8");
		System.out.println("Client received message: " + body);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		super.exceptionCaught(ctx, cause);
	}

}

(8)运行结果


版权声明:本文为qq_40772692原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。