#include "readstream.h"

namespace {

// Layout of one framed UDP datagram as consumed by
// ReadStream::udpDataReceivedSlot(): an 8-byte header followed by a
// fixed-size video payload.
// NOTE(review): the original code hard-codes the payload length to 248 and
// keeps the "length from header byte 5" variant commented out — confirm which
// one matches the sender's protocol.
constexpr int kUdpHeaderSize = 8;
constexpr int kUdpPayloadSize = 248;

// Calls close() on every non-null downstream consumer.
void closeConsumers(DecodeStream *decoder, SaveStream *saver,
                    PushStream *pusher) {
    if (decoder) {
        decoder->close();
    }
    if (saver) {
        saver->close();
    }
    if (pusher) {
        pusher->close();
    }
}

}  // namespace

/**
 * @brief Constructs the reader and initialises FFmpeg networking.
 * @param parent Qt parent object.
 */
ReadStream::ReadStream(QObject *parent) : QObject{parent} {
    initFFmpeg();
}

/**
 * @brief Releases the UDP socket, the raw dump file and any FFmpeg objects
 *        (format context / packet) that are still allocated.
 */
ReadStream::~ReadStream() {
    if (udpSocket) {
        udpSocket->abort();
        udpSocket->deleteLater();
        udpSocket = nullptr;
    }
    if (m_saveFile.isOpen()) {
        m_saveFile.close();
    }
    // free() is a no-op when the read loop already released everything.
    free();
}

/**
 * @brief Opens an input URL/file, probes its streams and selects the best
 *        video stream.
 * @param url Address or path handed to avformat_open_input().
 * @return true when the input was opened, a video stream was found and the
 *         input packet was allocated; false otherwise (all FFmpeg state is
 *         released on failure).
 */
bool ReadStream::openFile(const QString &url) {
    if (url.isEmpty()) {
        return false;
    }

    // Never hand an already opened context to avformat_open_input(); release
    // whatever a previous openFile() left behind first.
    free();

    AVDictionary *dict = nullptr;
    // Original author's note: forcing "rtsp_transport" to tcp interrupts
    // pulling from the pod camera, so the transport is left at its default.
    // Receive buffer size; can be raised for 1080p streams.
    av_dict_set(&dict, "buffer_size", "4096000", 0);
    // Connection timeouts, in microseconds (5 s).
    av_dict_set(&dict, "stimeout", "5000000", 0);
    if (url.left(4) != "rtmp") {
        av_dict_set(&dict, "timeout", "5000000", 0);
    }
    av_dict_set(&dict, "max_delay", "300000", 0);     // max delay 300 ms
    av_dict_set(&dict, "tune", "zerolatency", 0);     // real-time encoding
    av_dict_set(&dict, "preset", "faster", 0);        // ultrafast, faster
    av_dict_set(&dict, "threads", "auto", 0);         // automatic thread count
    // Added by the original author so that avformat_open_input() does not
    // take too long on an empty stream.
    av_dict_set(&dict, "max_interleave_delta", "1000000", 0);
    av_dict_set(&dict, "reconnect_delay_max", "5", 0);     // 5 s
    av_dict_set(&dict, "probesize", "10000000", 0);        // 10 MB
    av_dict_set(&dict, "analyzeduration", "20000000", 0);  // 20 s
    av_dict_set(&dict, "fifo_size", "1000000", 0);  // UDP buffer size

    // Keep the UTF-8 bytes alive for the duration of the call.
    const QByteArray urlUtf8 = url.toUtf8();
    int ret = avformat_open_input(&m_formatContext,     // receives the context
                                  urlUtf8.constData(),  // input address
                                  nullptr,  // auto-detect the input format
                                  &dict);   // options
    // Options that were not consumed stay in the dictionary; release it.
    if (dict) {
        av_dict_free(&dict);
    }
    if (ret < 0) {
        showError(ret);
        free();
        return false;
    }

    // Read packets from the input to obtain stream information.
    ret = avformat_find_stream_info(m_formatContext, nullptr);
    if (ret < 0) {
        showError(ret);
        free();
        return false;
    }

    // Locate the video stream by media type.
    m_videoIndex = av_find_best_stream(m_formatContext, AVMEDIA_TYPE_VIDEO, -1,
                                       -1, nullptr, 0);
    if (m_videoIndex < 0) {
        showError(m_videoIndex);
        free();
        return false;
    }

    // Duration in seconds, or -1 when the container does not report one.
    if (m_formatContext->duration != AV_NOPTS_VALUE) {
        m_videoDuration =
            m_formatContext->duration / static_cast<double>(AV_TIME_BASE);
        qDebug() << m_videoDuration;
    } else {
        m_videoDuration = -1;
    }

    m_startTime = m_formatContext->start_time;
    m_inStreamTimeBase = m_formatContext->streams[m_videoIndex]->time_base;
    m_pullURL = url;
    return initObject();
}

