掘金 后端 ( ) • 2024-04-16 13:44

原文在这儿HTML5的新特性(三)——Websocket (附一个ws聊天室demo)

简单说一下原始代码的逻辑:

  • 服务使用ws自带server启动并监听了4090端口

  • 当新客户端连接时,服务器会打印一条消息表示有新连接。

  • 客户端可以发送两种类型的消息:加入房间(join)和发送消息到房间(message)。

  • 当收到join类型的消息时,服务器会将该客户端添加到指定的房间。如果房间不存在,会先创建房间。然后向房间内所有客户端发送一条欢迎消息。

  • 当收到message类型的消息时,服务器会将该消息广播到指定房间内的所有客户端,包括消息的发送者。

使用TS + 改造

对于websocket这部分,我们在根目录中新增了websockets一个专门用于处理websocket逻辑的目录

│   ├── websockets/           # WebSocket处理逻辑目录
│   │   ├── handlers/         # WebSocket事件处理器
│   │   │   └── messageHandler.ts # 例如,处理WebSocket消息的逻辑
│   │   ├── ws.d.ts           # 用于存放类型定义
│   │   └── index.ts          # WebSocket的设置和初始化逻辑

initWebSocket函数

在正式改造之前,我们先按照之前webserver.ts中的思路使用一个函数来初始化ws。具体函数如下:

// websockets/index.ts.ts
import logger from "@/logger";
import websocketPlugin, { WebSocket } from "@fastify/websocket";
import { type FastifyInstance, FastifyRequest } from "fastify";

// WebSocket初始化函数
const initWebSocket = (app: FastifyInstance) => {
  // 注册WebSocket插件
  app.register(websocketPlugin);

  // 设置WebSocket路由
  app.register(async function (fastify) {
    fastify.get(
      "/ws",
      { websocket: true },
      (socket: WebSocket, req: FastifyRequest) => {
        socket.on("message", (message: string) => {
          logger.info(JSON.stringify(req.headers));
          // 当从客户端接收到消息时
          socket.send(`你好:${message}`); // 回应客户端消息
        });
      }
    );
    logger.info("websocket 初始化成功");
  });
};
export default initWebSocket;

这段代码定义了一个 initWebSocket 函数,用于在 Fastify 应用中添加 WebSocket 支持。它通过注册 @fastify/websocket 插件并设置一个 /ws 路由来监听 WebSocket 连接。当有客户端通过 WebSocket 发送消息时,服务器会接收这些消息并回复一个简单的响应。

开始改造

房间信息

在改造过程中,我们首先对rooms常量引入了明确的类型注解和约束,以增强代码的类型安全性和可读性。rooms被定义为一个键值对集合,其中键(key)是一个字符串,表示房间的唯一标识符;值(value)是一个ExtendedWebSocket实例的Set集合。这里的ExtendedWebSocket是对标准WebSocket接口的扩展,它新增了roomuserId两个可选属性。这些属性分别用于指示WebSocket连接所属的房间和用户ID,从而使得按房间或用户ID进行WebSocket连接的管理和消息广播变得更为灵活和高效。

通过为rooms常量添加类型注解,我们不仅明确了每个房间所关联的WebSocket连接类型,还为后续的操作提供了类型检查,确保了代码的健壮性。

const rooms: { [key: string]: Set<ExtendedWebSocket> } = {}
// websockets/ws.d.ts
// 扩展WebSocket连接以包含房间和用户ID
export interface ExtendedWebSocket extends WebSocket {
  room?: string
  userId?: string
}

消息类型

对于加入房间(join)和发送消息到房间(message)也需要明确的类型注解和约束,让我们更新一下ws.d.ts文件

import { WebSocket } from '@fastify/websocket'
// 定义消息类型
export interface IJoinMessage {
type: 'join' // 指定消息类型为加入房间
room: string // 房间标识符
userId: string // 用户标识符
}

export interface IMessageMessage {
type: 'message' // 指定消息类型为发送消息
room: string // 房间标识符,指定消息发送到哪个房间
userId: string // 发送消息的用户标识符
content: string // 消息内容
}

// 使用类型联合来表示可能的消息
export type WebSocketMessage = JoinMessage | ChatMessage

// 扩展WebSocket连接以包含房间和用户ID
export interface ExtendedWebSocket extends WebSocket {
room?: string
userId?: string
}

使用TS和@fastify/websocket更新WebSocket聊天室代码

