Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
netty.md 41.84 KB
Copy Edit Web IDE Raw Blame History
liner123 authored 2020-09-04 12:23 . 2020-9-4

Netty笔记

IO分类:

1、Bio (blocking I/O)同步阻塞

2、Nio (no-bolcking I/O)同步非阻塞

3、Aio 异步非阻塞 (暂无多少应用)

BIO:

BIO工作原理

BIO代码

/**
 * @author hl
 * @Data 2020/8/28
 */
public class BioServer {

    public static void main(String[] args) throws Exception {
          ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        ServerSocket serverSocket = new ServerSocket(6666);
        while (true) {
            System.out.println(Thread.currentThread().getId() + ":main:" + Thread.currentThread().getName());
            Socket socket = serverSocket.accept();
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }

    }

    public static void handler(Socket socket) {
        InputStream inputStream = null;
        try {
            System.out.println(Thread.currentThread().getId() + ":handler:" + Thread.currentThread().getName());
            inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            while (true) {
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(new String(bytes, 0, read));
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                inputStream.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

BIO存在问题

NIO:

NIO概念

1、no-block I/O ,JDK1.4引入

2、三大核心部分: Channel (通道) ,Buffer (缓存器) ,Selector (选择器)

NIO 和 BIO区别

NIO工作原理

Selector 、 Channel 、 Buffer对应关系

1、 每个Channel对应一个Buffer。

2、 Selector 对应一个线程,一个线程对应多个Channel。

3、程序切换到哪个Channel由事件决定,Event很重要。

4、 Selector会根据不同事件,在各个通道切换。

5、 Buffer是一个内存块,底层是一个数组。

6、 数据的读取是通过Buffer,BIO中是输入/输出流。NIO的Buffer中可以双向切换使用 。intBuffer.flip()

7 、Channel 也是双向的,可以返回底层操作系统的情况,比如Linux,底层操作系统就是双向的。

Buffer:

Buffer的简单使用
// buffer的使用
// 创建一个容量为 5 的buffer
IntBuffer intBuffer = IntBuffer.allocate(5);
for (int i = 0; i < intBuffer.capacity(); i++) {
    intBuffer.put(i * 2);
}
// 读写切换 !!! --> Position = 0
intBuffer.flip();
while (intBuffer.hasRemaining()) {
    System.out.println(intBuffer.get());
}
Buffer子类

Buffer 四大属性
    private int mark = -1;  // 标记
    private int position = 0; // 数据指针
    private int limit;  // 最大限制 (可改)
    private int capacity; // 最大容量 (不可改)
Buffer Api

![](src\main\netty_piture\buffer API.png)

最常用的ByteBuffer Api
![](src\main\netty_piture\byteBuffer Api.png)

Channel:

Channel分类
// FileSocketChannel 用于文件数据读写
// DatagramChannel 用于UDP数据的读写
// ServerSocketChannel 用于TCP数据的读写
Channel创建流程
// 1、服务端 创建 ServerSocketChannel(ServerSocketChannelImpl)
// 2、客户端连接 ServerSocketChannel时创建一个SocketChannel(SocketChannelImpl)
// 3、通过 SocketChannel进行通信
FileChannel方法

需站在 Channel的角度看

read / write/ transferFrom/tranferTo

案例一 : 将hello,channel写入文件file01.txt
    /**
     * 将 hello,channel 写入 file01.txt文件
     */
    public static void writeChannel() throws Exception {
        String str = "hello,channel";
        FileOutputStream fileOutputStream = new FileOutputStream("F:\\file01.txt");
        FileChannel channel = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put(str.getBytes());
        byteBuffer.flip();
        // 将byteBuffer的数据写入channel
        channel.write(byteBuffer);
        fileOutputStream.close();
    }
案例二 :使用channel从文件读数据并写入控制台
 /**
   * 使用channel从文件读数据并写入控制台
   */
    public static void readChannel() throws Exception {
        File file = new File("F:\\file01.txt");
        FileInputStream fileInputStream = new FileInputStream(file);
        FileChannel channel = fileInputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
        // 将数据从通道读到缓冲区
        channel.read(byteBuffer);
        // 将字节转为字符串
        System.out.println(new String(byteBuffer.array()));
        fileInputStream.close();
    }
案例三: 使用一个bufferfile进行拷贝
    /**
     * 使用一个buffer
     * fileChannel进行拷贝
     */
    public static void copyChannel() throws Exception {
        FileInputStream fileInputStream = new FileInputStream("src\\main\\1.txt");
        FileChannel channel01 = fileInputStream.getChannel();
        File file = new File("src\\main\\2.txt");
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        FileChannel channel02 = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        while (true) {
            /**
             *     public final Buffer clear() {
             *         position = 0;
             *         limit = capacity;
             *         mark = -1;
             *         return this;
             *     }
             */
            // 清空buffer  重要!!!
            byteBuffer.clear();
            int read = channel01.read(byteBuffer);
            if (read == -1) {
                break;
            }
            // 转为写出模式
            byteBuffer.flip();
            channel02.write(byteBuffer);
        }
        fileInputStream.close();
        fileOutputStream.close();
    }
案例四: 使用TransferFrom进行拷贝
/**
 * 使用TransferFrom 进行拷贝
 */
public static void copyChannelByTran() throws Exception {
    FileInputStream fileInputStream = new FileInputStream("src\\main\\1.txt");
    FileChannel channel01 = fileInputStream.getChannel();
    FileOutputStream fileOutputStream = new FileOutputStream("src\\main\\3.txt");
    FileChannel channel02 = fileOutputStream.getChannel();
    channel02.transferFrom(channel02, 0, channel01.size());
    channel01.close();
    channel02.close();
    fileInputStream.close();
    fileOutputStream.close();
}
补充

1、ReadOnlyBuffer 只读Buffer

MappedByteBuffer 直接在内存修改数据 (实际类型DirectByteBuffer) 速度快

参数 : 1 、FileChannel.MapMode.READ_WRITE 使用读写模式

​ 2、 可以直接修改的位置

​ 3、映射的内存大小(不是索引位置),即是多少个字节映射到内存。

2、Buffer支持类型化的操作,put 类型和 get 类型需保持一致

使用Buffer数组
/**
* @author hl
* @Data 2020/8/29
* <p> 多个Buffer(Buffer数组)
* Scattering(分散) : 将数据写入到buffer时,可以采用buffer数组.依次写入
* Gathering(聚合) : 将数据读出到buffer时。可以采用buffer数组,依次读
*/
public class ScatteringAndGatheringTest {

   /**
    * 服务端
    * byteRead=6 (六个字节)
    * position=5,limit=5
    * position=1,limit=3
    */
   public static void main(String[] args) throws Exception {
       // 使用ServerSocketChannel 和 SocketChannel 网络
       ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
       InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
       // 绑定端口到socket,并启动
       serverSocketChannel.socket().bind(inetSocketAddress);
       // 创建Buffer数组
       ByteBuffer[] byteBuffers = new ByteBuffer[2];
       byteBuffers[0] = ByteBuffer.allocate(5);
       byteBuffers[1] = ByteBuffer.allocate(3);
       // 等待客户端连接
       SocketChannel socketChannel = serverSocketChannel.accept();
       // 循环读取
       // 假定从客户端接受8给字节
       int msgLength = 8;
       while (true) {
           int byteRead = 0;
           while (byteRead < msgLength) {
               long l = socketChannel.read(byteBuffers);
               byteRead += l;
               System.out.println("byteRead=" + byteRead);
               Arrays.asList(byteBuffers).stream().map(buffer -> {
                   return "position=" + buffer.position() + ",limit=" + buffer.limit();
               }).forEach(System.out::println);
           }
           Arrays.asList(byteBuffers).forEach(buffer -> {
               // 将所有buffer flip
               buffer.flip();
           });
           int byteWrite = 0;
           while (byteWrite < msgLength) {
               long l = socketChannel.write(byteBuffers);
               byteWrite += l;
               System.out.println("byteWrite=" + byteWrite);
           }
       }
   }
}

Selector:

Selector方法
    /**
     * Selector 方法
     * Selector.select() 阻塞
     * Selector.select(1000) 等待1000ms后返回
     * Selector.selectNow() 直接返回
     * Selector.wakeup() 唤醒阻塞中的
     */
NIO 非阻塞网络编程原理分析

`

NIO 服务端和客户端相互通信
NIO Server 代码
public class NioServer {

    public static void main(String[] args) throws Exception {
        // ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Selector
        Selector selector = Selector.open();
        // 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        serverSocketChannel.configureBlocking(false);
        // 把serverSocketChannel 注册到Selector , 事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 等待客户端连接
        while (true) {
            // 1000ms后还是无相关的事件
            if (selector.select(1000) == 0) {
                System.out.println("服务器无连接");
                continue;
            }
            // 否则 , 获取到相关的SelectedKeys集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            //遍历
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                // 根据key对应的channel发生的事件做处理
                if (key.isAcceptable()) { /*  OP_ACCEPT */
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端已连接" + socketChannel.hashCode());
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    socketChannel.close();
                }

                if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    channel.read(buffer);
                    System.out.println("form客户端:" + new String(buffer.array()));
                }
                // 手动从Set中移除这个selectKey
                keyIterator.remove();
            }
        }
    }
}
NIO Client 代码
public class NioClient {

    public static void main(String[] args) throws Exception {
        // 拿到socketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // 设置非阻塞
        socketChannel.configureBlocking(false);
        // 提供 host + port
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        if (!socketChannel.connect(inetSocketAddress)) {
            // 未连接 ,就可以去做其他事
            System.out.println("做其他事情中。。。");
        }
        // 连接成功
        String str = "hello,Nio";
        // Wraps a byte array into a buffer.
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        // 将buffer 写入 channel
        socketChannel.write(byteBuffer);
        System.in.read();
    }
}

使用NIO 写一个群聊系统

NIOServer

public class NioServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private final Integer port = 6666;

    public NioServer() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        while (true) {
            try {
                int count = selector.select(2000);
                if (count > 0) {
                    // 处理
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    if (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isAcceptable()) {
                            // 连接事件
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(socketChannel.getLocalAddress() + " 上线了 ");
                        }
                        if (selectionKey.isReadable()) {
                            // 读事件
                            readData(selectionKey);
                        }
                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    /**
     * 读事件  buffer --> socketChannel
     *
     * @param key
     */
    private void readData(SelectionKey key) {
        SocketChannel sc = null;
        try {
            sc = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = sc.read(buffer);
            if (count > 0) {
                // 读取到数据了
                String msg = new String(buffer.array());
                System.out.println("msg = " + msg);
                // 向其他客户端转发 (排除自己)
                sendInfo2OtherClient(msg, sc);
            }
        } catch (IOException e) {
            try {
                System.out.println(sc.getRemoteAddress() + "离线了。。。");
                // 取消注册
                key.cancel();
                // 关闭通道
                sc.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

    /**
     * 向其他客户端转发  socketChannel --> buffer
     */
    private void sendInfo2OtherClient(String msg, SocketChannel self) throws IOException {
        System.out.println("服务器转发消息中 ... ");
        for (SelectionKey key : selector.keys()) {
            // 通过key 取出 socketChannel
            SocketChannel targetChannel = (SocketChannel) key.channel();
            // 排除自己
            if (targetChannel != self) {
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                // buffer(msg) --> socketChannel
                targetChannel.write(buffer);
            }
        }
    }

    public static void main(String[] args) {
        NioServer nioServer = new NioServer();
        nioServer.listen();
    }
}

NIOClient

public class NioClient03 {

    private final String host = "127.0.0.1";
    private final Integer port = 6666;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    public NioClient03() throws IOException {
        selector = Selector.open();
        // 连接服务器
        socketChannel = SocketChannel.open(new InetSocketAddress(host, port));
        // 将channel注册到Selector
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 的到username
        username = socketChannel.getLocalAddress().toString();
        System.out.println(username + "客户端, ok了。。。");
    }

    public void sendInfo(String msg) {
        msg = username + "说:" + msg;
        try {
            socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readInfo() {
        try {
            int count = selector.select();
            if (count > 0) {
                // 有可用通道
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        SocketChannel sc = (SocketChannel) selectionKey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int read = sc.read(buffer);
                        if (read > 0) {
                            String msg = new String(buffer.array());
                            System.out.println(msg.trim());
                        }
                    }
                    iterator.remove();
                }
            } /*else {
                System.out.println("无可用通道");
            }*/
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        NioClient03 nioClient = new NioClient03();
        // 读取数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    nioClient.readInfo();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        // 发送数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine();
                    nioClient.sendInfo(s);
                }
            }
        }).start();
    }
}

NIO 与零拷贝

零拷贝概念

从操作系统角度来说的。内核缓冲区之间,没有数据是重复的就是零拷贝。

传统IO的拷贝

4 次 拷贝 3 次转换 (效率低)

NIO的拷贝

mmap 优化 3 次拷贝 3 次转换
sendFile优化01 3次拷贝 2次转换

sendFile优化0 2 2次拷贝 2次转换

sendFile 和 mmap区别

零拷贝实例 (TranferTo)

NIOServer
public class NEWIOServer {

    public static void main(String[] args) throws Exception {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(inetSocketAddress);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readCount = 0;
            while (-1 != readCount) {
                try {
                    readCount = socketChannel.read(byteBuffer);
                } catch (Exception e) {
                    break;
                }
            }
            /*
            Rewinds this buffer.  The position is set to zero and the mark is discarded.
            */
            byteBuffer.rewind(); // 倒带 position = 0 , mark 作废;
        }
    }
}
NIOClient
public class NEWIOClient {

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 5555));
        String filename = "G:\\github\\NettyPro\src\\main\\1.txt";
        FileChannel channel = new FileInputStream(filename).getChannel();
        long startTime = System.currentTimeMillis();
        // linux 可用全部发送 windows 只能发送 8 M ,需要分段
        // TransferTo 底层使用零拷贝
        int i = (int) (channel.size() / (8 * 1024));
        if (0 != channel.size() % (8 * 1024)) {
            i += 1;
        }
        long transferToSize = 0;
        for (int j = 0; j < i; j++) {
            transferToSize += channel.transferTo(j * 8 * 1024, channel.size() / i, socketChannel);
        }
        long lastTime = System.currentTimeMillis();
        System.out.println("字节数=" + transferToSize + ",所费时间=" + (lastTime - startTime));
        channel.close();
    }
}

原生NIO存在的问题

AIO:

目前无广泛使用,暂不学

线程模型:

传统IO模式 : 阻塞

reactor模式

/**
 * Reactor : 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序对I/O事件做出反应。(分发人物的)
 * Handlers : 处理程序执行I/O事件要完成的实际事件。  (干活的)
 */

三种Reactor : 1、单Reactor 单线程

​ 2、单 Reactor 多线程

​ 3、主从Reactor

1、单Reactor 单线程

![](src\main\netty_piture\单 Reactor.png)

2、单 Reactor 多线程

优点: 充分利用多核cpu处理。

缺点: 高并发情况下, Reactor处理的监听和响应过多,单线程下,容易出现性能瓶颈。

3、主从 Reactor 多线程

Netty:

Netty模型

netty原理简单版

netty原理进阶版

netty模型最终版 (重要)

补充: 每个Worker NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了很多channel,即通过pipeline可以获取到相应的管道,管道中维护了很多处理器。

netty快速入门案例

代码路径 :src\main\java\com\athl\netty\netty\simple

补充 :

1、如果任务过大,损耗时间过长可以考虑使用NioEventLoop的TaskQueue异步执行

2、如果是延时任务,可以使用schedule

 // 如果有特别耗时的业务 --> 异步执行 --> 提交到当前channel对应的TaskQueue执行
        // 解决方法:
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,我是一个大任务", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("出现了异常" + e);
                }
            }
        });
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,我是一个定时任务", CharsetUtil.UTF_8));
            }
        }, 1, TimeUnit.SECONDS);

异步模型:

异步模型 Future-Listener机制

异步模型工作原理

代码 路径

src\main\java\com\athl\netty\netty\http

Netty核心组件

BootStrap

Future and ChannelFuture

![](src\main\netty_piture\future and channelFuture.png)

Channel

ChannelHandler

Pipeline 和 ChannelPipeline关系

![](src\main\netty_piture\channelpipe and channelhandlercontext关系图.png)

Netty实现群聊系统

代码路径

src\main\java\com\athl\netty\netty\chat

群聊系统代码

/**
 * Server 端
 */
public class NettyServer {

    private int port;

    public NettyServer(int port) {
        this.port = port;
    }

    // 编写run处理客户端请求
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 提供空闲状态的处理器
                            //  long readerIdleTime, 表示多长时间未读,发送一个心跳检测,检测连接状态
                            //  long writerIdleTime, 表示多长时间未写,发送一个心跳检测,检测连接状态
                            //  long allIdleTime,   表示多长时间未写和读,发送一个心跳检测,检测连接状态
                            pipeline.addLast(new IdleStateHandler(3, 3, 3, TimeUnit.SECONDS));
                            // 编码
                            pipeline.addLast("decoder", new StringDecoder());
                            // 解码
                            pipeline.addLast("encoder", new StringEncoder());
                            // 业务处理
                            pipeline.addLast("myHandler", new NettyServerHandler());
                        }
                    });
            System.out.println("服务器启动了");
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("服务端出现了异常 e = " + e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyServer(6666).run();
    }
}
/**
 * Server 端的handler
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

//    public static List<Channel> channels = new ArrayList<Channel>();
    // 用于私聊 String : 保存为当前channel的 id
//    private static Map<String, Channel> channels = new HashMap<>();

    // GlobalEventExecutor.INSTANCE是一个全局的事件执行器,是一个单列的

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * handlerAdded表示连接建立,一个连接,第一个被执行,将当前channel 加入到 channelGroup
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 将该客户加入聊天的信息推送给其他在线的客户端
        /*
        该方法会将 channelGroup 中所有的channel遍历,并发送writeAndFlush中消息,无需自己遍历
         */
        String time = sdf.format(System.currentTimeMillis());
        channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + "加入聊天\n" + time);
        channelGroup.add(channel);
        System.out.println("channelGroup Size=" + channelGroup.size());
    }

    /**
     * 断开连接,将xx客户离开信息推送给给当前用户
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String time = sdf.format(System.currentTimeMillis());
        channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + "离开了\n" + time);
        channelGroup.remove(channel);
        System.out.println("channelGroup Size=" + channelGroup.size());
    }


    /**
     * 表示channel 处于活跃状态,提示xx上线
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线了");
    }

    /**
     * 表示channel 处于非活跃状态,提示xx下线
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "下线了");
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 获取到当前channel
        Channel channel = ctx.channel();
        // 遍历channelGroup,根据不同情况回送不同消息
        channelGroup.forEach(ch -> {
            if (channel != ch) { // 不是当前的channel
                ch.writeAndFlush("【客户】" + channel.remoteAddress() + "说:" + msg + "\n");
            } else {
                // 回显自己发送的消息
                ch.writeAndFlush("我发送了:" + msg);
            }
        });
    }

    /**
     * 发生异常
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
/**
 * 客户端
 */
public class NettyClient01 {

    private final String host;
    private final int port;

    public NettyClient01(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast("myHandler", new NettyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.nextLine();
                // 通过channel发送传到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }
        } catch (Exception e) {
            System.out.println("客户端出现了异常 e = " + e);
        } finally {
            clientGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyClient01("127.0.0.1", 6666).run();
    }
}
/**
 * 客户端的handler
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

heart心跳检测

路径 com.athl.netty.netty.heart

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 提供空闲状态的处理器
        //  long readerIdleTime, 表示多长时间未读,发送一个心跳检测,检测连接状态
        //  long writerIdleTime, 表示多长时间未写,发送一个心跳检测,检测连接状态
        //  long allIdleTime,   表示多长时间未写和读,发送一个心跳检测,检测连接状态
        // 当IdleStateHandler触发后,就会传递给管道的下一个handler去处理,通过调用handler的 userEventTiggered在该方法中去处理
        pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
        pipeline.addLast("myIdleHandler", new NettyServerHandler());
    }
});
/*
NettyServerHandler
*/
@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超时事件=" + eventType);
//            ctx.close();
        }
    }

WebSocket 长连接

com.athl.netty.netty.websocket

.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
              ChannelPipeline pipeline = ch.pipeline();
           // 因为是基于http协议,使用http的编解码器
               pipeline.addLast(new HttpServerCodec());
            // 以块方式写,添加ChunkedWriteHandler处理器
               pipeline.addLast(new ChunkedWriteHandler());
              /*
              1、因为http协议在传输过程中是分段的,HttpObjectAggregator就是可以将多个段聚合
               2、这就是为什么,当浏览器发送大量数据时,就会发出多次http请求
               */
             pipeline.addLast(new HttpObjectAggregator(8192));
              /*
             1、对于WebSocket是以帧 (frame)的形式传递
             2、可以看到WebSocketFrame 下面有六个子类
             3、浏览器请求时: ws://localhost:7000/xxx  表示请求uri
             4、WebSocketServerProtocolHandler 核心功能是将http协议升级为webSocket协议,保持长连接
            */
              pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
              // 业务
              pipeline.addLast(new MyWebSocketFrameHandler());
                        }
 	});
/**
 * MyWebSocketFrameHandler
 */
public class MyWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器端收到消息:" + msg.text());
        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器事件" + LocalDateTime.now() + "msg=" + msg.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // id表示唯一的一个值 LongText (唯一的)
        System.out.println("handlerAdded被调用 :" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded被调用 :" + ctx.channel().id().asShortText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved被调用 :" + ctx.channel().id().asLongText());
        System.out.println("handlerRemoved被调用 :" + ctx.channel().id().asShortText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生 " + cause.getMessage());
        ctx.close();
    }
}
/*
 hello.html
 */
<script>
    var socket;
    // 判断浏览器是否支持WebSocket编程
    if (!window.WebSocket) {
        alert("当前浏览器不支持Websocket编程")
    } else {
        socket = new WebSocket("ws://localhost:7000/hello");
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev;
        }
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText")
            rt.value = "连接开启了..."
        }
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText")
            rt.value = rt.value + "\n" + "连接关闭了..."
        }
    }

    // 发送消息到服务器
    function send(message) {
        if (!window.socket) {
            return;
        }
        if (socket.readyState === WebSocket.OPEN) {
            // 通过Socket发送
            socket.send(message);
        } else {
            alert("连接未开启")
        }
    }
</script>

<form onsubmit="return false">
    <textarea name="message" style="height: 300px;width: 300px;"></textarea>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <textarea id="responseText" style="height: 300px;width: 300px;"></textarea>
    <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>

编解码

基本介绍

![](src\main\netty_piture\encoder and decoder.png)

由于 netty 自带的编解码器效率较差,跨语言性较差,所以采用谷歌的Protobuf

Protobuf基本介绍

参考文档地址

protoc.exe下载地址

https://github.com/google/protobuf/releases/download/v3.8.0/protoc-3.8.0-win32.zip

使用步骤

1、编写 Student.proto文件

2、将Student.proto拷入protoc.exe所在文件夹

3、在protoc.exe的文件下执行protoc.exe --java_out=. Student.proto得到StudentPOJO文件

4、在发送端 加入Proto编码器 pipeline.addLast("protoEncoder", new ProtobufEncoder()); (编解码器加入顺序要在业务之前!!!) 5、在接收端 加入Proto解码器 pipeline.addLast("protoDecoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));

proto文件示例

syntax = "proto3";
// 加快解析
option optimize_for = SPEED;
// 指定包
option java_package = "com.athl.netty.netty.codec2";
// 外部类名称
option java_outer_classname = "MyDataPOJO";

message MyMessage{
  // 定义一个枚举类型
  enum DataType{
    studentType = 0; // 在proto3要求从0开始
    workerType = 1;
  }
  // 使用data_type来标识传的是哪一个枚举类型
  DataType data_type = 1;
  // 表示每次枚举类型最多只能出现 Student or Worker 之一
  oneof dataBody{
    Student student = 2;
    Worker worker = 3;
  }
}

message Student{
  int32 id = 1;
  string name = 2;
}
message Worker{
  string name = 1;
  int32 age = 2;
}

具体代码路径 src\main\java\com\athl\netty\netty\codec 或者 src\main\java\com\athl\netty\netty\codec2

Netty入站和出站机制

head channelpipelline tail

socket ---> 入站(ChannelInboundHandler)

​ <--- 出站(ChannelOutBoundHandler)

入站 :数据 -- 编码器 --> 被编码的数据。 ChannelInboundHandler

出站 : 被编码的数据 -- 解码器 --> 数据 。ChanneloutboundHandler

ByteToMessageDecoder使用

代码路径 src\main\java\com\athl\netty\netty\ioboundhandler

decode方法会根据接收的数据,被调用多次,直到确定没有新的元素被添加到List,或者ByteBuf没有更多可读字节,如果List Out不为空,就会将List的内容传递给下一个channelInBoundHandler处理,该处理器方法也会被调用多次

调用过程图解

![](src\main\netty_piture\encoder and decoder图.png)

其他解码器和编码器

![](src\main\netty_piture\other decoder.png)

Tcp 粘包和拆包

基本概念

tcp粘拆包解决方案一 :

代码路径 src\main\java\com\athl\netty\netty\protocoltcp

##源码分析 。。。。

Comment ( 0 )

Sign in for post a comment