/**
 * @brief Attaches a decoder to the currently opened input and starts it.
 * @param decodeStreamer Decoder to attach; nullptr only clears the decode
 *        flag.
 * @return Result of the decoder's init(), or false when nullptr was passed.
 */
bool ReadStream::setStreamDecoder(DecodeStream *decodeStreamer) {
    if (!decodeStreamer) {
        m_decodeStreamFlag = false;
        return false;
    }

    m_streamDecoder = decodeStreamer;
    m_queueManager.clearDecodeQueue();
    m_decodeStreamFlag =
        m_streamDecoder->init(&m_queueManager, m_formatContext, m_videoIndex);
    if (m_decodeStreamFlag) {
        emit decodeStreamer->startDecodeSignal();
    }
    return m_decodeStreamFlag;
}

/**
 * @brief Attaches a decoder for the raw UDP path (no format context).
 * @param decodeStreamer Decoder to attach; nullptr only clears the decode
 *        flag.
 * @return Result of the decoder's init(), or false when nullptr was passed.
 */
bool ReadStream::setUDPStreamDecoder(DecodeStream *decodeStreamer) {
    if (!decodeStreamer) {
        m_decodeStreamFlag = false;
        return false;
    }

    m_streamDecoder = decodeStreamer;
    m_queueManager.clearDecodeQueue();
    m_decodeStreamFlag = m_streamDecoder->init(&m_queueManager);
    if (m_decodeStreamFlag) {
        emit decodeStreamer->startDecodeSignal();
    }
    return m_decodeStreamFlag;
}

/**
 * @brief Attaches a stream saver.
 * @param streamSaver Saver to attach; nullptr only clears the save flag.
 * @param isUDP Selects the saver's initUDP() instead of init().
 * @return Result of the saver's initialisation, or false when nullptr was
 *         passed.
 */
bool ReadStream::setStreamSaver(SaveStream *streamSaver, bool isUDP) {
    if (!streamSaver) {
        m_saveStreamFlag = false;
        return false;
    }

    m_streamSaver = streamSaver;
    m_queueManager.clearSaveQueue();
    if (isUDP) {
        m_saveStreamFlag = m_streamSaver->initUDP(&m_queueManager);
    } else {
        m_saveStreamFlag =
            m_streamSaver->init(m_formatContext, &m_queueManager, m_videoIndex);
    }
    return m_saveStreamFlag;
}

/**
 * @brief Attaches a stream pusher and starts it when initialisation succeeds.
 * @param streamPusher Pusher to attach; nullptr only clears the push flag.
 * @return Result of the pusher's init(), or false when nullptr was passed.
 */
bool ReadStream::setStreamPusher(PushStream *streamPusher) {
    if (!streamPusher) {
        m_pushStreamFlag = false;
        return false;
    }

    m_streamPusher = streamPusher;
    m_queueManager.clearPushQueue();
    m_pushStreamFlag = streamPusher->init(m_formatContext, &m_queueManager);
    if (m_pushStreamFlag) {
        emit streamPusher->startPushStreamSignal(0);
    }
    return m_pushStreamFlag;
}

