掘金 后端 ( ) • 2024-06-05 16:53

为什么写这个转流服务呢?起因是公司有多个数据中心智慧大屏项目,需要在大屏展示实时监控,监控设备一般都是海康、大华等。

针对项目的不同环境遇到如下几种情况:

  • 要求只需要内网范围访问,财务比较紧张,无资金购买海康等附加配套软件,比如海康的综合安防管理平台 解决方式:使用开源基于 GB28181-2016 标准实现的开箱即用的网络视频平台 wvp-GB28181-pro,该平台会提供多播放协议的播放地址 缺陷:搭建费时,耗资源,对 java 人员难以维护
  • 指定 ip 和端口才能被外网访问,安全性要求比较高,财力雄厚并购买了海康的综合安防管理平台,平台开放 api 只有指定 ip 才能访问, 并且通过 api 获取视频播放地址对外网不能访问,需要在指定 ip 服务器上进行转流处理 解决方式:指定 ip 服务器上的 Java 程序进行转流操作 缺陷:通过基于 Tomcat web 容器的并发量,连接数有限制,转流程序与业务代码偶合
  • 财力雄厚购买了海康的综合安防管理平台,平台开放 api 获取的播放地址可以被外网访问 解决方式:直接使用平台开放 api 获取的播放地址即可

针对前两种情况,这个转流服务即可满足

功能

  • 基于 flv 包封装 实现 http-flv、ws-flv
  • 支持 rtsp、rtmp 等网络流、本地视频
  • 支持 H.264 H.265 视频编码
  • 支持 转复用、转码

主要代码

完整代码,回复提供

依赖

按需导入,否则打出的包很大

 <properties>
    <lombok.version>1.18.22</lombok.version>
    <hutool-all.version>5.7.14</hutool-all.version>
    <javacv.version>1.5.9</javacv.version>
    <ffmpeg.version>6.0</ffmpeg.version>
    <javacpp.version>1.5.9</javacpp.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>

    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>${hutool-all.version}</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
    </dependency>

    <!-- 单纯只导入javacv-->
    <dependency>
        <groupId>org.bytedeco</groupId>
        <artifactId>javacv</artifactId>
        <version>${javacv.version}</version>
        <!--排除里面ffmpeg javacpp opencv等相关依赖,自己按需导入-->
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

 <!-- 导入ffmpeg依赖-->
    <dependency>
        <groupId>org.bytedeco</groupId>
        <artifactId>ffmpeg</artifactId>
        <version>${ffmpeg.version}-${javacv.version}</version>
    </dependency>
    <!--linux平台导入ffmpeg依赖 -->
    <dependency>
        <groupId>org.bytedeco</groupId>
        <artifactId>ffmpeg</artifactId>
        <version>${ffmpeg.version}-${javacv.version}</version>
        <!--该标签标识了引入ffmpeg的平台版本,这是因为ffmpeg由c语言编写,在不同平台上的编译结果不同-->
        <classifier>linux-x86_64</classifier>
    </dependency>
    <!--windows平台导入ffmpeg依赖 -->
    <dependency>
        <groupId>org.bytedeco</groupId>
        <artifactId>ffmpeg</artifactId>
        <version>${ffmpeg.version}-${javacv.version}</version>
        <!--该标签标识了引入ffmpeg的平台版本,这是因为ffmpeg由c语言编写,在不同平台上的编译结果不同-->
        <classifier>windows-x86_64</classifier>
    </dependency>

</dependencies>

创建 Netty 服务

/**
 * @author LGC
 */
@Slf4j
@Component
public class NettyServer implements CommandLineRunner, DisposableBean {
    @Resource
    private MediaHandler mediaHandler;

    @Value("${media.port}")
    private Integer mediaPort;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ChannelFuture channelFuture;

