Netty(三)_核心模块组件与实战

标签: Netty  netty  多线程  java  网络  io

Netty核心模块组件与实战

回顾上一篇讲IO线程模型最终抛出的Netty线程模型示意图如下。

img

本文紧接上篇,正式介绍netty,上图是由上一章在最后抛出来的netty线程模型示意图。

这里篇文章篇幅还是挺长的,前面部分先简单介绍下netty的最少必备API,后面再有五个小程序带出netty的应用

前半部分听个响,在应用的时候自然就懂了!

在下一篇的源码分析中,我们会更清楚的明白netty这个机器是如何动起来的。

Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

ServerBootstrap常见方法

//指定具体的EventLoopGroup,即服务端的父子线程组BossEventGroup和WorkEventGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)	
    

//该方法为父类AbstractBootstrap所定义,用来设置一个服务器端的通道   
public B channel(Class<? extends C> channelClass) 

//父类定义,用来给ServerChannel添加配置
public <T> B option(ChannelOption<T> option, T value) 

//对在acceptor接收到的channel进行配置
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) 

//设置自定义的Hanler用来处理accept接收的channel的事件   
public ServerBootstrap childHandler(ChannelHandler childHandler) 
    
//服务端channel绑定的监听端口    
public ChannelFuture bind(int inetPort) 

服务端的Boostact先不介绍,但是逻辑都是类似的

Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。

但是可以通过监听机制,一旦有结果返回,可以监听到。

具体的实现就是通过 Future 和 ChannelFutures。

他们可以注册一个监听,当操作执行成功或失败时,执行操作的线程会调用你监听者的函数来进行通知!

常见的异步方法有 Channel channel()返回当前正在进行 IO 操作的通道。

ChannelFuture sync()等待异步操作执行完毕,该方法立刻返回一个Future对象。

通过返回Future,Future代表一个“未来”。

Channel

Channel是Netty网络通信的核心组件,意为通道/频道。能够用于提供执行网络 I/O 操作(read/write)的场所。

通过 Channel对象可获得当前网络连接的通道的状态、配置参数 (如Buffer缓冲区大小)

Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口)

异步调用意味着任何 I/O 调用都将立即返回,所以不保证在调用结束时所请求的 I/O 操作已完成,调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以在I/O 操作成功、失败或取消时回调通知调用方

Channel支持关联 I/O 操作与对应的处理程序(Handler)

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的 Channel 类型有:

  • NioSocketChannel,异步的客户端 TCP Socket 连接。
  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel,异步的 UDP 连接。
  • NioSctpChannel,异步的客户端 Sctp 连接。
  • NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

你可以例用netty编写一个高度定制化的网络服务器!

ChannelHandler

ChannelHandler 是一个接口,用于真正处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链) 中的下一个Handler。这里涉及到我们后面要讲的pipeline。

Handler都在pipeline管道中进行维护。服务端处理IO事件要从pipeline中调用相应的handler进行处理。

ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类

ChannelHandler及其实现类

img

  • ChannelInboundHandler用于处理入站(从客户端到服务端)IO事件。
  • 反之,ChannelOutboundHandler用于处理出站IO事件。
  • ChannelInitializer用于初始化一个ChannelHandler。

我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现对接收的channel的业务处理,一般需要重写以下方法

//Invoked when the current Channel has read a message from the peer.
//当该通道需要读取事件时,触发该方法。(事件驱动)
//ChannelHandlerContext为上下文对象,携带该通道的所有信息。
void channelRead(ChannelHandlerContext ctx, java.lang.Object msg)
    
//通道连接就绪时触发该方法。    
void channelActive(ChannelHandlerContext ctx)
The Channel of the ChannelHandlerContext is now active    

    ...

Pipeline 和 ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,对应图中的pipeline,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于 一个贯穿 Netty 的链。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互

在 Netty 中每个 Channel 都有且仅持有一个 ChannelPipeline 与之对应,它们的组成关系如下

在这里插入图片描述

一个Channel包含一个ChannelPipeline,ChannelPipeline中维护了一个由ChannelHandler组成的双向链表

入站事件从head到tail逐个被Handler处理,出站事件反之。

通过ChannelPipeline addFirst(ChannelHandler... handlers),或ChannelPipeline addLast(ChannelHandler... handlers)向ChannelPipeline中添加Handler。

ChannelHandlerContext

ChannelHandlerContext用于保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象

即在pipeline链式传递的过程中进行链式传递。

即 ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时 ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用。