/**
 * @brief Requests the read loops to stop and asks for the UDP connection to
 *        be closed. Also marks a running reconnect() as cancelled.
 */
void ReadStream::close() {
    m_start = false;
    m_end = true;
    emit closeUDPConnectionSignal();
}

/**
 * @brief Stores the address and port used later by startPullUDPStream().
 */
void ReadStream::setUDPParms(QString ip, int port) {
    m_UDPIP = ip;
    m_UDPPort = port;
}

/**
 * @brief Creates the UDP socket if needed and binds it.
 * @param ip Empty: bind to any address. Multicast address: bind to any IPv4
 *        address with ShareAddress and join the group. Otherwise: bind to
 *        that unicast address.
 * @param port Local port.
 * @return true when the bind (and, for multicast, the group join) succeeded.
 */
bool ReadStream::initSocket(QString ip, int port) {
    qDebug() << "initSocket:" << QThread::currentThreadId();
    if (udpSocket == nullptr) {
        udpSocket = new QUdpSocket();
    }

    if (ip.isEmpty()) {
        return udpSocket->bind(QHostAddress::Any, port);
    }

    if (!isMulticastAddress(ip)) {
        // Unicast.
        return udpSocket->bind(QHostAddress(ip), port);
    }

    // Multicast.
    if (!udpSocket->bind(QHostAddress::AnyIPv4, port,
                         QUdpSocket::ShareAddress)) {
        // The original fell off the end of the function here (undefined
        // behaviour for a non-void function).
        return false;
    }
    return udpSocket->joinMulticastGroup(QHostAddress(ip));
}

/**
 * @brief Opens (append mode) a timestamped "<uavName>_<yyyyMMddHHmmss>.dat"
 *        file inside fileDir for dumping received datagrams.
 * @param fileDir Target directory; created (including parents) when missing.
 * @param uavName Prefix of the file name.
 * @return true when the file was opened.
 */
bool ReadStream::initSavedRawFile(QString fileDir, QString uavName) {
    QDir dir;
    if (!dir.exists(fileDir)) {
        // mkpath() also creates missing parent directories, which mkdir()
        // does not. A failure surfaces through open() below.
        dir.mkpath(fileDir);
    }

    const QString timestamp =
        QDateTime::currentDateTime().toString("yyyyMMddHHmmss");
    const QString strName = QString("%1.dat").arg(timestamp);
    const QString filename = fileDir + "/" + uavName + "_" + strName;

    m_saveFile.setFileName(filename);
    return m_saveFile.open(QIODevice::WriteOnly | QIODevice::Append);
}

/**
 * @brief Returns the duration computed by openFile(), in seconds, or -1 when
 *        the input did not report one.
 */
double ReadStream::getVideoDuration() {
    return m_videoDuration;
}

/**
 * @brief Queues a seek request for the file read loop.
 * @param targetTimestamp Target position; it is multiplied by AV_TIME_BASE,
 *        so it is presumably expressed in seconds — confirm against callers.
 */
void ReadStream::seekToVideo(int64_t targetTimestamp) {
    qDebug() << "Receive targetTimeStamp!**************";
    if (!m_formatContext) {
        return;
    }

    // start_time is AV_NOPTS_VALUE when the container does not report it;
    // adding that sentinel would yield a meaningless target.
    const int64_t startOffset =
        (m_startTime == AV_NOPTS_VALUE) ? 0 : m_startTime;
    const int64_t targetUs =
        targetTimestamp * static_cast<int64_t>(AV_TIME_BASE) + startOffset;
    // Convert from AV_TIME_BASE units into the video stream's time base.
    const int64_t streamTs =
        av_rescale_q(targetUs, AV_TIME_BASE_Q, m_inStreamTimeBase);
    qDebug() << "tmp:" << streamTs;
    m_queueManager.enqueueSeekTime(streamTs);
}