    @Override
    public void destroy() throws Exception {
        log.info("websocket server shutdown...");
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully().sync();
        }
        if (channelFuture != null) {
            channelFuture.channel().closeFuture().syncUninterruptibly();
        }
        log.info("websocket server shutdown");
    }

    @Override
    public void run(String... args) throws Exception {
        // bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
        bossGroup = new NioEventLoopGroup(1);
        // workerGroup工作线程组,主要负责网络IO读写
        workerGroup = new NioEventLoopGroup(4);
        try {
            // 服务端启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 服务端启动引导类绑定两个线程组
            serverBootstrap.group(bossGroup, workerGroup);
            // 设置通道为NioChannel
            serverBootstrap.channel(NioServerSocketChannel.class);
            // 可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            // 首选直接内存
            serverBootstrap.option(ChannelOption.ALLOCATOR, PreferredDirectByteBufAllocator.DEFAULT);
            // 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // 设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 设置为不延时
            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            // 设置自定义的通道初始化器 入站、出站处理
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();

                    // 基于http协议,使用http的编码和解码器
                    pipeline.addLast(new HttpServerCodec());
                    // 以块方式写,添加ChunkedWriteHandler处理器
                    pipeline.addLast(new ChunkedWriteHandler());
                    /**
                     *  说明
                     *  1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
                     *  2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
                     */
                    pipeline.addLast(new HttpObjectAggregator(1024 * 1024 * 1024));
                    /**
                     *说明
                     * 1. 对应websocket ,它的数据是以 帧(frame) 形式传递
                     * 2. 可以看到WebSocketFrame 下面有六个子类
                     * 3. 浏览器请求时 ws://localhost:8082/v1 表示请求的uri
                     * 4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
                     * 5. 是通过一个 状态码 101
                     *
                     * 这里我们自己进行握手
                     */
//                    pipeline.addLast(new WebSocketServerProtocolHandler("/v1", null, true, 65535));
                    // 跨域handler
                    pipeline.addLast(new CorsHandler(corsConfig));
                    // 添加自定义的handler
                    pipeline.addLast(mediaHandler);
                }
            });

            // 异步IO的结果
            channelFuture = serverBootstrap.bind(new InetSocketAddress("0.0.0.0", mediaPort)).sync();
            // 服务端启动监听事件
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
                    log.info("websocket start success");
                } else {
                    log.info("websocket start failed");
                }
            });
        } catch (Exception e) {
            log.info("websocket start error");
        }
    }
}

媒体请求处理 Handler

