diff --git a/src/video/avpacketqueuemanager.cpp b/src/video/avpacketqueuemanager.cpp index b899e1e..e48ebf8 100644 --- a/src/video/avpacketqueuemanager.cpp +++ b/src/video/avpacketqueuemanager.cpp @@ -95,3 +95,63 @@ bool AVPacketQueueManager::isEmptySeekQueue() { QMutexLocker locker(&m_seekMutex); return m_seekQueue.isEmpty(); } + +void AVPacketQueueManager::waitForStreamDecoderClosed() +{ + if(!m_isDecodeEnd){ + m_waitCloseStreamDecoderMutex.lock(); + qDebug()<<"waitForStreamDecoderClosed0"; + m_condition.wait(&m_waitCloseStreamDecoderMutex); + m_waitCloseStreamDecoderMutex.unlock(); + qDebug()<<"waitForStreamDecoderClosed1"; + } +} + +void AVPacketQueueManager::waitForStreamSaverClosed() +{ + if(!m_isStreamSaverEnd){ + m_waitCloseStreamSaverMutex.lock(); + qDebug()<<"waitForStreamSaverClosed0"; + m_condition.wait(&m_waitCloseStreamSaverMutex); + m_waitCloseStreamSaverMutex.unlock(); + qDebug()<<"waitForStreamSaverClosed1"; + } +} + +void AVPacketQueueManager::waitForStreamPusherClosed() +{ + if(!m_isStreamPusherEnd){ + m_waitCloseStreamPusherMutex.lock(); + qDebug()<<"waitForStreamPusherClosed0"; + m_condition.wait(&m_waitCloseStreamPusherMutex); + m_waitCloseStreamPusherMutex.unlock(); + qDebug()<<"waitForStreamPusherClosed1"; + } +} + +void AVPacketQueueManager::wakeStreamDecoder() +{ + m_waitCloseStreamDecoderMutex.lock(); + qDebug()<<"wakeStreamDecoder0"; + m_condition.wakeAll(); + m_waitCloseStreamDecoderMutex.unlock(); + qDebug()<<"wakeStreamDecoder1"; +} + +void AVPacketQueueManager::wakeStreamSaver() +{ + m_waitCloseStreamSaverMutex.lock(); + qDebug()<<"wakeStreamSaver0"; + m_condition.wakeAll(); + m_waitCloseStreamSaverMutex.unlock(); + qDebug()<<"wakeStreamSaver1"; +} + +void AVPacketQueueManager::wakeStreamPusher() +{ + m_waitCloseStreamPusherMutex.lock(); + m_condition.wakeAll(); + qDebug()<<"wakeStreamPusher0"; + m_waitCloseStreamPusherMutex.unlock(); + qDebug()<<"wakeStreamPusher1"; +} diff --git a/src/video/avpacketqueuemanager.h b/src/video/avpacketqueuemanager.h index 240b2c1..98c8b3f 100644 --- a/src/video/avpacketqueuemanager.h +++ b/src/video/avpacketqueuemanager.h @@ -3,6 +3,7 @@ #include #include +#include #include "ffmpeginclude.h" @@ -29,11 +30,22 @@ public: bool isEmptySaveQueue(); bool isEmptySeekQueue(); + void waitForStreamDecoderClosed(); + void waitForStreamSaverClosed(); + void waitForStreamPusherClosed(); + + void wakeStreamDecoder(); + void wakeStreamSaver(); + void wakeStreamPusher(); + public: std::atomic m_isSeeking{false}; // 是否正在跳转 std::atomic m_isReadEnd{false}; std::atomic m_isDecodeEnd{false}; - + std::atomic m_isStreamSaverEnd{false}; + std::atomic m_isStreamPusherEnd{false}; + std::atomic m_isPullReconnect{false}; + std::atomic m_isPushReconnect{false}; private: int QUEUECAPACITY = 100; QQueue m_decodeQueue; @@ -44,6 +56,11 @@ private: QMutex m_saveMutex; // 共享的互斥锁 QMutex m_pushMutex; // 共享的互斥锁 QMutex m_seekMutex; // 共享的互斥锁 +private: + QWaitCondition m_condition; + QMutex m_waitCloseStreamDecoderMutex; + QMutex m_waitCloseStreamSaverMutex; + QMutex m_waitCloseStreamPusherMutex; }; #endif // AVPACKETQUEUEMANAGER_H diff --git a/src/video/decodestream.cpp b/src/video/decodestream.cpp index 1d1caff..5853f63 100644 --- a/src/video/decodestream.cpp +++ b/src/video/decodestream.cpp @@ -1,6 +1,9 @@ #include "decodestream.h" -DecodeStream::DecodeStream(QObject *parent) : QObject{parent}, m_playSpeed{1} {} +DecodeStream::DecodeStream(QObject *parent) : QObject{parent}, m_playSpeed{1} { + connect(this,&DecodeStream::initStreamDecoderSignal,this,&DecodeStream::init,Qt::BlockingQueuedConnection); + connect(this,&DecodeStream::initUDPStreamDecoderSignal,this,&DecodeStream::initUDPDecodeStream,Qt::BlockingQueuedConnection); +} bool DecodeStream::init(AVPacketQueueManager *queueManager, AVFormatContext *formatContext, int videoIndex) { @@ -22,13 +25,14 @@ bool DecodeStream::init(AVPacketQueueManager *queueManager, if (m_fps <= 0) { m_fps = 30.0; // 默认帧率 } - - return initDecoder(formatContext, videoIndex); + initStatus = initDecoder(formatContext, videoIndex); + return initStatus; } -bool DecodeStream::init(AVPacketQueueManager *queueManager) { +bool DecodeStream::initUDPDecodeStream(AVPacketQueueManager *queueManager) { m_queueManager = queueManager; - return initUDPDecoder(); + initStatus = initUDPDecoder(); + return initStatus; } // 视频解码线程任务 @@ -142,9 +146,12 @@ void DecodeStream::startDecode() { } free(); + m_queueManager->m_isDecodeEnd = true; + m_queueManager->wakeStreamDecoder(); qDebug() << "Decoding Thread End!"; emit decodeEndSignal(); emit sendErrorMessageSignal("视频解码结束!", 1); + } void DecodeStream::changePlaySpeedSlot(double speed) { diff --git a/src/video/decodestream.h b/src/video/decodestream.h index c57b0d1..cb76068 100644 --- a/src/video/decodestream.h +++ b/src/video/decodestream.h @@ -18,7 +18,7 @@ public: DecodeStream(QObject *parent = nullptr); bool init(AVPacketQueueManager *queueManager, AVFormatContext *formatContext, int videoIndex); - bool init(AVPacketQueueManager *queueManager); + bool initUDPDecodeStream(AVPacketQueueManager *queueManager); void close(); AVCodecContext *getCodecContext(); AVCodecParserContext *getParserContext(); @@ -31,7 +31,11 @@ signals: void sendErrorMessageSignal(QString message, int type); void updateVideoCurrentTime(int currentTime, int duration); void decodeEndSignal(); - + void initUDPStreamDecoderSignal(AVPacketQueueManager *queueManager); + void initStreamDecoderSignal(AVPacketQueueManager *queueManager, + AVFormatContext *formatContext, int videoIndex); +public: + bool initStatus = false; private: bool initObject(); // 初始化对象 bool initDecoder(AVFormatContext *inputFormatContext, @@ -44,7 +48,7 @@ private: bool isValidAVPacket(AVPacket *pkt); private: - bool m_start = true; + std::atomicm_start = true; int m_videoIndex = 0; AVFormatContext *m_formatContext = nullptr; AVCodecContext *m_codecContext = nullptr; // 解码器上下文 diff --git a/src/video/ffmpeginclude.h b/src/video/ffmpeginclude.h index e186b7a..171d2b6 100644 --- a/src/video/ffmpeginclude.h +++ b/src/video/ffmpeginclude.h @@ -22,7 +22,6 @@ extern "C" { #include #include -// #include "global.h" #define PRINT_LOG 1 #define ERROR_LEN 1024 // 异常信息数组长度 diff --git a/src/video/pushstream.cpp b/src/video/pushstream.cpp index 68137f3..96677b9 100644 --- a/src/video/pushstream.cpp +++ b/src/video/pushstream.cpp @@ -1,6 +1,8 @@ #include "pushstream.h" -PushStream::PushStream(QObject *parent) : QObject{parent} {} +PushStream::PushStream(QObject *parent) : QObject{parent} { + connect(this,&PushStream::initStreamPusherSignal,this,&PushStream::init,Qt::BlockingQueuedConnection); +} void PushStream::setRemoteIP(QString url) { m_pushStreamIP = url; @@ -8,10 +10,12 @@ void PushStream::setRemoteIP(QString url) { bool PushStream::init(AVFormatContext *inputFormatCtx, AVPacketQueueManager *queueManager) { + qDebug() << "PushStream::init ThreadID:" << QThread::currentThreadId(); // m_pusherQueue = queue; m_queueManager = queueManager; m_inputFormatCtx = inputFormatCtx; m_start = openNetworkStream(inputFormatCtx); + initStatus = m_start; return m_start; } @@ -71,7 +75,7 @@ bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) { m_inputTimeBase = m_istream->time_base; m_inputFrameRate = m_istream->r_frame_rate; - m_outputTimeBase = m_istream->time_base; + // 打开输出文件 if (!(m_outputFormatCtx->flags & AVFMT_NOFILE)) { @@ -97,16 +101,24 @@ bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) { // m_InitStatus = true; // startTime = av_gettime_relative(); + m_outputTimeBase = m_ostream->time_base; + m_bwriteHeader = true; m_firstPts = AV_NOPTS_VALUE; return true; } int PushStream::reconnect(int ret) { + if(!m_queueManager->m_isPullReconnect) return 0; + if (ret == -10053 || ret == -10054) { + m_queueManager->m_isPushReconnect = true; m_end = false; // qDebug() << "m_end:" << m_end; for (int nRetryCount = 0; nRetryCount < MAXCONNECT; ++nRetryCount) { + if(m_queueManager->m_isPullReconnect) { + break; + } // 关闭输出 if (m_outputFormatCtx && !(m_outputFormatCtx->flags & AVFMT_NOFILE)) { @@ -147,6 +159,8 @@ int PushStream::reconnect(int ret) { emit sendErrorMessageSignal("重连成功!", 1); m_queueManager->clearPushQueue(); qDebug() << "重连成功!"; + + m_queueManager->m_isPushReconnect = false; return ret; } @@ -155,6 +169,7 @@ int PushStream::reconnect(int ret) { m_start = false; m_bwriteHeader = false; emit sendErrorMessageSignal("重连失败,推流停止!", 2); + m_queueManager->m_isPushReconnect = false; return -1; } return 0; @@ -163,6 +178,7 @@ int PushStream::reconnect(int ret) { void PushStream::pushStream(int64_t startTime) { qDebug() << "PushStreamThreadID:" << QThread::currentThreadId(); // m_startTime = startTime; + m_queueManager->m_isStreamPusherEnd = false; while (m_start) { AVPacket *inputPacket = m_queueManager->dequeuePushPacket(); if (inputPacket) { @@ -213,7 +229,7 @@ void PushStream::pushStream(int64_t startTime) { // sleepMsec(40); } else { if (delay < -100000) { - qDebug() << "delay:" << delay; + qDebug() << "push delay:" << delay; // 滞后100ms以上,丢弃非重要帧 if (!(inputPacket->flags & AV_PKT_FLAG_KEY)) { av_packet_unref(inputPacket); @@ -262,8 +278,11 @@ void PushStream::pushStream(int64_t startTime) { if (m_bwriteHeader) av_write_trailer(m_outputFormatCtx); free(); + m_queueManager->m_isStreamPusherEnd = true; + m_queueManager->wakeStreamPusher(); qDebug() << "Push Stream End!"; emit sendErrorMessageSignal("推流结束!", 1); + } void PushStream::free() { diff --git a/src/video/pushstream.h b/src/video/pushstream.h index dc72117..b076f59 100644 --- a/src/video/pushstream.h +++ b/src/video/pushstream.h @@ -18,20 +18,24 @@ public: * @param url 远端推流地址 */ void setRemoteIP(QString url); - bool init(AVFormatContext *inputFormatCtx, - AVPacketQueueManager *queueManager); + void close(); public slots: void pushStream(int64_t startTime); + bool init(AVFormatContext *inputFormatCtx, + AVPacketQueueManager *queueManager); signals: void startPushStreamSignal(int64_t startTime); void sendErrorMessageSignal(QString message, int type); - + void initStreamPusherSignal(AVFormatContext *inputFormatCtx,AVPacketQueueManager *queueManager); private: bool openNetworkStream(AVFormatContext *inputFormatCtx); int reconnect(int ret); void free(); +public: + bool initStatus = false; + private: // QQueue *m_pusherQueue = nullptr; AVFormatContext *m_inputFormatCtx = nullptr; // diff --git a/src/video/readstream.cpp b/src/video/readstream.cpp index 9a3162f..f941e64 100644 --- a/src/video/readstream.cpp +++ b/src/video/readstream.cpp @@ -105,8 +105,10 @@ bool ReadStream::setStreamDecoder(DecodeStream *decodeStreamer) { // QMutexLocker locker(&m_mutex); m_streamDecoder = decodeStreamer; m_queueManager.clearDecodeQueue(); - m_decodeStreamFlag = m_streamDecoder->init( - &m_queueManager, m_formatContext, m_videoIndex); + // m_decodeStreamFlag = m_streamDecoder->init( + // &m_queueManager, m_formatContext, m_videoIndex); + m_streamDecoder->initStreamDecoderSignal(&m_queueManager, m_formatContext, m_videoIndex); + m_decodeStreamFlag = m_streamDecoder->initStatus; if (m_decodeStreamFlag) emit decodeStreamer->startDecodeSignal(); return m_decodeStreamFlag; } else { @@ -120,9 +122,11 @@ bool ReadStream::setUDPStreamDecoder(DecodeStream *decodeStreamer) { // QMutexLocker locker(&m_mutex); m_streamDecoder = decodeStreamer; m_queueManager.clearDecodeQueue(); - m_decodeStreamFlag = m_streamDecoder->init(&m_queueManager); + // m_decodeStreamFlag = m_streamDecoder->init(&m_queueManager); // codec_ctx = m_streamDecoder->getCodecContext(); // parser = m_streamDecoder->getParserContext(); + m_streamDecoder->initUDPStreamDecoderSignal(&m_queueManager); + m_decodeStreamFlag = m_streamDecoder->initStatus; if (m_decodeStreamFlag) emit decodeStreamer->startDecodeSignal(); return m_decodeStreamFlag; } else { @@ -138,8 +142,10 @@ bool ReadStream::setStreamSaver(SaveStream *streamSaver, bool isUDP) { if (isUDP) { m_saveStreamFlag = m_streamSaver->initUDP(&m_queueManager); } else { - m_saveStreamFlag = m_streamSaver->init( - m_formatContext, &m_queueManager, m_videoIndex); + // m_saveStreamFlag = m_streamSaver->init( + // m_formatContext, &m_queueManager, m_videoIndex); + emit m_streamSaver->initStreamSaverSignal(m_formatContext, &m_queueManager, m_videoIndex); + m_saveStreamFlag = m_streamSaver->initStatus; } return m_saveStreamFlag; } else { @@ -149,10 +155,13 @@ bool ReadStream::setStreamSaver(SaveStream *streamSaver, bool isUDP) { } bool ReadStream::setStreamPusher(PushStream *streamPusher) { + qDebug() << "setStreamPusher ThreadID:" << QThread::currentThreadId(); if (streamPusher) { m_streamPusher = streamPusher; m_queueManager.clearPushQueue(); - m_pushStreamFlag = streamPusher->init(m_formatContext, &m_queueManager); + emit streamPusher->initStreamPusherSignal(m_formatContext, &m_queueManager); + // m_pushStreamFlag = streamPusher->init(m_formatContext, &m_queueManager); + m_pushStreamFlag = m_streamPusher->initStatus; if (m_pushStreamFlag) streamPusher->startPushStreamSignal(0); return m_pushStreamFlag; } else { @@ -471,21 +480,25 @@ void ReadStream::free() { } bool ReadStream::reconnect() { + m_queueManager.m_isPullReconnect = true; m_end = false; if (m_streamDecoder) { m_streamDecoder->close(); + m_queueManager.waitForStreamDecoderClosed(); } if (m_streamSaver) { m_streamSaver->close(); + m_queueManager.waitForStreamSaverClosed(); } if (m_streamPusher) { m_streamPusher->close(); + m_queueManager.waitForStreamPusherClosed(); } + qDebug()<<"all closed!"; free(); for (int i = 0; i < MAXRECONNECT; ++i) { m_start = openFile(m_pullURL); if (m_start) { - emit sendErrorMessageSignal("重连成功!", 1); if (m_streamDecoder) { setStreamDecoder(m_streamDecoder); } @@ -495,6 +508,8 @@ bool ReadStream::reconnect() { if (m_streamPusher) { setStreamPusher(m_streamPusher); } + m_queueManager.m_isPullReconnect = false; + emit sendErrorMessageSignal("重连成功!", 1); return true; } else { qDebug() << "reconnect failed:" << QString::number(i + 1); @@ -506,6 +521,7 @@ bool ReadStream::reconnect() { if (m_end) break; } emit sendErrorMessageSignal("重连失败!", 2); + m_queueManager.m_isPullReconnect = false; return false; } diff --git a/src/video/savestream.cpp b/src/video/savestream.cpp index 78b1fb7..c9414f7 100644 --- a/src/video/savestream.cpp +++ b/src/video/savestream.cpp @@ -1,6 +1,8 @@ #include "savestream.h" -SaveStream::SaveStream(QObject *parent) : QObject{parent} {} +SaveStream::SaveStream(QObject *parent) : QObject{parent} { + connect(this,&SaveStream::initStreamSaverSignal,this,&SaveStream::init,Qt::BlockingQueuedConnection); +} bool SaveStream::init(AVFormatContext *formatContext, AVPacketQueueManager *queueManager, int videoIndex) { @@ -13,6 +15,7 @@ bool SaveStream::init(AVFormatContext *formatContext, if (!m_start) { free(); } + initStatus = m_start; return m_start; } @@ -37,8 +40,10 @@ void SaveStream::close() { void SaveStream::startSaveStream() { qDebug() << "SaveStreamThreadID:" << QThread::currentThreadId(); if (!m_start) { + m_queueManager->m_isStreamSaverEnd = true; return; } + m_queueManager->m_isStreamSaverEnd = false; int frameIndex = 0; int64_t dts = 0; int64_t dts_last = 0; @@ -101,16 +106,6 @@ void SaveStream::startSaveStream() { } } - // AVStream *out_stream = m_formatContextSave->streams[0]; - // out_stream->start_time = 0; - // m_formatContextSave->start_time = 0; - // int64_t duration_pts = last_pts - first_pts; - // // duration_pts = av_rescale(duration_pts, 1, AV_TIME_BASE); - // m_formatContextSave->duration = duration_pts; - - // AVStream *stream = m_formatContextSave->streams[0]; - // stream->duration = duration_pts; - // 写入文件尾 if (m_formatContextSave && m_writeHeader) { av_write_trailer(m_formatContextSave); @@ -123,6 +118,8 @@ void SaveStream::startSaveStream() { } free(); + m_queueManager->m_isStreamSaverEnd = true; + m_queueManager->wakeStreamSaver(); qDebug() << "Save Video End!"; emit sendErrorMessageSignal("视频保存结束!", 1); } @@ -130,8 +127,11 @@ void SaveStream::startSaveStream() { void SaveStream::startSaveUDPStream() { qDebug() << "SaveStreamThreadID:" << QThread::currentThreadId(); if (!m_start) { + m_queueManager->m_isStreamSaverEnd = true; + qDebug()<<"StreamSaver direct return "; return; } + m_queueManager->m_isStreamSaverEnd = false; int frameIndex = 0; int64_t pts = 0; int64_t dts = 0; @@ -184,8 +184,11 @@ void SaveStream::startSaveUDPStream() { } free(); + m_queueManager->m_isStreamSaverEnd = true; + m_queueManager->wakeStreamSaver(); qDebug() << "视频保存结束!"; emit sendErrorMessageSignal("视频保存结束!", 1); + } bool SaveStream::openFile(bool isUDP) { diff --git a/src/video/savestream.h b/src/video/savestream.h index c6bc20f..73c5698 100644 --- a/src/video/savestream.h +++ b/src/video/savestream.h @@ -30,12 +30,16 @@ public slots: signals: void startSaveStreamSignal(); void sendErrorMessageSignal(QString message, int type); - + void initStreamSaverSignal(AVFormatContext *formatContext, + AVPacketQueueManager *queueManager, int videoIndex); private: bool openFile(bool isUDP = false); void free(); bool isValidAVPacket(AVPacket *pkt); +public: + bool initStatus = false; + private: AVFormatContext *m_formatContextSave = nullptr; // 封装上下文 AVFormatContext *m_inputFormatContext = nullptr;