/**
 * @brief Read loop for live inputs: reads packets from the opened input and
 *        fans valid video packets out to the decode/save/push queues until
 *        stopped, the stream ends, or reconnecting fails.
 */
void ReadStream::startPullStream() {
    // Nothing to do when no input has been opened.
    if (!m_formatContext) {
        return;
    }
    qDebug() << "readStreamThreadID:" << QThread::currentThreadId();
    if (m_streamSaver) {
        emit m_streamSaver->startSaveStreamSignal();
    }

    m_start = true;
    while (m_start) {
        const int readRet = av_read_frame(m_formatContext, m_inputPacket);
        if (readRet == AVERROR(EAGAIN)) {
            // No data available for now.
            qDebug() << "No Stream Data";
            av_packet_unref(m_inputPacket);
            av_usleep(30000);  // wait 30 ms
            continue;
        }
        if (readRet == AVERROR_EOF) {
            // End of stream.
            qDebug() << "Stream End";
            av_packet_unref(m_inputPacket);
            close();
            break;
        }
        if (readRet < 0) {
            // Read error: try to re-open the input.
            qDebug() << "Stream Error";
            if (reconnect()) {
                continue;
            }
            closeConsumers(m_streamDecoder, m_streamSaver, m_streamPusher);
            break;
        }

        if (m_inputPacket->stream_index == m_videoIndex &&
            isValidAVPacket(m_inputPacket)) {
            if (m_decodeStreamFlag) {
                m_queueManager.enqueueDecodePacket(m_inputPacket);
            }
            if (m_saveStreamFlag) {
                m_queueManager.enqueueSavePacket(m_inputPacket);
            }
            if (m_pushStreamFlag) {
                m_queueManager.enqueuePushPacket(m_inputPacket);
            }
        }
        av_packet_unref(m_inputPacket);
        QThread::usleep(2000);  // wait 2 ms
    }

    clear();
    free();
    qDebug() << "read Stream End!";
}

/**
 * @brief Read loop for file playback: like startPullStream() but services
 *        queued seek requests, has no saver fan-out and does not reconnect.
 *        At end of file it keeps polling until the queue manager reports
 *        that decoding has finished.
 */
void ReadStream::startReadFileStream() {
    // Nothing to do when no input has been opened.
    if (!m_formatContext) {
        return;
    }
    qDebug() << "readStreamThreadID:" << QThread::currentThreadId();

    m_start = true;
    while (m_start) {
        // Service a pending seek request first.
        if (!m_queueManager.isEmptySeekQueue()) {
            const int64_t targetTimestamp = m_queueManager.dequeueSeekTime();
            m_queueManager.m_isSeeking = true;
            const int seekRet = avformat_seek_file(
                m_formatContext, m_videoIndex, INT64_MIN, targetTimestamp,
                INT64_MAX, AVSEEK_FLAG_ANY);
            if (seekRet >= 0) {
                m_queueManager.clearDecodeQueue();
                m_queueManager.m_isReadEnd = false;
            } else {
                qWarning() << "avformat_seek_file() failed:" << seekRet;
            }
            // Resume reading.
            m_queueManager.m_isSeeking = false;
        }

        const int readRet = av_read_frame(m_formatContext, m_inputPacket);
        if (readRet == AVERROR(EAGAIN)) {
            // No data available for now.
            qDebug() << "No Stream Data";
            av_packet_unref(m_inputPacket);
            av_usleep(30000);  // wait 30 ms
            continue;
        }
        if (readRet == AVERROR_EOF) {
            // End of file: stop only once decoding has finished too, so a
            // seek issued in the meantime can still be served.
            av_packet_unref(m_inputPacket);
            m_queueManager.m_isReadEnd = true;
            if (m_queueManager.m_isDecodeEnd) {
                qDebug() << "Stream End";
                close();
                break;
            }
            av_usleep(2000);
            continue;
        }
        if (readRet < 0) {
            // Read error.
            qDebug() << "Stream Error" << readRet;
            closeConsumers(m_streamDecoder, nullptr, m_streamPusher);
            m_queueManager.m_isReadEnd = true;
            break;
        }

        if (m_inputPacket->stream_index == m_videoIndex &&
            isValidAVPacket(m_inputPacket)) {
            if (m_decodeStreamFlag) {
                m_queueManager.enqueueDecodePacket(m_inputPacket);
            }
            if (m_pushStreamFlag) {
                m_queueManager.enqueuePushPacket(m_inputPacket);
            }
        }
        av_packet_unref(m_inputPacket);
        av_usleep(1000);
    }

    clear();
    free();
    qDebug() << "read Stream End!";
}

