NettyReactor模型和Proactor模型

Reactor模型三个角色:

Reactor:Reactor主要是用来监听事件的,无论accept事件还是read事件,至于它到底监听什么事件根据模 型来决定。

Acceptor:就是Reactor接收到事件后交给acceptor,然后接收到把得到的read事件负责交个handler处理。

Handlers:用来处理网络IO事件,处理IO操作。

一.单Reactor单线程模型

package com.lx.netty.reactor.single;

import java.io.IOException;

public class SingleMain {

    //单Reactor单线程模型  Reactor是应用层的模型  poll是操作系统底层的多路复用模型(内存)
    public static void main(String[] args) throws IOException {

        new Thread(new Reactor(8080,"single-main")).start();

    }
}
package com.lx.netty.reactor.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

    Selector selector;

    public Reactor(int port, String threadName) throws IOException {
        selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new Acceptor(selector,serverSocketChannel));
    }

    public void run() {
        // 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
        while (!Thread.interrupted()){
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()){
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey){
        // 如果是accept,这里的runnable就是Acceptor
        // 如果是read事件,这里的runnable就是handler
        Runnable runnable = (Runnable)selectionKey.attachment();
        if(runnable!=null){
            runnable.run();
        }
    }
}
package com.lx.netty.reactor.single;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable{

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;


    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;

    }

    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ,new Handler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
