Reactor模型三个角色:
Reactor:Reactor主要是用来监听事件的,无论accept事件还是read事件,至于它到底监听什么事件根据模 型来决定。
Acceptor:就是Reactor接收到事件后交给acceptor,然后接收到把得到的read事件负责交个handler处理。
Handlers:用来处理网络IO事件,处理IO操作。
一.单Reactor单线程模型

package com.lx.netty.reactor.single;
import java.io.IOException;
public class SingleMain {
//单Reactor单线程模型 Reactor是应用层的模型 poll是操作系统底层的多路复用模型(内存)
public static void main(String[] args) throws IOException {
new Thread(new Reactor(8080,"single-main")).start();
}
}
package com.lx.netty.reactor.single;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Reactor implements Runnable {
Selector selector;
public Reactor(int port, String threadName) throws IOException {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new Acceptor(selector,serverSocketChannel));
}
public void run() {
// 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
while (!Thread.interrupted()){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterable = selectionKeys.iterator();
while (iterable.hasNext()){
dispatch(iterable.next());
iterable.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey selectionKey){
// 如果是accept,这里的runnable就是Acceptor
// 如果是read事件,这里的runnable就是handler
Runnable runnable = (Runnable)selectionKey.attachment();
if(runnable!=null){
runnable.run();
}
}
}
package com.lx.netty.reactor.single;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Acceptor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ,new Handler(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.lx.netty.reactor.single;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class Handler implements Runnable{
SocketChannel socketChannel;
public Handler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int length=0,total =0;
String message = "";
try {
do {
length = socketChannel.read(byteBuffer);
message+= new String(byteBuffer.array());
System.out.println(length);
// 就是判断数据是否有没有读完
}while (length > byteBuffer.capacity());
System.out.println(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
二.单Reactor多线程模型

package com.lx.netty.reactor.mult;
import java.io.IOException;
public class MutilMain {
//单Reactor多线程模式
public static void main(String[] args) throws IOException {
new Thread(new MutilReactor(8080,"single-main")).start();
}
}
package com.lx.netty.reactor.mult;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class MutilReactor implements Runnable {
Selector selector;
public MutilReactor(int port, String threadName) throws IOException {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new MutilAcceptor(selector,serverSocketChannel));
}
public void run() {
// 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
while (!Thread.interrupted()){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterable = selectionKeys.iterator();
while (iterable.hasNext()){
dispatch(iterable.next());
iterable.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void dispatch(SelectionKey selectionKey){
// 如果是accept,这里的runnable就是Acceptor
// 如果是read事件,这里的runnable就是handler
Runnable runnable = (Runnable)selectionKey.attachment();
if(runnable!=null){
runnable.run();
}
}
}
package com.lx.netty.reactor.mult;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class MutilAcceptor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public MutilAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ,new MutilDispatchHandler(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.lx.netty.reactor.mult;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MutilDispatchHandler implements Runnable{
SocketChannel socketChannel;
private Executor executor = Executors.newCachedThreadPool();
public MutilDispatchHandler(SocketChannel socketChannel){
this.socketChannel = socketChannel;
}
public void run() {
executor.execute(new ReaderHandler(socketChannel));
}
static class ReaderHandler implements Runnable{
SocketChannel socketChannel;
public ReaderHandler(SocketChannel socketChannel){
this.socketChannel = socketChannel;
}
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length=0,total =0;
String message = "";
try {
do {
length = socketChannel.read(byteBuffer);
message+= new String(byteBuffer.array());
//System.out.println(length);
// 就是判断数据是否有没有读完
}while (length > byteBuffer.capacity());
System.out.println(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
三.主从Reactor多线程模型

package com.lx.netty.reactor.main;
import java.io.IOException;
public class MainReactorThread {
//主从Reactor多线程模型
public static void main(String[] args) throws IOException {
new Thread(new MainReactor(8080),"Mian-Thread").start();
}
}
package com.lx.netty.reactor.main;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class MainReactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public MainReactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new MainAcceptor(serverSocketChannel));
}
@Override
public void run() {
// 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
while (!Thread.interrupted()){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterable = selectionKeys.iterator();
while (iterable.hasNext()){
dispatch(iterable.next());
iterable.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey selectionKey){
// 如果是accept,这里的runnable就是Acceptor
// 如果是read事件,这里的runnable就是handler
Runnable runnable = (Runnable)selectionKey.attachment();
if(runnable!=null){
runnable.run();
}
}
}
package com.lx.netty.reactor.main;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
public class SubReactor implements Runnable{
Selector selector;
public SubReactor(Selector selector){
this.selector = selector;
}
@Override
public void run() {
while (true){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
dispatch(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey selectionKey){
// 如果是accept,这里的runnable就是Acceptor
// 如果是read事件,这里的runnable就是handler
Runnable runnable = (Runnable)selectionKey.attachment();
if(runnable!=null){
runnable.run();
}
}
}
package com.lx.netty.reactor.main;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class MainAcceptor implements Runnable{
private ServerSocketChannel serverSocketChannel;
// private AtomicInteger index = new AtomicInteger();
private int index=0;
Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()*2];
SubReactor[] subReactors = new SubReactor[Runtime.getRuntime().availableProcessors()*2];
Thread[] threads = new Thread[Runtime.getRuntime().availableProcessors()*2];
public MainAcceptor(ServerSocketChannel serverSocketChannel) throws IOException {
// 初始化服务员==》 SubReactor==>selector
// SubReactor会被线程启动
this.serverSocketChannel = serverSocketChannel;
for(int i = 0;i<Runtime.getRuntime().availableProcessors()*2;i++){
// 每一个服务员都有个铃铛
selectors[i] = Selector.open();
subReactors[i] = new SubReactor(selectors[i]);
threads[i] = new Thread(subReactors[i]);
// 每一个服务员都像线程一样启动
threads[i].start();
}
}
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 唤醒阻塞的selector
selectors[index].wakeup();
// 唤醒的是第一个服务员,然后注册read事件,交给服务员处理
socketChannel.register( selectors[index], SelectionKey.OP_READ,new WorkerHandler(socketChannel));
// 服务员处理了后,下次就给下一个服务员,知道所有服务员都接待了客人,回到第一服务员
if(++index == threads.length){
index = 0;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.lx.netty.reactor.main;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class WorkerHandler implements Runnable{
private SocketChannel socketChannel;
public WorkerHandler(SocketChannel socketChannel){
this.socketChannel = socketChannel;
}
@Override
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
socketChannel.read(byteBuffer);
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress()+":" +message);
socketChannel.write(ByteBuffer.wrap("消息收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
四.Proactor模型

package com.lx.netty.proactor;
import java.io.IOException;
public class ProactorMain {
//Proactor模型
public static void main(String[] args) throws IOException {
new Thread(new AIOProactor(8080),"Main-Thread").start();
}
}
package com.lx.netty.proactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AIOProactor implements Runnable {
public CountDownLatch latch;
public AsynchronousServerSocketChannel serverSocketChannel;
public AIOProactor(int prot) throws IOException {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(prot));
}
@Override
public void run() {
latch = new CountDownLatch(1);
serverSocketChannel.accept(this,new AIOAcceptorHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.lx.netty.proactor;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOAcceptorHandler implements CompletionHandler<AsynchronousSocketChannel, AIOProactor> {
@Override
public void completed(AsynchronousSocketChannel channel, AIOProactor serverhanlder){
// 每接收一个连接之后,再执行一次异步连接请求,这样就能一直处理多个连接
serverhanlder.serverSocketChannel.accept(serverhanlder,this);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 异步读取
channel.read(byteBuffer,byteBuffer,new ReadHandler(channel));
}
@Override
public void failed(Throwable exc, AIOProactor attachment) {
attachment.latch.countDown();
}
}
package com.lx.netty.proactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socketChannel;
public ReadHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
/**
* 读取到消息后处理
*
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
String str = new String(message, StandardCharsets.UTF_8);
System.out.println("收到客户端的信息:"+str);
//向客户端发送消息
String clientStr = "收到客户端的消息!!!";
doWrite(clientStr);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
//发送消息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//异步写数据 参数与前面的read一样
//将回调接口作为内部类实现了
socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完,就继续发送直到完成
if (buffer.hasRemaining()) {
socketChannel.write(buffer, buffer, this);
} else {
//创建新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
socketChannel.read(readBuffer, readBuffer, new ReadHandler(socketChannel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
socketChannel.close();
} catch (IOException e) {
}
}
});
}
}
版权声明:本文为qq_33816292原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。