在下一篇的源码分析中,你可以认为,ChannelHandlerContext就是Handler

常用方法:

ChannelFuture close(); //关闭通道
    
ChannelOutboundInvoker flush(); //刷新挂起的消息

ChannelFuture writeAndFlush(Object msg); // ..
    

EventLoopGroup

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vFSzf3ss-1604217943497)(C:\Users\20335\AppData\Roaming\Typora\typora-user-images\image-20201027165808059.png)]

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例

通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程。

BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理。

EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。

在 Netty 服 务 器 端 编 程 中 , 我 们 一 般 都 需 要 提 供 两 个 EventLoopGroup , 例 如 : BossEventLoopGroup 和 WorkerEventLoopGroup。

Selector

Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个 Chan绑定在其上的channnel 事件。

当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel

Option

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。

ChannelOption 参数如下:

ChannelOption.SO_BACKLOG:对应TCP/IP协议listen函数中的backlog参数,用于初始化服务器的可连接队列容量。

服务器处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务器将暂时无法处理的客户端连接放在队列中等待处理,backlog参数用于指定大小。

ChannelOption.SO_KEEPALIVE:设置一直保持连接活动状态。

Unpooled

Unpooled是Netty 提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类

一般用来创建Netty中的缓存区,相当于NIO中的Buffer… 在Netty中为ByteBuf

Creates a new ByteBuf by allocating new space…

最常用的方法为

    /**
     * Creates a new big-endian buffer whose content is the specified
     * string encoded in the specified charset.
     */
    public static ByteBuf copiedBuffer(CharSequence string, Charset charset) {
    }

下面再上四个实例

NettyBS-TCP简单通信

创建 Netty 项目

  • Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
  • 服务器可以回复消息给客户端 “hello, 客户端~”
  • 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论

配置netty的Maven依赖:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

服务端编写:

package netty.simple;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {


        /*
        * 创建BossGroup 和 WorkerGroup
        * 说明
        * 1. 创建两个线程组 bossGroup 和 workerGroup
        * 2. bossGroup 只是处理连接请求 , 真正的对客户端业务处理,会交给 workerGroup完成
        * bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数默认是 cpu核数 * 2
        * */

        //只准备一个接收连接的线程组即可
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来对线程组进行设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用netty的NioSocketChannel作为服务器的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置服务器通道连接队列容量
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //通信通道保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //向SocketChannel添加Handler
                        //以下方法在channel实例化后会被调用
                        //目的是为了加入我们自定义的Handler 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //通过channel向管道添加自定义handler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            //异步绑定一个端口, 生成了一个 ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //给cf注册监听器,监控我们关心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                //以下方法在bind操作完成时会被操作线程进行回调
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功...");
                    } else {
                        System.out.println("监听端口 6668 失败...");
                    }
                }
            });

            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        }finally {
            //优雅得关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客户端编写:

package netty.simple;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客户端只需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        //向客户端channel添加自定义handler
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); 
                        }
                    });

            System.out.println("客户端 ok..");
            //启动客户端去连接服务器端
            //注意这也是个异步操作
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }
}

自定义服务端Handler:

package netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;
/**
 * 服务端需要继承ChannelInboundHandlerAdapter
 * */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    
    /** 
     * 当服务端通道有Read事件发生时,即客户端向服务端通道写的时候,服务端的读自动触发。
     * 读取客户端发送的数据
     * 1. ChannelHandlerContext ctx:上下文对象, 含有管道pipeline , 通道channel, 地址等信息
     * 2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        //从上下文对象中获取通道
        Channel channel = ctx.channel();
        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    //数据读取完毕触发
    //这里我们将数据写入缓存
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //处理异常, 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户自定义端Handler代码:

package netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        //向客户端写数据
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时,即客户端向该客户端发送消息时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
    }

    //发生异常时
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty-Http服务编写

Netty 服务器在 6668 端口监听,浏览器发出请求 "http://localhost:6668/ "

服务器可以回复消息给客户端 "Hello! 我是服务器 " , 并对特定请求资源进行过滤.

编写服务端代码:

public class TestServer {
    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //ServerBootstrap引导
            serverBootstrap.group(bossGroup, workerGroup) //设置线程组
                    .channel(NioServerSocketChannel.class) //设置ServerChannel
                    .childHandler(new TestServerInitializer()); //设置接收的Channel的Handler
            //异步绑定 省略注册监听
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            if (channelFuture.isDone()){
                System.out.println("服务器已绑定端口:7000");
            }

            //监听关闭通道
            channelFuture.channel().closeFuture().sync();

        }finally {
            //优雅关闭服务器资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写自定义Handler:

/**
 * 说明
 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter(适配器)
 * 2. HttpObject 客户端和服务器端相互通讯的数据被封装成 HttpObject
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

	//读事件处理
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

        //判断 msg 是不是 httprequest请求
        if(msg instanceof HttpRequest) {
            System.out.println("服务端" + ctx.handler() + "开始处理...");
            System.out.println("  " + "客户端地址" + ctx.channel().remoteAddress() + "发出请求...");
			//获得请求
            HttpRequest httpRequest = (HttpRequest) msg;
            //获取uri, 过滤指定的资源
            URI uri = new URI(httpRequest.uri());
            //对请求网站图标的请求做过滤
            if("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求了 favicon.ico, 不做响应...");
                return;
            }

            //回复信息给浏览器 [http协议]
            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);
            //构造一个http的相应,即 httpresponse
            //HTTP_1_1代表HTTP版本,HttpResponseStatus.OK代表200状态码
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            //对HTTP头设置
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            //将构建好的response写回客户端
            ctx.writeAndFlush(response);
        }
    }
}

自定义的Handler需要ChannelInitializer进行加入

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * 在Channel初始化的时候设置好。
     * 每次请求配置一次管道。
     * */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //通过Channel得到管道
        ChannelPipeline pipeline = ch.pipeline();
        //向管道加入Handler
        //HttpServerCodec是netty提供的处理http的编-解码器
        System.out.println("管道加入HttpServerCodec...");
        pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
        System.out.println("管道加入自定义Handler...");
        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
        System.out.println("管道处理器配置成功....");
    }
}

Netty-群聊系统

编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)

要求:

1.实现多人群聊

2.服务器端:可以监测用户上线,离线,并实现消息转发功能

3.客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息

4.使用TCP实现

服务端编写:

public class GroupChatServer {
    private int port; //监听端口
    public GroupChatServer(int port) {
        this.port = port;
    }

    //编写run方法,处理客户端的请求
    public void run() throws Exception{

        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop

        try {
            //服务端Netty引导
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) //基于TCP
                    .option(ChannelOption.SO_BACKLOG, 128) //连接队列 128
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //配置客户端通道保持通道活跃
                    .childHandler(new GroupChatServerInitializer());

            ChannelFuture channelFuture = b.bind(port).sync();
            if (channelFuture.isDone()){
                System.out.println("服务端绑定端口" + port + "成功...");
            }


            //监听关闭通道
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {

        new GroupChatServer(7000).run();
    }
}

服务端GroupChatClientInitializer配置:

/**
 * 管道配置 Channel初始化时被调用
 * */
public class GroupChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //得到pipeline
        ChannelPipeline pipeline = ch.pipeline();
        //客户端收到消息要解码 发送的消息要编码
        //注意在出站事件和入站事件,这两个Handler的处理顺序是不同的,这由netty帮我们完成
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        //向管道加入自定义的handler
        pipeline.addLast(new GroupChatClientHandler());
    }
}

服务端Handler编写

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    //这是一个负责群聊的核心Handler
    //定义一个channel 组,neety支持Handler对部分通道进行直接管理
    //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例对象
    private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //定义时间
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //handlerAdded 表示连接建立,一旦连接,第一个被执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将当前channel 加入到  channelGroup
        channelGroup.add(channel);

        //推送所有在线用户上线人信息
        //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
    }

    //断开连接, 将xx客户离开信息推送给当前在线的客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        //一旦某个客户端断开,自动从channel组中去除
    }

    //表示channel 处于活动状态, 提示 xx上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }

    //表示channel 处于不活动状态, 提示 xx离线了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 离线了~");
    }

    //读取数据 当客户端发送聊天信息时触发
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //获取到当前channel
        Channel channel = ctx.channel();
        //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
        channelGroup.forEach(ch -> {
            //转发消息
            if(channel != ch) {
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
            }else {
                //回显给发送了消息的客户端
                ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭通道
        ctx.close();
    }
}

客户端编写:

import java.util.Scanner;

public class GroupChatClient {

    //用户连接目标ip+端口
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    //启动客户端
    public void run() throws Exception{
        //客户端一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class) 
                .handler(new GroupChatClientInitializer());
        //异步连接
        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        ///拿到连接后的通道,由于上面连接操作是异步,这里必然是阻塞    
        Channel channel = channelFuture.channel();
        System.out.println("客户端channel已建立:-------" + channel.localAddress()+ "--------");
        //客户端需要输入信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            //换行即发送
            String msg = scanner.nextLine();
            //通过channel 发送到服务器端
            channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

客户端管道配置:

/**
 * 管道配置 Channel初始化时被调用
 * */
public class GroupChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //得到pipeline
        ChannelPipeline pipeline = ch.pipeline();
        //编解码器
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        //向管道加入自定义的handler
        pipeline.addLast(new GroupChatClientHandler());
    }
}

自定义客户端Handler:

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    //直接读就完事
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

Netty-心跳检测机制

当一个连接进来,长时间没有操作,我们可以使用心跳机制得知该连接是否已经断连。

如未断连,我们可以直接给它断了不让它长时间持有服务器的资源

netty提供的IdleStateHandler可以帮助我们完成心跳检测机制

IdleStateHandler 是netty 提供的处理空闲状态的处理器

  1. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
  2. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
  3. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接

当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理

通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)