/**
 * @brief Prepares raw UDP reception: binds the socket configured through
 *        setUDPParms(), opens the raw dump file, creates the H.264 codec
 *        context and parser, and connects the socket's readyRead signal to
 *        udpDataReceivedSlot().
 *
 * NOTE(review): codec_ctx and parser are never released anywhere in this
 * file — confirm whether another component owns them.
 */
void ReadStream::startPullUDPStream() {
    if (!initObject()) {
        qWarning() << "startPullUDPStream: packet allocation failed";
        return;
    }

    if (!initSocket(m_UDPIP, m_UDPPort)) {
        emit sendErrorMessageSignal("UDP绑定失败!", 2);
        return;
    }

    if (!initSavedRawFile()) {
        // Reception still works; only the raw dump is unavailable.
        qWarning() << "startPullUDPStream: cannot open raw dump file";
    }

    const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264);
    if (!codec) {
        qWarning() << "startPullUDPStream: H.264 decoder not found";
        closeUDPConnectionSLot();
        return;
    }

    codec_ctx = avcodec_alloc_context3(codec);
    if (!codec_ctx) {
        qWarning() << "startPullUDPStream: avcodec_alloc_context3() failed";
        closeUDPConnectionSLot();
        return;
    }
    codec_ctx->pix_fmt = AV_PIX_FMT_YUV420P;
    codec_ctx->width = 1920;
    codec_ctx->height = 1080;
    const int openRet = avcodec_open2(codec_ctx, codec, nullptr);
    if (openRet < 0) {
        // In this file the context is only handed to the parser, so keep
        // going as the original did, but make the failure visible.
        qWarning() << "startPullUDPStream: avcodec_open2() failed:" << openRet;
    }

    parser = av_parser_init(AV_CODEC_ID_H264);
    if (!parser) {
        // udpDataReceivedSlot() would pass a null parser to
        // av_parser_parse2(); do not start receiving.
        qWarning() << "startPullUDPStream: av_parser_init() failed";
        closeUDPConnectionSLot();
        return;
    }

    // UniqueConnection: calling this function twice on the same socket must
    // not make the slot run twice per datagram.
    connect(udpSocket, &QUdpSocket::readyRead, this,
            &ReadStream::udpDataReceivedSlot, Qt::UniqueConnection);

    if (m_streamSaver) {
        emit m_streamSaver->startSaveStreamSignal();
    }
}

/**
 * @brief Aborts and destroys the UDP socket and closes the raw dump file.
 */
void ReadStream::closeUDPConnectionSLot() {
    qDebug() << "closeUDPConnectionSLot";
    if (udpSocket) {
        udpSocket->abort();
        delete udpSocket;
        udpSocket = nullptr;
    }
    if (m_saveFile.isOpen()) {
        m_saveFile.close();
    }
}

/**
 * @brief Performs global FFmpeg network initialisation.
 */
void ReadStream::initFFmpeg() {
    avformat_network_init();
}

/**
 * @brief Allocates the packet used by the read loops.
 * @return false (after releasing all FFmpeg state) when allocation fails.
 */
bool ReadStream::initObject() {
    // Do not leak a packet left over from a previous initialisation.
    if (m_inputPacket) {
        av_packet_free(&m_inputPacket);
    }

    // Allocate an AVPacket with its fields set to default values.
    m_inputPacket = av_packet_alloc();
    if (!m_inputPacket) {
#if PRINT_LOG
        qWarning() << "av_packet_alloc() Error!";
#endif
        free();
        return false;
    }
    return true;
}