/**
 * 媒体请求处理Handler
 *
 * @author LGC
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class MediaHandler extends SimpleChannelInboundHandler<Object> {


    private final static String MEDIA_NAME = "media-serve";

    private final MediaService mediaService;
    /**
     * http-flv
     */
    private CameraDTO httpCamera = null;
    /**
     * ws-flv
     */
    private CameraDTO wsCamera = null;


    public MediaHandler( MediaService mediaService ) {
        this.mediaService = mediaService;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("连接新增");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("连接关闭");
        close(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("连接异常", cause);
        close(ctx);
    }

    private void close(ChannelHandlerContext ctx) {
        if (httpCamera != null) {
            log.info("http-flv 关闭播放,url:{}", httpCamera.getUrl());
            mediaService.closeForHttp(httpCamera, ctx);
        }
        if (wsCamera != null) {
            log.info("http-ws 关闭播放,url:{}", wsCamera.getUrl());
            mediaService.closeForWs(wsCamera, ctx);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof FullHttpRequest) {
            log.info("处理http请求");
            FullHttpRequest req = (FullHttpRequest) msg;
            // 请求解析
            QueryStringDecoder decoder = new QueryStringDecoder(req.uri());
            log.info("【HttpRequest-PATH:" + decoder.path() + "】");
            log.info("【HttpRequest-URI:" + decoder.uri() + "】");
            log.info("【HttpRequest-Parameters:" + decoder.parameters() + "】");
            log.info("【HttpRequest-Method:" + req.method().name() + "】");

            Iterator<Map.Entry<String, String>> iterator = req.headers().iteratorAsString();
            while (iterator.hasNext()) {
                Map.Entry<String, String> entry = iterator.next();
                log.info("【Header-Key:" + entry.getKey() + ";Header-Value:" + entry.getValue() + "】");
            }
            // http请求及非转换升级为ws请求
            if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
                log.info("http请求 响应构建");
                // 判断请求uri 是否是转流指定url
                if ("/live".equals(decoder.path())) {
                    HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                    rsp.headers()
                            .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
                            .set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes")
                            .set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache")
                            .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, MEDIA_NAME);
                    ctx.writeAndFlush(rsp);
                    httpCamera = buildCamera(req.uri());
                    mediaService.playForHttp(httpCamera, ctx);
                }
                // 非转流指定url 关闭连接
                else {
                    ByteBuf content = Unpooled.copiedBuffer(MEDIA_NAME, CharsetUtil.UTF_8);
                    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
                    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                }
            }
            // http请求升级为ws
            else {
                // 升级为ws握手
                upgradeWsHandshaker(ctx, req, decoder);
                // 判断请求uri 是否是转流指定url
                if ("/live".equals(decoder.path())) {
                    wsCamera = buildCamera(req.uri());
                    mediaService.playForWs(wsCamera, ctx);
                }
            }
        }

        if (msg instanceof WebSocketFrame) {
            log.info("处理ws请求");
            WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
            handlerWebSocketRequest(ctx, webSocketFrame);
        }
    }


    /**
     * ws://localhost:9998/live?url=rtsp://admin:[email protected]:554/Streaming/Channels/1
     */
    private CameraDTO buildCamera(String url) {
        CameraDTO cameraDto = new CameraDTO();
        String[] split = url.split("url=");
        cameraDto.setUrl(split[1]);
        cameraDto.setMediaKey(SecureUtil.md5(cameraDto.getUrl()));
        if (isLocalFile(cameraDto.getUrl())) {
            cameraDto.setType(1);
        }
        return cameraDto;
    }

    /**
     * url 地址类型
     */
    private boolean isLocalFile(String streamUrl) {
        // 协议
        List<String> protocols = Arrays.asList("http", "https", "ws", "wss", "rtsp", "rtmp");
        String[] split = streamUrl.trim().split("\:");
        if (split.length > 0) {
            if (protocols.contains(split[0])) {
                return false;
            }
        }
        return true;
    }


    /**
     * 升级为ws握手
     * 参考摘自 WebSocketServerProtocolHandshakeHandler
     */
    private static void upgradeWsHandshaker(ChannelHandlerContext ctx, FullHttpRequest req, QueryStringDecoder decoder) {
        log.info("websocket握手,请求升级");
        log.info("getWebSocketLocation:{}", getWebSocketLocation(ctx.pipeline(), req, decoder.path()));
        // 参数分别是ws地址,子协议,是否扩展,最大frame负载长度
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(ctx.pipeline(), req, decoder.path()),
                null, true, 3 * 1024 * 1024);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 握手
            HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            rsp.headers().set(HttpHeaderNames.SERVER, MEDIA_NAME);
            ChannelPromise channelPromise = ctx.channel().newPromise();