package com.lx.netty.reactor.single;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable{

    SocketChannel socketChannel;

    public Handler(SocketChannel socketChannel) {

        this.socketChannel = socketChannel;
    }

    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        int length=0,total =0;
        String message = "";
        try {
            do {
                length = socketChannel.read(byteBuffer);
                message+= new String(byteBuffer.array());
                System.out.println(length);
                // 就是判断数据是否有没有读完
            }while (length > byteBuffer.capacity());
            System.out.println(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

二.单Reactor多线程模型

package com.lx.netty.reactor.mult;

import java.io.IOException;

public class MutilMain {

    //单Reactor多线程模式
    public static void main(String[] args) throws IOException {

        new Thread(new MutilReactor(8080,"single-main")).start();

    }
}
package com.lx.netty.reactor.mult;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MutilReactor implements Runnable {

    Selector selector;

    public MutilReactor(int port, String threadName) throws IOException {
        selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new MutilAcceptor(selector,serverSocketChannel));
    }

    public void run() {
        // 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
        while (!Thread.interrupted()){
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()){
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void dispatch(SelectionKey selectionKey){
        // 如果是accept,这里的runnable就是Acceptor
        // 如果是read事件,这里的runnable就是handler
        Runnable runnable = (Runnable)selectionKey.attachment();
        if(runnable!=null){
            runnable.run();
        }
    }
}
package com.lx.netty.reactor.mult;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class MutilAcceptor implements Runnable{

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;


    public MutilAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;

    }

    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ,new MutilDispatchHandler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
package com.lx.netty.reactor.mult;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MutilDispatchHandler  implements Runnable{

    SocketChannel socketChannel;

    private Executor executor = Executors.newCachedThreadPool();

    public MutilDispatchHandler(SocketChannel socketChannel){
        this.socketChannel = socketChannel;
    }

    public void run() {
        executor.execute(new ReaderHandler(socketChannel));
    }


    static class ReaderHandler implements  Runnable{
        SocketChannel socketChannel;

        public ReaderHandler(SocketChannel socketChannel){
            this.socketChannel = socketChannel;
        }

        public void run() {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            int length=0,total =0;
            String message = "";
            try {
                do {
                    length = socketChannel.read(byteBuffer);
                    message+= new String(byteBuffer.array());
                    //System.out.println(length);
                    // 就是判断数据是否有没有读完
                }while (length > byteBuffer.capacity());
                System.out.println(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

三.主从Reactor多线程模型

package com.lx.netty.reactor.main;

import java.io.IOException;

public class MainReactorThread {

    //主从Reactor多线程模型
    public static void main(String[] args) throws IOException {

        new Thread(new MainReactor(8080),"Mian-Thread").start();
    }
}
package com.lx.netty.reactor.main;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MainReactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    public MainReactor(int port) throws IOException {

        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new MainAcceptor(serverSocketChannel));
    }

    @Override
    public void run() {
        // 查看线程是否被中断过,只要没有中断,就一直等待客户端过来
        while (!Thread.interrupted()){
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()){
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    private void dispatch(SelectionKey selectionKey){
        // 如果是accept,这里的runnable就是Acceptor
        // 如果是read事件,这里的runnable就是handler
        Runnable runnable = (Runnable)selectionKey.attachment();
        if(runnable!=null){
            runnable.run();
        }
    }
}
package com.lx.netty.reactor.main;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable{

    Selector selector;

    public SubReactor(Selector selector){
        this.selector = selector;
    }


    @Override
    public void run() {

        while (true){
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    dispatch(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey){
        // 如果是accept,这里的runnable就是Acceptor
        // 如果是read事件,这里的runnable就是handler
        Runnable runnable = (Runnable)selectionKey.attachment();
        if(runnable!=null){
            runnable.run();
        }
    }
}
package com.lx.netty.reactor.main;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class MainAcceptor implements Runnable{

    private ServerSocketChannel serverSocketChannel;

//    private AtomicInteger index = new AtomicInteger();
    private int index=0;
    Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()*2];
    SubReactor[] subReactors = new SubReactor[Runtime.getRuntime().availableProcessors()*2];

    Thread[] threads = new Thread[Runtime.getRuntime().availableProcessors()*2];

    public MainAcceptor(ServerSocketChannel serverSocketChannel) throws IOException {

        // 初始化服务员==》 SubReactor==>selector
        // SubReactor会被线程启动
        this.serverSocketChannel = serverSocketChannel;
        for(int i = 0;i<Runtime.getRuntime().availableProcessors()*2;i++){
            // 每一个服务员都有个铃铛
            selectors[i] = Selector.open();
            subReactors[i] = new SubReactor(selectors[i]);
            threads[i] = new Thread(subReactors[i]);
            // 每一个服务员都像线程一样启动
            threads[i].start();
        }
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);

            // 唤醒阻塞的selector
            selectors[index].wakeup();
            // 唤醒的是第一个服务员,然后注册read事件,交给服务员处理
            socketChannel.register( selectors[index], SelectionKey.OP_READ,new WorkerHandler(socketChannel));

            // 服务员处理了后,下次就给下一个服务员,知道所有服务员都接待了客人,回到第一服务员
            if(++index == threads.length){
                index = 0;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.lx.netty.reactor.main;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WorkerHandler implements Runnable{

    private SocketChannel socketChannel;


    public WorkerHandler(SocketChannel socketChannel){
        this.socketChannel = socketChannel;

    }
    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        try {
            socketChannel.read(byteBuffer);
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress()+":" +message);
            socketChannel.write(ByteBuffer.wrap("消息收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

四.Proactor模型

package com.lx.netty.proactor;

import java.io.IOException;

public class ProactorMain {

    //Proactor模型
    public static void main(String[] args) throws IOException {

        new Thread(new AIOProactor(8080),"Main-Thread").start();
    }
}
package com.lx.netty.proactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AIOProactor implements Runnable {

    public CountDownLatch latch;

    public AsynchronousServerSocketChannel serverSocketChannel;

    public AIOProactor(int prot) throws IOException {
        serverSocketChannel = AsynchronousServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(prot));
    }

    @Override
    public void run() {

        latch = new CountDownLatch(1);
        serverSocketChannel.accept(this,new AIOAcceptorHandler());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
package com.lx.netty.proactor;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOAcceptorHandler implements CompletionHandler<AsynchronousSocketChannel, AIOProactor> {

    @Override
    public void completed(AsynchronousSocketChannel channel, AIOProactor serverhanlder){
        // 每接收一个连接之后,再执行一次异步连接请求,这样就能一直处理多个连接
        serverhanlder.serverSocketChannel.accept(serverhanlder,this);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // 异步读取
        channel.read(byteBuffer,byteBuffer,new ReadHandler(channel));
    }

    @Override
    public void failed(Throwable exc, AIOProactor attachment) {
        attachment.latch.countDown();
    }
}
package com.lx.netty.proactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel socketChannel;

    public ReadHandler(AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * 读取到消息后处理
     *
     * @param result
     * @param attachment
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] message = new byte[attachment.remaining()];
        attachment.get(message);

        String str = new String(message, StandardCharsets.UTF_8);
        System.out.println("收到客户端的信息:"+str);
        //向客户端发送消息
        String clientStr = "收到客户端的消息!!!";
        doWrite(clientStr);

    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }

    //发送消息
    private void doWrite(String result) {
        byte[] bytes = result.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();

        //异步写数据 参数与前面的read一样
        //将回调接口作为内部类实现了
        socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                //如果没有发送完,就继续发送直到完成
                if (buffer.hasRemaining()) {
                    socketChannel.write(buffer, buffer, this);
                } else {
                    //创建新的Buffer
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    //异步读 第三个参数为接收消息回调的业务Handler
                    socketChannel.read(readBuffer, readBuffer, new ReadHandler(socketChannel));

                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    socketChannel.close();
                } catch (IOException e) {
                }
            }

        });

    }
}


版权声明:本文为qq_33816292原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。