theme: fancy
SSE是使用text/event-stream格式发送的,浏览器会自动将数据解析为事件。服务端只能单向的推送事件到客户端,客户端不能发送消息到服务端。
这里只是一个简单的示例,实际应用中,用户可能有不同得终端类型,后端也会集群部署以及防止消息漏送的问题。可能还会有性能相关的问题。
前端
通过使用EventSource对象来创建一个sse连接,然后通过addEventListener来监听事件。
const id = "as12121"
const eventSource = new EventSource('http://localhost:8080/sse?userId='+id,{
});
eventSource.addEventListener("message", (event) => {
console.log(event);
});
后端
- 创建连接 需要防止重复推送的问题
@GetMapping("/sse")
@CrossOrigin(value = "*")
public ResponseBodyEmitter sseServer(@RequestParam String userId){
ResponseBodyEmitter emitter = null;
if ((emitter = emitterMap.get(userId)) != null){
return emitter;
}
// 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
emitter = new SseEmitter(300000L);
emitter.onTimeout(()->{
emitterMap.remove(userId);
log.info("timeout");
});
emitter.onCompletion(()->{
emitterMap.remove(userId);
log.info("completion");
});
// 在客户端断开连接的时候会触发error回调
emitter.onError(e->{
emitterMap.remove(userId);
log.error("error",e);
});
log.info("create for {}",userId);
emitterMap.put(userId, emitter);
return emitter;
}
- 推送消息
private void send(String message){
emitterMap.values().forEach(emitter -> {
try {
doSend(emitter,message);
} catch (IOException e) {
log.warn("客户端断开连接了");
}
});
}
- 清理连接 需要定时清理不活跃的连接
private void scheduleCleanup() {
new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
emitterMap.values().removeIf(emitter -> {
try{
ping(emitter);
return false;
}catch (IOException e){
log.warn("清理一个不活跃的客户端");
return true;
}
});
}, 0, 10, TimeUnit.SECONDS);
}
- 完整代码
/**
* 1. 建立连接
* 2. 推送消息
* 3. 清理连接
*
* 为了防止重复推送,需要保证全局唯一性
*/
@Slf4j
@RestController
@RequestMapping
public class SSEController {
Map<String, ResponseBodyEmitter> emitterMap = new ConcurrentHashMap<>();
/**
* 定时清理不活跃的客户端
*/
@PostConstruct
private void scheduleCleanup() {
new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
emitterMap.values().removeIf(emitter -> {
try{
ping(emitter);
return false;
}catch (IOException e){
log.warn("清理一个不活跃的客户端");
return true;
}
});
}, 0, 10, TimeUnit.SECONDS);
}
/**
* 建立连接
*/
@GetMapping("/sse")
@CrossOrigin(value = "*")
public ResponseBodyEmitter sseServer(@RequestParam String userId){
ResponseBodyEmitter emitter = null;
if ((emitter = emitterMap.get(userId)) != null){
return emitter;
}
// 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
emitter = new SseEmitter(300000L);
emitter.onTimeout(()->{
emitterMap.remove(userId);
log.info("timeout");
});
emitter.onCompletion(()->{
emitterMap.remove(userId);
log.info("completion");
});
// 在客户端断开连接的时候会触发error回调
emitter.onError(e->{
emitterMap.remove(userId);
log.error("error",e);
});
log.info("create for {}",userId);
emitterMap.put(userId, emitter);
return emitter;
}
/**
* 推送消息,只需要通过emitter发送即可
*/
@GetMapping("/send")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
send(message);
return ResponseEntity.ok("ok");
}
private void ping(ResponseBodyEmitter emitter) throws IOException {
Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
.id(UUID.randomUUID().toString())
.name("ping")
.data("ping")
.comment("comment")
.build();
emitter.send(dataWithMediaTypes);
}
private void send(String message){
emitterMap.values().forEach(emitter -> {
try {
doSend(emitter,message);
} catch (IOException e) {
log.warn("客户端断开连接了");
}
});
}
private void doSend(ResponseBodyEmitter emitter,String message) throws IOException {
Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
.id(UUID.randomUUID().toString())
.name("message")
.data(message)
.build();
emitter.send(dataWithMediaTypes);
}
}