//                    ChannelPromise channelPromise = new DefaultChannelPromise(ctx.channel());
            final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req, rsp.headers(), channelPromise);
            handshakeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        channelPromise.tryFailure(future.cause());
                        log.info("握手失败");
                        ctx.fireExceptionCaught(future.cause());
                    } else {
                        channelPromise.trySuccess();
                        log.info("握手成功");
                        // 发送握手成功事件,后面handler可监听
                        // Kept for compatibility
//                                ctx.fireUserEventTriggered(
//                                        WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
//                                ctx.fireUserEventTriggered(
//                                         new WebSocketServerProtocolHandler.HandshakeComplete(
//                                                req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                    }
                }
            });
        }
    }

    /**
     * 摘自 WebSocketServerProtocolHandshakeHandler
     */
    private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) {
        String protocol = "ws";
        if (cp.get(SslHandler.class) != null) {
            // SSL in use so use Secure WebSockets
            protocol = "wss";
        }
        String host = req.headers().get(HttpHeaderNames.HOST);
        return protocol + "://" + host + path;
    }

    /**
     * 处理ws 消息
     */
    private static void handlerWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 文本消息
        if (frame instanceof TextWebSocketFrame) {
            log.info("文本消息");
            return;
        }
        // 握手PING/PONG信息
        if (frame instanceof PingWebSocketFrame) {
            log.info("PING消息");
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 二进制信息
        if (frame instanceof BinaryWebSocketFrame) {
            log.info("二进制信息");
            return;
        }

        // 请求关闭连接信息
        if (frame instanceof CloseWebSocketFrame) {
            log.info("关闭ws信息");
        }
    }

}

媒体服务类

/**
 * 媒体服务
 *
 * @author LGC
 */
@Component
@RequiredArgsConstructor
public class MediaService {

    private final ThreadPoolExecutor threadPoolExecutor;

    /**
     * 缓存流转换线程
     */
    public static ConcurrentHashMap<String, MediaTransfer> mediaTransferMap = new ConcurrentHashMap<>();


    /**
     * http-flv播放
     */
    public void playForHttp(CameraDTO cameraDto, ChannelHandlerContext ctx) {
        if (mediaTransferMap.containsKey(cameraDto.getMediaKey())) {
            MediaTransfer mediaConvert = mediaTransferMap.get(cameraDto.getMediaKey());
            if (mediaConvert instanceof MediaTransferFlvByJavacv) {
                MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert;
                mediaTransferFlvByJavacv.addClient(ctx, ClientType.HTTP);
            }
        } else {
            MediaTransferFlvByJavacv mediaConvert = new MediaTransferFlvByJavacv(cameraDto);
            mediaTransferMap.put(cameraDto.getMediaKey(), mediaConvert);
            threadPoolExecutor.execute(mediaConvert);
            mediaConvert.addClient(ctx, ClientType.HTTP);
        }
    }

    /**
     * http-flv 关闭播放
     */
    public void closeForHttp(CameraDTO cameraDto, ChannelHandlerContext ctx) {
        if (mediaTransferMap.containsKey(cameraDto.getMediaKey())) {
            MediaTransfer mediaConvert = mediaTransferMap.get(cameraDto.getMediaKey());
            if (mediaConvert instanceof MediaTransferFlvByJavacv) {
                MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert;
                mediaTransferFlvByJavacv.removeClient(ctx, ClientType.HTTP);
            }
        }
    }


    /**
     * ws-flv播放
     */
    public void playForWs(CameraDTO cameraDto, ChannelHandlerContext ctx) {
        if (mediaTransferMap.containsKey(cameraDto.getMediaKey())) {
            MediaTransfer mediaConvert = mediaTransferMap.get(cameraDto.getMediaKey());
            if (mediaConvert instanceof MediaTransferFlvByJavacv) {
                MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert;
                mediaTransferFlvByJavacv.addClient(ctx, ClientType.WEBSOCKET);
            }
        } else {
            MediaTransferFlvByJavacv mediaConvert = new MediaTransferFlvByJavacv(cameraDto);
            mediaTransferMap.put(cameraDto.getMediaKey(), mediaConvert);
            threadPoolExecutor.execute(mediaConvert);
            mediaConvert.addClient(ctx, ClientType.WEBSOCKET);
        }
    }

