掘金 后端 ( ) • 2024-06-27 16:56

theme: fancy

SSE是使用text/event-stream格式发送的,浏览器会自动将数据解析为事件。服务端只能单向的推送事件到客户端,客户端不能发送消息到服务端。

这里只是一个简单的示例,实际应用中,用户可能有不同得终端类型,后端也会集群部署以及防止消息漏送的问题。可能还会有性能相关的问题。

前端

通过使用EventSource对象来创建一个sse连接,然后通过addEventListener来监听事件。

const id = "as12121"
const eventSource = new EventSource('http://localhost:8080/sse?userId='+id,{
});
eventSource.addEventListener("message", (event) => {
  console.log(event);
});

后端

  1. 创建连接 需要防止重复推送的问题
    @GetMapping("/sse")
    @CrossOrigin(value = "*")
    public ResponseBodyEmitter sseServer(@RequestParam String userId){
        ResponseBodyEmitter emitter = null;
        if ((emitter = emitterMap.get(userId)) != null){
            return emitter;
        }
        // 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
        emitter = new SseEmitter(300000L);
        emitter.onTimeout(()->{
            emitterMap.remove(userId);
            log.info("timeout");
        });
        emitter.onCompletion(()->{
            emitterMap.remove(userId);
            log.info("completion");
        });
        // 在客户端断开连接的时候会触发error回调
        emitter.onError(e->{
            emitterMap.remove(userId);
            log.error("error",e);
        });
        log.info("create for {}",userId);
        emitterMap.put(userId, emitter);
        return emitter;
    }
  1. 推送消息
    private void send(String message){
        emitterMap.values().forEach(emitter -> {
            try {
                doSend(emitter,message);
            } catch (IOException e) {
                log.warn("客户端断开连接了");
            }
        });
    }
  1. 清理连接 需要定时清理不活跃的连接
private void scheduleCleanup() {
        new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
            emitterMap.values().removeIf(emitter -> {
                try{
                    ping(emitter);
                    return false;
                }catch (IOException e){
                    log.warn("清理一个不活跃的客户端");
                    return true;
                }
            });
        }, 0, 10, TimeUnit.SECONDS);
    }
  1. 完整代码
/**
 * 1. 建立连接
 * 2. 推送消息
 * 3. 清理连接
 *
 * 为了防止重复推送,需要保证全局唯一性
 */
@Slf4j
@RestController
@RequestMapping
public class SSEController {
    Map<String, ResponseBodyEmitter> emitterMap = new ConcurrentHashMap<>();
    /**
     * 定时清理不活跃的客户端
     */
    @PostConstruct
    private void scheduleCleanup() {
        new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
            emitterMap.values().removeIf(emitter -> {
                try{
                    ping(emitter);
                    return false;
                }catch (IOException e){
                    log.warn("清理一个不活跃的客户端");
                    return true;
                }
            });
        }, 0, 10, TimeUnit.SECONDS);
    }
    /**
     * 建立连接
     */
    @GetMapping("/sse")
    @CrossOrigin(value = "*")
    public ResponseBodyEmitter sseServer(@RequestParam String userId){
        ResponseBodyEmitter emitter = null;
        if ((emitter = emitterMap.get(userId)) != null){
            return emitter;
        }
        // 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
        emitter = new SseEmitter(300000L);
        emitter.onTimeout(()->{
            emitterMap.remove(userId);
            log.info("timeout");
        });
        emitter.onCompletion(()->{
            emitterMap.remove(userId);
            log.info("completion");
        });
        // 在客户端断开连接的时候会触发error回调
        emitter.onError(e->{
            emitterMap.remove(userId);
            log.error("error",e);
        });
        log.info("create for {}",userId);
        emitterMap.put(userId, emitter);
        return emitter;
    }
    /**
     * 推送消息,只需要通过emitter发送即可
     */
    @GetMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        send(message);
        return ResponseEntity.ok("ok");
    }
    private void ping(ResponseBodyEmitter emitter) throws IOException {
        Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
                .id(UUID.randomUUID().toString())
                .name("ping")
                .data("ping")
                .comment("comment")
                .build();
        emitter.send(dataWithMediaTypes);
    }
    private void send(String message){
        emitterMap.values().forEach(emitter -> {
            try {
                doSend(emitter,message);
            } catch (IOException e) {
                log.warn("客户端断开连接了");
            }
        });
    }
    private void doSend(ResponseBodyEmitter emitter,String message) throws IOException {
        Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
                .id(UUID.randomUUID().toString())
                .name("message")
                .data(message)
                .build();
        emitter.send(dataWithMediaTypes);
    }
}