public class MyServer {
    public static void main(String[] args) throws Exception{


        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //7s不读则读空闲,7秒不写则写空闲,10秒不读写则读写空闲
                    pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                    //加入一个对空闲检测进一步处理的handler(自定义)
                    pipeline.addLast(new MyServerHandler());
                }
            });

            //启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

对空闲事件的处理Handler:

public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        //如果事件属于IdleStateEvent
        if(evt instanceof IdleStateEvent) {
            //将evt 向下转型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            //判断到底是哪个空间
            switch (event.state()) {
                case READER_IDLE:
                  eventType = "读空闲";
                  break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--长时间未操作--" + eventType);
            //如果发生空闲,我们关闭通道
           // ctx.channel().close();
        }
    }
}

WebSocket 编程实现BS长连接

Http 协议是无状态的, 浏览器和服务器间的请求响应一次,下一次会重新创建连接

如果要实现Http的长连接,就得基于webSocket编程

通过改变并提供新的Handler添加到我们的Channel就行了

服务端编写:

public class MyServer {
    public static void main(String[] args) throws Exception{
        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //因为基于http协议,使用http的编码和解码器
                    pipeline.addLast(new HttpServerCodec());
                    //以块方式写,添加ChunkedWriteHandler处理器
                    pipeline.addLast(new ChunkedWriteHandler());

                    //1. http数据在传输过程中是分段, HttpObjectAggregator,可以将多个段聚合
                    //2. 这也就是为什么,当浏览器发送大量数据时,就会发出多次http请求
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    
                    //websocket数据是以 帧(frame) 形式传递

                    //浏览器请求时 ws://localhost:7000/hello 表示请求的uri
                    //WebSocketServerProtocolHandler 核心功能是将 http协议升级为ws协议,保持长连接
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                    //自定义的handler ,处理业务逻辑
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });

            //启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Handler:

//这里 TextWebSocketFrame 类型,数据在Handler链中以文本帧(frame)传递 
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息 " + msg.text());
        //回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text()));
    }
    //当web客户端连接后,立即触发
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
        System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生 " + cause.getMessage());
        ctx.close(); //关闭连接
    }
}

客户端_JS

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8"请求qet="UTF-8"ung'qiuqet="UTF-8"请求qet="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    //判断当前浏览器是否支持websocket
    if(window.WebSocket) {
        //访问服务端指定的ws路径,返回socket
        socket = new WebSocket("ws://localhost:7000/hello");

        //收到服务器端回送的消息后进行回调
        socket.onmessage = function (ev) {
            //获得文本框
            var rt = document.getElementById("responseText");
            //将服务器回送的消息放上去
            rt.value = rt.value + "\n" + ev.data;
        }

        //连接开启(感知到连接开启)后回调
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "连接开启了.."
        }

        //连接关闭(感知到连接关闭)后回调
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "连接关闭了.."
        }
    } else {
        alert("当前浏览器不支持websocket")
    }

    //点击发送消息后触发
    function send(message) {
        //先判断socket是否创建好
        if(!window.socket) { 
            return;
        }
        //socket准备好了
        if(socket.readyState == WebSocket.OPEN) {
            //通过socket 发送消息
            socket.send(message)
        } else {
            alert("连接没有开启");
        }
    }
</script>
    <form onsubmit="return false">
        <textarea name="message" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="发生消息" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
</html>
版权声明:本文为JunSIrhl原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/JunSIrhl/article/details/109429042