    /**
     * http-ws 关闭播放
     */
    public void closeForWs(CameraDTO cameraDto, ChannelHandlerContext ctx) {
        if (mediaTransferMap.containsKey(cameraDto.getMediaKey())) {
            MediaTransfer mediaConvert = mediaTransferMap.get(cameraDto.getMediaKey());
            if (mediaConvert instanceof MediaTransferFlvByJavacv) {
                MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert;
                mediaTransferFlvByJavacv.removeClient(ctx, ClientType.WEBSOCKET);
            }
        }
    }
}

javacv 媒体转流类

/**
 * 通过 javacv 将 媒体转流封装为flv格式
 * <p/>
 * 支持转复用和转码
 * <p/>
 * 转复用 流来源是视频H264格式,音频AAC格式
 * <p/>
 * 转码  流来源不是视频H264格式,音频AAC格式 转码为视频H264格式,音频AAC格式
 *
 * @author LGC
 */
@Slf4j
public class MediaTransferFlvByJavacv implements MediaTransfer, Runnable {
    static {
        // 设置日志级别
        avutil.av_log_set_level(avutil.AV_LOG_ERROR);
        FFmpegLogCallback.set();
    }

    /**
     * 相机
     */
    private final CameraDTO camera;
    /**
     * http客户端
     */
    private final ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();
    /**
     * ws客户端
     */
    private final ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
    /**
     * 运行状态
     */
    private volatile boolean running = false;
    /**
     * 拉流器创建状态
     */
    private boolean grabberStatus = false;
    /**
     * 推流录制器创建状态
     */
    private boolean recorderStatus = false;
    /**
     * 拉流器
     */
    private FFmpegFrameGrabber grabber;
    /**
     * 推流录制器
     */
    private FFmpegFrameRecorder recorder;
    /**
     * true:转复用,false:转码 默认转码
     */
    private boolean transferFlag = false;
    /**
     * flv header 转FLV格式的头信息 如果有第二个客户端播放首先要返回头信息
     */
    private byte[] header = null;
    /**
     * 视频输出流
     */
    private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

    /**
     * 当前http连接 当前ws连接 在线人数
     */
    private int hcSize, wcSize = 0;

    /**
     * 没有客户端时候的计时
     */
    private long noClient = 0;