此次改造主要集中在将原始的JavaScript和WebSocket聊天室代码迁移到TypeScript和@fastify/websocket的组合上。通过这一转变,我们不仅利用了TypeScript的强类型系统来增强代码的健壮性和可维护性,还通过@fastify/websocket插件简化了WebSocket的初始化和管理过程。

  • 类型安全: 引入TypeScript后,我们通过定义ExtendedWebSocket和消息类型(如IJoinMessageIMessageMessage),为WebSocket连接和消息处理引入了类型检查,从而减少了运行时错误的可能性。
  • 代码组织: 使用@fastify/websocket插件,我们能够更简洁地设置WebSocket路由和处理逻辑,同时保持了代码的清晰和模块化。
  • 功能实现: 尽管技术栈发生了变化,但聊天室的基本功能和逻辑保持不变。这包括处理加入房间、消息广播以及客户端断开连接时的逻辑。
// websockets/index.ts
import logger from '@/logger'
import websocketPlugin from '@fastify/websocket'
import { type FastifyInstance } from 'fastify'
import {
  ExtendedWebSocket,
  IJoinMessage,
  IMessageMessage,
  WebSocketMessage
} from './ws'
import WebSocket from 'ws'

// WebSocket初始化函数
const initWebSocket = (app: FastifyInstance) => {
  // 注册WebSocket插件
  app.register(websocketPlugin, {
    options: { maxPayload: 1048576 }
  })
  const rooms: { [key: string]: Set<ExtendedWebSocket> } = {}
  app.register(async function (fastify) {
    fastify.get(
      '/ws',
      { websocket: true },
      (socket: ExtendedWebSocket /* WebSocket */, req /* FastifyRequest */) => {
        socket.on('message', (data: string) => {
          logger.info(`data: ${req.id}`)
          let message: WebSocketMessage
          try {
            message = JSON.parse(data)
          } catch (e) {
            socket.send(JSON.stringify({ error: '无法解析消息' }))
            return
          }
          const joinMsg = message as IJoinMessage
          const messageMsg = message as IMessageMessage
          switch (message.type) {
            case 'join':
              if (!rooms[joinMsg.room]) {
                rooms[joinMsg.room] = new Set()
              }
              rooms[joinMsg.room].add(socket)
              socket.room = joinMsg.room
              socket.userId = joinMsg.userId
              socket.send(
                JSON.stringify({
                  message: `你好,你已经加入房间 ${joinMsg.room}`
                })
              )
              rooms[joinMsg.room].forEach((client) => {
                if (client !== socket && client.readyState === WebSocket.OPEN) {
                  client.send(
                    JSON.stringify({
                      message: `欢迎 用户 ${joinMsg.userId} 加入房间 ${joinMsg.room}`
                    })
                  )
                }
              })
              break
            case 'message':
              if (rooms[messageMsg.room]) {
                rooms[messageMsg.room].forEach((client) => {
                  if (client.readyState === WebSocket.OPEN) {
                    client.send(
                      JSON.stringify({
                        message: `${
                          messageMsg.userId
                        } [${new Date().toISOString()}] : ${messageMsg.content}`
                      })
                    )
                  }
                })
              }
              break
          }
        })

        socket.on('close', () => {
          if (socket.room && rooms[socket.room]) {
            rooms[socket.room].delete(socket)
            if (rooms[socket.room].size === 0) {
              delete rooms[socket.room]
            }
          }
        })
      }
    )
    logger.info('websocket 初始化成功')
  })
}

export default initWebSocket

在http Server中注册websocket

webserver.ts中添加注册函数来启动websock

import fastify from "fastify";
import { getConfig } from "./config";
import logger from "./logger";
import errorHandlerPlugin from "./plugins/error-handler-plugin";
import initWebSocket from "./websockets";

// 创建 Fastify 应用实例,启用内置的日志记录功能
const app = fastify({
  logger: true,
});

// 定义一个异步函数来启动服务器
const startServer = async () => {
  const startTime = Date.now(); // 记录开始启动服务器的时间

  try {
    // 定义根路由,当访问 '/' 时返回 { hello: 'world' }
    app.get("/", async () => {
      return { hello: "world" };
    });

    // 注册自定义的错误处理插件
    await errorHandlerPlugin(app);

    // 使用WebSocket初始化函数初始化WebSocket支持
    initWebSocket(app); // --------------------新增了这一行

    // 从配置中获取应用程序信息
    const APP_INFO = getConfig("APP");
    // 启动服务器,监听配置中指定的端口和主机
    await app.listen({ port: APP_INFO.port, host: APP_INFO.host });

    const endTime = Date.now(); // 记录服务器启动完成的时间
    const startupTime = (endTime - startTime) / 1000; // 计算服务器启动耗时(秒)

    // 记录启动信息到日志
    logger.info(
      `Starting ${APP_INFO.name} server on ${APP_INFO.host}:${APP_INFO.port}`
    );
    // 记录启动耗时到日志
    logger.info(`Server started in ${startupTime} seconds.`);
  } catch (err) {
    // 如果启动过程中发生错误,则记录错误信息并退出进程
    app.log.error(err);
    process.exit(1);
  }
};