/**
 * @brief Flushes the read buffers of the opened input.
 */
void ReadStream::clear() {
    if (!m_formatContext) {
        return;
    }
    // avformat_flush() does not flush the AVIOContext (s->pb), so flush it
    // explicitly first.
    if (m_formatContext->pb) {
        avio_flush(m_formatContext->pb);
    }
    avformat_flush(m_formatContext);
}

/**
 * @brief Closes the input context and frees the input packet; both pointers
 *        are reset to null by the FFmpeg calls.
 */
void ReadStream::free() {
    if (m_formatContext) {
        avformat_close_input(&m_formatContext);
    }
    if (m_inputPacket) {
        av_packet_free(&m_inputPacket);
    }
}

/**
 * @brief Closes the consumers, then tries up to MAXRECONNECT times to re-open
 *        m_pullURL and re-attach the consumers.
 * @return true when the input was re-opened; false when every attempt failed
 *         or close() was called in the meantime.
 */
bool ReadStream::reconnect() {
    m_end = false;
    closeConsumers(m_streamDecoder, m_streamSaver, m_streamPusher);
    free();

    for (int attempt = 1; attempt <= MAXRECONNECT; ++attempt) {
        const bool opened = openFile(m_pullURL);
        // close() may have run while openFile() was blocking; a successful
        // open must not silently restart a stream the caller asked to stop.
        m_start = opened && !m_end;
        if (m_start) {
            emit sendErrorMessageSignal("重连成功!", 1);
            if (m_streamDecoder) {
                setStreamDecoder(m_streamDecoder);
            }
            if (m_streamSaver) {
                setStreamSaver(m_streamSaver);
            }
            if (m_streamPusher) {
                setStreamPusher(m_streamPusher);
            }
            return true;
        }

        if (!opened) {
            qDebug() << "reconnect failed:" << QString::number(attempt);
            const QString str =
                QString("流中断,尝试重新连接第%1次").arg(attempt);
            emit sendErrorMessageSignal(str, 3);
        }
        free();
        if (m_end) {
            break;
        }
    }

    emit sendErrorMessageSignal("重连失败!", 2);
    return false;
}

/**
 * @brief Sanity-checks a packet before it is queued.
 * @param pkt Packet to inspect; may be null.
 * @return true when the packet has data, non-negative pts/dts that are not
 *         AV_NOPTS_VALUE, and a non-negative stream index.
 */
bool ReadStream::isValidAVPacket(AVPacket *pkt) {
    if (pkt == nullptr) {
        qDebug() << "Invalid AVPacket 0: packet pointer is null.";
        return false;
    }
    // Data pointer and size.
    if (pkt->data == nullptr || pkt->size <= 0) {
        qDebug()
            << "Invalid AVPacket 0: data is null or size is non-positive.\n";
        return false;
    }
    // Timestamps.
    if (pkt->pts == AV_NOPTS_VALUE || pkt->dts == AV_NOPTS_VALUE) {
        qDebug() << "Invalid AVPacket 0: pts or dts is AV_NOPTS_VALUE.\n";
        return false;
    }
    if (pkt->pts < 0 || pkt->dts < 0) {
        qDebug() << "Invalid AVPacket 0: pts or dts < 0.\n";
        return false;
    }
    // Stream index.
    if (pkt->stream_index < 0) {
        qDebug() << "Invalid AVPacket 0: stream_index is invalid.\n";
        return false;
    }
    return true;
}

/**
 * @brief Tells whether ip is an IPv4 (224.0.0.0–239.255.255.255) or IPv6
 *        ("ff" prefix) multicast address.
 * @return false for multicast-free or unparsable addresses.
 */
