掘金 后端 ( ) • 2024-05-06 16:32

1、websocket简介

「WebSocket」是一种在单个TCP连接上进行全双工通信的协议。

「WebSocket」使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在「WebSocket API」中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

TCP连接是一种可靠的、面向连接的网络通信协议。它通过三次握手建立连接,然后通过数据包的传输和确认来保证数据的可靠性和顺序性。

在建立TCP连接时,客户端首先向服务器发送一个SYN(同步)包,服务器收到后回复一个SYN+ACK(同步+确认)包,客户端再回复一个ACK(确认)包,完成了三次握手,建立了连接。在连接建立后,双方可以通过发送和接收数据包进行通信。

发送方将数据分割成小的数据包,并为每个数据包添加序列号。接收方接收到数据包后进行确认,并按照序列号将数据包重新组装成完整的数据。

「WebSocket」可以双向通讯,它可以完成客户端到服务端以及服务端到客户端的双向推送的通讯工具。

2、其他类似的产品

因为「WebSocket」可以数据的实时推送,所以常用用推送一些实时变化的数据。在我们业务场景中大多数实现的服务端到客户端的单向推送,其实现的方式还有很多种。

  • 客户端轮询,如间隔n秒发送http请求,以保证拉取准实时的数据

  • 长轮询,保持长时间的http连接,超时之后继续轮询。具有代表性的就是SSE,chatgpt就是基于SSE实现的

  • 长链接,也就是socket连接

前两种都是短连接基于http协议的的,其中长轮询主要是设置请求头:

Connection: keep-alive

3、数据推送存在问题

  • 短轮询存在很多无效的请求,拉取的数据的间隔不好掌握,拉取的数据未必是实时的

  • 长轮询的超时时间难以掌控,如果设置成永久保持,那么页面关闭或退出,连接依然有效,浪费链接资源。

  • SSE是基于长轮询的,实现了断线重连,超时、异常的回调等。但是多开页面,断线重新,都会产品不同的客户端,做到精准推送需要管理好不同的客户端,客户端的增多可能会造成服务端的OOM,这些都是考虑的重点

  • websocket则存在粘包、拆包的问题以及客户端的异常关闭等

其中netty解决websocket中常见的问题。

4、基于Netty解决socket的问题

Netty 中提供一系列解决socket的方案。

4.1 Socket服务端
public void run() {
	EventLoopGroup bossGroup = new NioEventLoopGroup();
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	try {
        // 创建Socket服务端
		ServerBootstrap b = new ServerBootstrap();
        // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
		b.group(bossGroup, workerGroup)
            // 设置NIO类型的channel
			.channel(NioServerSocketChannel.class)
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel channel) throws Exception {
					// ...
				}
			});

        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
		Channel channel = b.bind(port).sync().channel();
        // 对关闭通道进行监听
		channel.closeFuture().sync();
	} catch (InterruptedException e) {
		logger.error("webSocket server start error:" + e.getMessage());
	} finally {
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
	}
}

从样例代码中有几个关键的对象:

  • ServerBootstrap 这个是服务端的对象

  • NioServerSocketChannel 这个是服务端绑定的Channel类型

  • bind(port) 绑定服务端的端口

  • 使用双EventLoopGroup

4.2 Socket客户端
private void start() throws InterruptedException {
	EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    // 创建客户端
	Bootstrap bootstrap = new Bootstrap();
    // 设置NIO类型的channel
	bootstrap.channel(NioSocketChannel.class);
	bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
	bootstrap.group(eventLoopGroup);
	
	bootstrap.handler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel socketChannel)
				throws Exception {
			// ...
		}
	});
    // 连接
	ChannelFuture future = bootstrap.connect(host, port).sync();
	if (future.isSuccess()) {
		logger.info("connect server success");
	}

	NioSocketChannel channel = future.channel();
}

样例中的关键对象:

  • Bootstrap 创建客户端的对象

  • NioSocketChannel 客户端绑定的channel类型

  • connect(host, port) 客户端是用来连接服务端的

  • 只有一个EventLoopGroup

5、Netty解决的Socket问题的类

5.1 IdleStateHandler

读写空闲处理器,一般用来写空闲来发送心跳检测消息

    /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
     * @param readerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
     *        will be triggered when no read was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param writerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
     *        will be triggered when no write was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param allIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
     *        will be triggered when neither read nor write was performed for
     *        the specified period of time.  Specify {@code 0} to disable.
     */
    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
    }

对应的Handler处理

需要继承SimpleChannelInboundHandler 或实现 ChannelInboundHandler接口, 重写或实现userEventTriggered 方法。

例:

public class NettyClientHandler extends SimpleChannelInboundHandler<?> {
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
				// 读空间
				case READER_IDLE:
					// ...
					break;
				// 写空闲
                case WRITER_IDLE:
                    // ...
                    break;
				// 读写空闲	
				case ALL_IDLE:
					// ...
					break;
                default:
                    break;
            }
        }
    }
   
}
5.2 HttpServerCodec

Http请求解码器和Http响应的编码器合二为一。相当于HttpRequestDecoderHttpResponseEncoder 。用来处理Http请求的。

pipeline.addLast(new HttpServerCodec());
5.3 ChunkedWriteHandler

以快的方式写入,也就是大数据量的流式写入

pipeline.addLast(new ChunkedWriteHandler());
5.4 HttpObjectAggregator

聚合器,将一块块的数据聚合在一起,形成一条完整的数据

pipeline.addLast(new HttpObjectAggregator(64 * 1024));
5.3 拆包粘包解决方案
  • FixedLengthFrameDecoder 使用定长的报文来分包

  • DelimiterBasedFrameDecoder 添加特殊分隔符报文来分包

  • LineBasedFrameDecoder 数据未尾添加回车换行符来分包

  • LengthFieldBasedFrameDecoder 使用消息头和消息体来分包

  • StringDecoder 字符串解码器

  • StringEncoder 字符串编码器

  • ByteToMessageDecoder 如果想实现自己的半包解码器,实现该类

5.4 WebSocketServerProtocolHandler

订阅websocket的连接,也是webscoket的关键

pipeline.addLast(new WebSocketServerProtocolHandler("/ws"))

6、浏览器作为客户端

WebSocket一般的浏览器是直接支持的的。

let socket = new WebSocket('ws://localhost:8080/ws');
// 打开 连接
socket.addEventListener('open', function (event) {
    socket.send('Hello Server!');
});

// 监听动态 数据
socket.addEventListener('message', function (event) {
    console.log('Message from server ', event.data);
});

浏览器作为客户端时,浏览器已经处理了粘包、拆包分析。不需要专门的处理,数据是frame的形式发送的,frame中包含了完整数据的标识。