    public MediaTransferFlvByJavacv(CameraDTO camera) {
        super();
        this.camera = camera;
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isGrabberStatus() {
        return grabberStatus;
    }

    public boolean isRecorderStatus() {
        return recorderStatus;
    }

    /**
     * 创建拉流器
     */
    private boolean createGrabber() {
        // 拉流器
        grabber = new FFmpegFrameGrabber(camera.getUrl());
        // 像素格式
        grabber.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
        // 超时时间(15秒)
        grabber.setOption("stimeout", camera.getNetTimeout());
//        grabber.setOption("threads", "1"); //线程数
        // 设置缓存大小,提高画质、减少卡顿花屏
        grabber.setOption("buffer_size", "1024000");
        // 读写超时,适用于所有协议的通用读写超时
        grabber.setOption("rw_timeout", camera.getReadOrWriteTimeout());
        // 探测视频流信息,为空默认5000000微秒
        grabber.setOption("probesize", camera.getReadOrWriteTimeout());
        // 解析视频流信息,为空默认5000000微秒
        grabber.setOption("analyzeduration", camera.getReadOrWriteTimeout());

        // 如果为rtsp流,增加配置
        if ("rtsp".equals(camera.getUrl().substring(0, 4))) {
            // 设置打开协议tcp / udp
            grabber.setOption("rtsp_transport", "tcp");
            // 首选TCP进行RTP传输
            grabber.setOption("rtsp_flags", "prefer_tcp");

        } else if ("rtmp".equals(camera.getUrl().substring(0, 4))) {
            // rtmp拉流缓冲区,默认3000毫秒
            grabber.setOption("rtmp_buffer", "1000");
            // 默认rtmp流为直播模式,不允许seek
            grabber.setOption("rtmp_live", "live");
        }
        try {
            grabber.start();
            log.info("\r\n{}\r\n启动拉流器成功", camera.getUrl());
            return grabberStatus = true;
        } catch (Exception e) {
            MediaService.mediaTransferMap.remove(camera.getMediaKey());
            log.error("\r\n{}\r\n启动拉流器失败,网络超时或视频源不可用", camera.getUrl(), e);
        }
        return grabberStatus = false;
    }

    /**
     * 创建转码推流录制器
     */
    private boolean createRecorder() {
        recorder = new FFmpegFrameRecorder(byteArrayOutputStream, grabber.getImageWidth(), grabber.getImageHeight(),
                grabber.getAudioChannels());
        recorder.setFormat("flv"); // 视频封装格式
        if (!transferFlag) {
            // 转码
            log.info("转码模式");
            recorder.setInterleaved(true);//允许交叉
            recorder.setVideoOption("tune", "zerolatency");// 编码延时 zerolatency(零延迟)
            recorder.setVideoOption("preset", "ultrafast");// 编码速度 ultrafast(极快) fast(快)
            recorder.setVideoOption("crf", "28");// 编码质量 有效范围为0到63,数字越大表示质量越低,从主观上讲,18~28是一个合理的范围。18被认为是视觉无损的
//            recorder.setVideoOption("threads", "1"); // 线程数
            recorder.setFrameRate(25);// 设置帧率
            recorder.setSampleRate(grabber.getSampleRate());
            recorder.setGopSize(50);// 设置gop,与帧率相同,相当于间隔1秒一个关键帧
            recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); // 像素格式
            recorder.setVideoBitrate(grabber.getVideoBitrate());// 视频码率
//   recorder.setVideoBitrate(600 * 1000);// 码率600kb/s
            recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);// 视频编码
            recorder.setVideoBitrate(grabber.getVideoBitrate()); // 视频比特率
            if (grabber.getAudioChannels() > 0) {
                recorder.setAudioChannels(grabber.getAudioChannels());
                recorder.setAudioBitrate(grabber.getAudioBitrate());
                recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
            }
            // 启用RDOQ算法,优化视频质量 1:在视频码率和视频质量之间取得平衡 2:最大程度优化视频质量(会降低编码速度和提高码率)
            recorder.setTrellis(1);
            recorder.setMaxDelay(0);// 设置延迟
            try {
                recorder.start();
                log.info("\r\n{}\r\n启动转码录制器成功", camera.getUrl());
                return recorderStatus = true;
            } catch (org.bytedeco.javacv.FrameRecorder.Exception e1) {
                log.error("启动转码录制器失败", e1);
                MediaService.mediaTransferMap.remove(camera.getMediaKey());
            }
        } else {
            // 转复用
            log.info("转复用模式");
            // 不让recorder关联关闭outputStream
            recorder.setCloseOutputStream(false);
            try {
                recorder.start(grabber.getFormatContext());
                log.info("\r\n{}\r\n启动转复用录制器成功", camera.getUrl());
                return recorderStatus = true;
            } catch (org.bytedeco.javacv.FrameRecorder.Exception e) {
                log.warn("\r\n{}\r\n启动转复用录制器失败", camera.getUrl());
                // 如果转复用失败,则自动切换到转码模式
                transferFlag = false;
                log.info("转复用失败,自动切换到转码模式");
                if (recorder != null) {
                    try {
                        recorder.close();
                    } catch (org.bytedeco.javacv.FrameRecorder.Exception e1) {
                    }
                }
                if (createRecorder()) {
                    log.error("\r\n{}\r\n切换到转码模式", camera.getUrl());
                    return true;
                }
                log.error("\r\n{}\r\n切换转码模式失败", camera.getUrl(), e);
            }
        }
        return recorderStatus = false;
    }

    /**
     * 是否支持Flv格式编解码器
     */
    private boolean supportFlvFormatCodec() {
        int videoCodec = grabber.getVideoCodec();
        int audioCodec = grabber.getAudioCodec();
        return (camera.getType() == 0)
                && (avcodec.AV_CODEC_ID_H264 == videoCodec)
                && (avcodec.AV_CODEC_ID_AAC == audioCodec || grabber.getAudioChannels() == 0);
    }

    /**
     * 发送帧数据
     */
    private void sendFrameData(byte[] data) {
        // http
        for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
            try {
                entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
            } catch (java.lang.Exception e) {
                httpClients.remove(entry.getKey());
                hasClient();
                log.error("关闭 http 客户端异常", e);
            }
        }
        // ws
        for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
            try {
                entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
            } catch (java.lang.Exception e) {
                wsClients.remove(entry.getKey());
                hasClient();
                log.error("关闭 ws 客户端异常", e);
            }
        }

    }

    /**
     * 将视频流转换为flv
     */
    private void transferStream2Flv() {
        if (!createGrabber()) {
            return;
        }
        transferFlag = supportFlvFormatCodec();
        if (!createRecorder()) {
            return;
        }

        try {
            grabber.flush();
        } catch (Exception e) {
            log.info("清空拉流器缓存失败", e);
        }
        if (header == null) {
            header = byteArrayOutputStream.toByteArray();
            byteArrayOutputStream.reset();
        }

        running = true;

        // 启动监听线程 自动关闭推流
        listenClient();

        int nullNumber = 0;
        for (; running && grabberStatus && recorderStatus; ) {
            try {
                // 转复用
                if (transferFlag) {
                    // 判断读空包
                    AVPacket pkt = grabber.grabPacket();
                    if (null != pkt && !pkt.isNull()) {
                        recorder.recordPacket(pkt);
                        // 发送数据帧到客户端
                        if (byteArrayOutputStream.size() > 0) {
                            byte[] b = byteArrayOutputStream.toByteArray();
                            byteArrayOutputStream.reset();
                            // 发送帧数据
                            sendFrameData(b);
                        }
                        avcodec.av_packet_unref(pkt);
                    } else {
                        nullNumber++;
                        log.info("转复用空包:{},url:{}", nullNumber, camera.getUrl());
                        if (nullNumber > 200) {
                            break;
                        }
                    }
                }
                // 转码
                else {
                    // 判断读空包
                    Frame frame = grabber.grab();
                    if (frame != null) {
                        recorder.record(frame);
                        // 发送数据帧到客户端
                        if (byteArrayOutputStream.size() > 0) {
                            byte[] b = byteArrayOutputStream.toByteArray();
                            byteArrayOutputStream.reset();
                            // 发送帧数据
                            sendFrameData(b);
                        }
                    } else {
                        nullNumber++;
                        log.info("转码空包:{},url:{}", nullNumber, camera.getUrl());
                        if (nullNumber > 200) {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("转码或转复用异常", e);
                grabberStatus = false;
                MediaService.mediaTransferMap.remove(camera.getMediaKey());
            } catch (org.bytedeco.javacv.FrameRecorder.Exception e) {
                log.error("转码或转复用异常", e);
                recorderStatus = false;
                MediaService.mediaTransferMap.remove(camera.getMediaKey());
            }
        }
        try {
            recorder.close();
            grabber.close();
            byteArrayOutputStream.close();
        } catch (java.lang.Exception e) {
            log.error("关闭媒体流Exception", e);
        } finally {
            closeMedia();
        }
        log.info("关闭媒体流 url,{} ", camera.getUrl());
    }


    /**
     * 判断有没有客户端, 自动关闭推流
     */
    private void hasClient() {
        int newHcSize = httpClients.size();
        int newWcSize = wsClients.size();
        if (hcSize != newHcSize || wcSize != newWcSize) {
            hcSize = newHcSize;
            wcSize = newWcSize;
            log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", camera.getUrl(), newHcSize, newWcSize);
        }

        // 无需自动关闭
        if (!camera.isAutoClose()) {
            return;
        }

        if (httpClients.isEmpty() && wsClients.isEmpty()) {
            // 等待1分钟还没有客户端,则关闭推流
            if (noClient > camera.getAutoCloseDuration()) {
                closeMedia();
            } else {
                // 增加1秒
                noClient += 1;
                log.info("\r\n{}\r\n {} 秒后自动关闭推拉流 \r\n", camera.getUrl(), camera.getAutoCloseDuration() - noClient);
            }
        } else {
            // 重置计时
            noClient = 0;
        }
    }

    /**
     * 监听客户端,自动关闭推流
     */
    private void listenClient() {
        /**
         * 监听线程,用于监听状态
         */
        new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                hasClient();
            }
        }).start();
    }


    /**
     * 关闭流媒体
     */
    private void closeMedia() {
        running = false;
        MediaService.mediaTransferMap.remove(camera.getMediaKey());
        // 媒体异常时,主动断开前端长连接
        for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
            try {
                entry.getValue().close();
            } finally {
                httpClients.remove(entry.getKey());
            }
        }
        for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
            try {
                entry.getValue().close();
            } finally {
                wsClients.remove(entry.getKey());
            }
        }
    }


    /**
     * 新增客户端
     */
    public void addClient(ChannelHandlerContext ctx, ClientType clientType) {
        int timeout = 0;
        while (true) {
            try {
                if (header != null) {
                    try {
                        // 发送帧前先发送header
                        if (ClientType.HTTP.getType() == clientType.getType()) {
                            ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
                            future.addListener(future1 -> {
                                if (future1.isSuccess()) {
                                    httpClients.put(ctx.channel().id().toString(), ctx);
                                }
                            });
                        } else if (ClientType.WEBSOCKET.getType() == clientType.getType()) {
                            ChannelFuture future = ctx
                                    .writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
                            future.addListener(future12 -> {
                                if (future12.isSuccess()) {
                                    wsClients.put(ctx.channel().id().toString(), ctx);
                                }
                            });
                        }

                    } catch (java.lang.Exception e) {
                        log.info("添加客户端异常", e);
                    }
                    break;
                }

                // 等待推拉流启动
                Thread.sleep(50);
                // 启动录制器失败
                timeout += 50;
                if (timeout > 30000) {
                    break;
                }
            } catch (java.lang.Exception e) {
                log.error("添加客户端异常", e);
            }
        }
    }


    /**
     * 移除客户端
     */
    public void removeClient(ChannelHandlerContext ctx, ClientType clientType) {
        if (ClientType.HTTP.getType() == clientType.getType()) {
            httpClients.remove(ctx.channel().id().toString(), ctx);
            hasClient();
        } else if (ClientType.WEBSOCKET.getType() == clientType.getType()) {
            wsClients.remove(ctx.channel().id().toString(), ctx);
            hasClient();
        }
        ctx.close();

    }


    @Override
    public void run() {
        transferStream2Flv();
    }

}

访问测试

播放格式

http://localhost:9998/live?url=播放地址

ws://localhost:9998/live?url=播放地址

播放 url

各品牌海康、大华播放地址如何拼接自行百度

海康播放地址: rtsp://admin:[email protected]:554/Streaming/Channels/1

本地视频:F:/otherGitProject/video/file.mp4

使用方式

前端使用西瓜播放器,flv.js 进行播放

本地客户端使用 vlc 播放

vlc 客户端测试

客户端加入地址

http://localhost:9998/live?url=rtsp://admin:[email protected]:554/Streaming/Channels/1

ws://localhost:9998/live?url=rtsp://admin:[email protected]:554/Streaming/Channels/1

本地 flv.js 前端测试

http://localhost:9997/live?rtsp://admin:[email protected]:554/Streaming/Channels/1

http://localhost:9997/live?F:/otherGitProject/video/file.mp4