bool ReadStream::isMulticastAddress(const QString &ip) {
    QHostAddress address;
    if (!address.setAddress(ip)) {
        return false;  // not a valid IP address
    }

    const auto protocol = address.protocol();
    if (protocol == QAbstractSocket::IPv4Protocol) {
        const quint32 ip4 = address.toIPv4Address();
        const quint32 lowest = QHostAddress("224.0.0.0").toIPv4Address();
        const quint32 highest =
            QHostAddress("239.255.255.255").toIPv4Address();
        return ip4 >= lowest && ip4 <= highest;
    }
    if (protocol == QAbstractSocket::IPv6Protocol) {
        // IPv6 multicast addresses start with "ff".
        return address.toString().startsWith("ff", Qt::CaseInsensitive);
    }
    return false;
}

/**
 * @brief Drains all pending datagrams: dumps each one (hex-encoded) to the
 *        raw file, and for datagrams carrying the 0xEB 0x90 .. 0xD5 header
 *        feeds the payload through the H.264 parser, queueing every parsed
 *        packet for the enabled consumers.
 */
void ReadStream::udpDataReceivedSlot() {
    if (!udpSocket) {
        return;
    }

    while (udpSocket->hasPendingDatagrams()) {
        QByteArray datagram;
        datagram.resize(udpSocket->pendingDatagramSize());
        const qint64 received =
            udpSocket->readDatagram(datagram.data(), datagram.size());
        if (received < 0) {
            // Read failed; the buffer content is not meaningful.
            continue;
        }
        datagram.truncate(received);

        // Keep a dump of the raw data.
        if (m_saveFile.isOpen()) {
            m_saveFile.write(datagram.toHex());
        }

        // The header bytes and the payload slice below need a full frame;
        // shorter datagrams would be read out of bounds.
        if (datagram.size() < kUdpHeaderSize + kUdpPayloadSize) {
            continue;
        }
        const bool headerMatches =
            static_cast<uint8_t>(datagram.at(0)) == 0xEB &&
            static_cast<uint8_t>(datagram.at(1)) == 0x90 &&
            static_cast<uint8_t>(datagram.at(4)) == 0xD5;
        if (!headerMatches) {
            continue;
        }

        // NOTE(review): as in the original, nothing is parsed (and therefore
        // nothing is saved or pushed) unless decoding is enabled — confirm
        // that this is intended.
        if (!m_decodeStreamFlag) {
            continue;
        }

        // av_parser_parse2() assumes AV_INPUT_BUFFER_PADDING_SIZE readable
        // bytes after the input, so append zeroed padding to the payload.
        QByteArray videoData = datagram.sliced(kUdpHeaderSize, kUdpPayloadSize);
        videoData.append(AV_INPUT_BUFFER_PADDING_SIZE, '\0');
        uint8_t *cursor = reinterpret_cast<uint8_t *>(videoData.data());
        int remaining = kUdpPayloadSize;

        while (remaining > 0) {
            uint8_t *outbuf = nullptr;  // parsed output buffer
            int outbuf_size = 0;        // size of the parsed output
            const int consumed = av_parser_parse2(
                parser, codec_ctx, &outbuf, &outbuf_size, cursor, remaining,
                AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
            if (consumed < 0) {
                // Parser error: drop the rest of this payload instead of
                // walking the cursor backwards.
                break;
            }
            cursor += consumed;
            remaining -= consumed;

            if (outbuf_size > 0) {
                AVPacket packet;
                av_init_packet(&packet);
                packet.data = outbuf;
                packet.size = outbuf_size;
                if (m_decodeStreamFlag) {
                    m_queueManager.enqueueDecodePacket(&packet);
                }
                if (m_saveStreamFlag) {
                    m_queueManager.enqueueSavePacket(&packet);
                }
                if (m_pushStreamFlag) {
                    m_queueManager.enqueuePushPacket(&packet);
                }
                av_packet_unref(&packet);
            } else if (consumed == 0) {
                // Neither input consumed nor output produced: stop rather
                // than spin forever.
                break;
            }
        }
    }
}