export { startServer };

npm run dev之后,可以使用websockt工具或者是文章HTML5的新特性(三)——Websocket (附一个ws聊天室demo) HTML代码来测试一下,记得改下连接地址哦~

 <title>WebSocket客户端</title>
    <script>
      document.addEventListener("DOMContentLoaded", () => {
        // 创建WebSocket连接到本地服务器
        const socket = new WebSocket("ws://localhost:3588/ws"); // ————————————————————这里
    </script>

image-20240415142545372转存失败,建议直接上传图片文件

重新组织代码架构

这才三个消息类型,代码已经是一坨了。后面更多消息类型加入,甚至添加横向扩展等逻辑之后现在这种全部写到一坨的代码结构会变得难以管理和维护。因此,重新组织和优化 WebSocket 代码架构是至关重要的。本节将介绍如何通过分离关注点、引入消息处理器和明确类型定义,来提高代码的可维护性和扩展性。

分离关注点:引入消息处理器

将 WebSocket 的消息处理逻辑分离到独立的模块中,可以使得主文件更加清晰,专注于 WebSocket 服务器的配置和初始化,而不是具体的业务逻辑。这样做的好处包括:

  • 提高代码的可读性:通过将复杂的业务逻辑分离到不同的文件中,代码结构变得更清晰,新成员更容易理解项目架构。
  • 便于功能扩展和维护:当需要添加新的消息类型或处理逻辑时,可以直接在对应的处理器中进行修改或添加,而不需要修改核心的 WebSocket 服务器代码。
  • 重用性:相似的消息处理逻辑可以在不同的项目中重用,减少了重复代码的编写。
bailing/
│
├── src/                      # 项目的源代码
│   ├── config/               # 配置文件目录
│   │   ├── config.d.ts       # 配置的TypeScript声明文件,用于类型安全
│   │   └── index.ts          # 配置文件的实现,负责加载和导出配置
│   │
│   ├── errors/               # 错误处理相关的目录
│   │   ├── custom-error.ts   # 自定义错误类,用于创建统一的错误响应
│   │   └── index.ts          # 错误处理的入口文件,可能用于汇总和导出错误类
│   │
│   ├── plugins/              # Fastify插件目录
│   │   └── error-handler-plugin.ts # 错误处理插件,用于全局错误处理
│   │
│   ├── websockets/           # WebSocket处理逻辑目录
│   │   ├── handlers/         # WebSocket事件处理器
│   │   │   └── messageHandler.ts # 例如,处理WebSocket消息的逻辑
│   │   ├── ws.d.ts           # 用于存放类型定义
│   │   └── index.ts          # WebSocket的设置和初始化逻辑
│   │
│   ├── logger.ts             # 日志配置文件,定义日志记录方式和配置
│   ├── webserver.ts          # Fastify服务器设置和启动逻辑
│   └── index.ts              # 应用入口文件,用于启动服务器和其他初始化设置
│
├── package.json              # 定义项目依赖和脚本的npm配置文件
├── package-lock.json         # 锁定安装时的包的版本,确保一致性
├── tsconfig.json             # TypeScript的编译配置文件

messageHandler 处理加入、发送、关闭连接消息

/**
 * 聊天应用的 WebSocket 消息处理模块。
 *
 * 本模块包括处理聊天消息、用户加入请求以及聊天室内连接关闭的处理器。
 *
 * @模块 MessageHandler
 */

import WebSocket from 'ws'
import { ExtendedWebSocket, IJoinMessage, IMessageMessage } from '../ws.d' // 假设您的类型定义在 ws.d.ts 中

// 存储聊天室的对象,每个聊天室包含一组已连接的客户端(ExtendedWebSocket)。
const rooms: { [key: string]: Set<ExtendedWebSocket> } = {}

/**
 * 处理传入的聊天消息,将消息广播给同一聊天室内的所有用户。
 *
 * @param {ExtendedWebSocket} socket - 发送消息用户的 WebSocket 连接。
 * @param {IMessageMessage} messageMsg - 包含房间、用户ID和内容的消息对象。
 */
export const handleMessage = (
  socket: ExtendedWebSocket,
  messageMsg: IMessageMessage
) => {
  // 检查目标聊天室是否存在
  if (rooms[messageMsg.room]) {
    // 遍历聊天室内的每个客户端,并发送消息
    rooms[messageMsg.room].forEach((client) => {
      // 确保客户端的连接状态为 OPEN
      if (client.readyState === WebSocket.OPEN) {
        client.send(
          JSON.stringify({
            message: `${messageMsg.userId} [${new Date().toISOString()}] : ${
              messageMsg.content
            }`
          })
        )
      }
    })
  }
}

/**
 * 处理用户加入聊天室的请求。将用户添加到聊天室并通知其他用户。
 *
 * @param {ExtendedWebSocket} socket - 加入用户的 WebSocket 连接。
 * @param {IJoinMessage} joinMsg - 包含房间和用户ID的加入消息。
 */
export const handleJoin = (
  socket: ExtendedWebSocket,
  joinMsg: IJoinMessage
) => {
  // 如果目标聊天室不存在,则创建一个新的聊天室
  if (!rooms[joinMsg.room]) {
    rooms[joinMsg.room] = new Set()
  }
  // 将用户添加到聊天室
  rooms[joinMsg.room].add(socket)
  // 在 socket 上记录用户的房间和用户ID
  socket.room = joinMsg.room
  socket.userId = joinMsg.userId
  // 向加入的用户发送欢迎消息
  socket.send(
    JSON.stringify({
      message: `你好,你已经加入房间 ${joinMsg.room}`
    })
  )
  // 通知聊天室内的其他用户有新用户加入
  rooms[joinMsg.room].forEach((client) => {
    if (client !== socket && client.readyState === WebSocket.OPEN) {
      client.send(
        JSON.stringify({
          message: `欢迎 用户 ${joinMsg.userId} 加入房间 ${joinMsg.room}`
        })
      )
    }
  })
}

/**
 * 处理 WebSocket 连接关闭,将用户从其所在的聊天室中移除。
 *
 * @param {ExtendedWebSocket} socket - 正在关闭的 WebSocket 连接。
 */
export const handleClose = (socket: ExtendedWebSocket) => {
  // 检查用户是否在某个聊天室内
  if (socket.room && rooms[socket.room]) {
    // 从聊天室中移除用户
    rooms[socket.room].delete(socket)
    // 如果聊天室变为空,则删除该聊天室
    if (rooms[socket.room].size === 0) {
      delete rooms[socket.room]
    }
  }
}

initWebSocket 调用消息处理Handler

// websockets/index.ts
import logger from '@/logger'
import websocketPlugin from '@fastify/websocket'
import { FastifyRequest, type FastifyInstance } from 'fastify'
import {
  ExtendedWebSocket,
  IJoinMessage,
  IMessageMessage,
  WebSocketMessage
} from './ws'
import {
  handleClose,
  handleJoin,
  handleMessage
} from './handlers/messageHandler'

// WebSocket初始化函数
const initWebSocket = (app: FastifyInstance) => {
  // 注册WebSocket插件
  app.register(websocketPlugin, {
    options: { maxPayload: 1048576 }
  })
  app.register(async function (fastify) {
    fastify.get(
      '/ws',
      { websocket: true },
      (socket: ExtendedWebSocket, req: FastifyRequest) => {
        socket.on('message', (data: string) => {
          logger.info(`data: ${JSON.stringify(req.headers)}`)
          let message: WebSocketMessage
          try {
            message = JSON.parse(data)
          } catch (e) {
            socket.send(JSON.stringify({ error: '无法解析消息' }))
            return
          }

          switch (message.type) {
            case 'join': {
              const joinMsg = message as IJoinMessage
              handleJoin(socket, joinMsg)
              break
            }
            case 'message': {
              const messageMsg = message as IMessageMessage
              handleMessage(socket, messageMsg)
              break
            }
          }
        })

        socket.on('close', () => {
          handleClose(socket)
        })
      }
    )
    logger.info('websocket 初始化成功')
  })
}

export default initWebSocket

总结

本文讨论了如何优化和重构 WebSocket 代码结构,以提高项目的可维护性和扩展性。通过引入消息处理器、明确的类型定义,以及将 WebSocket 逻辑组织到专门的目录和文件中,展示了如何构建一个既清晰又高效的 WebSocket 服务架构。这不仅使得代码更易于管理和扩展,也为团队合作和代码共享提供了便利。

展望

后续文章内容可能是使用MySQL/PostgreSQL(也有可能用到MongoDB)和Redis对与整个websocket部分进行升级,可能的内容包括数据库、认证、搜索(Elasicsearch/MeiliSearch)等方面内容。