feat: 新增两路推流

master
cbwu 2 months ago
parent 9cd84530eb
commit 9b50f5868f

@ -468,7 +468,10 @@ void MainWindow::pushStreamSlot(bool bPush) {
} else { } else {
pushStreamIP = g_pushStreamInfoStruct.pushStreamIP; pushStreamIP = g_pushStreamInfoStruct.pushStreamIP;
} }
ui->videoWidget->pushStream(pushStreamIP); QStringList pushStreamIPs;
pushStreamIPs.append(pushStreamIP);
// pushStreamIPs.append("rtmp://182.92.130.23/videoclient/fp98C_1_gcs");
ui->videoWidget->pushStream(pushStreamIPs);
} else { } else {
ui->videoWidget->stopPushStream(); ui->videoWidget->stopPushStream();
// ui->videoWidget1->stopPushStream(); // ui->videoWidget1->stopPushStream();

@ -4,8 +4,8 @@ PushStream::PushStream(QObject *parent) : QObject{parent} {
connect(this,&PushStream::initStreamPusherSignal,this,&PushStream::init,Qt::BlockingQueuedConnection); connect(this,&PushStream::initStreamPusherSignal,this,&PushStream::init,Qt::BlockingQueuedConnection);
} }
void PushStream::setRemoteIP(QString url) { void PushStream::setRemoteIP(QStringList& urls) {
m_pushStreamIP = url; m_pushStreamIPs = urls;
} }
bool PushStream::init(AVFormatContext *inputFormatCtx, bool PushStream::init(AVFormatContext *inputFormatCtx,
@ -14,7 +14,18 @@ bool PushStream::init(AVFormatContext *inputFormatCtx,
// m_pusherQueue = queue; // m_pusherQueue = queue;
m_queueManager = queueManager; m_queueManager = queueManager;
m_inputFormatCtx = inputFormatCtx; m_inputFormatCtx = inputFormatCtx;
m_start = openNetworkStream(inputFormatCtx); // m_start = openNetworkStream(inputFormatCtx);
bool ss = false;
for (int i = 0; i < m_pushStreamIPs.size(); ++i) {
AVFormatContext* outputformatCtx = nullptr;
ss = openNetworkStream(inputFormatCtx,outputformatCtx,m_pushStreamIPs.at(i));
m_outputFormatCtxArray.append(outputformatCtx);
m_initStatus.append(ss);
if(ss){
m_start = ss;
}
}
initStatus = m_start; initStatus = m_start;
return m_start; return m_start;
} }
@ -26,36 +37,37 @@ void PushStream::close() {
// qDebug() << "*******m_end0:" << m_end; // qDebug() << "*******m_end0:" << m_end;
} }
bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) { bool PushStream::openNetworkStream(AVFormatContext *&inputFormatCtx,AVFormatContext *&outputFormatCtx,QString pushStreamIP) {
if (m_pushStreamIP.isEmpty()) return false; if (pushStreamIP.isEmpty()) return false;
if (!inputFormatCtx) return false; if (!inputFormatCtx) return false;
// 初始化网络输出流 // 初始化网络输出流
// QString m_pushStreamIP = "rtsp://182.92.130.23/app/stream999"; // QString m_pushStreamIP = "rtsp://182.92.130.23/app/stream999";
QString format_name = "flv"; QString format_name = "flv";
if (m_pushStreamIP.left(3) == "udp" || m_pushStreamIP.left(3) == "UDP") if (pushStreamIP.left(3) == "udp" || pushStreamIP.left(3) == "UDP")
format_name = "mpegts"; format_name = "mpegts";
int ret = avformat_alloc_output_context2( int ret = avformat_alloc_output_context2(
&m_outputFormatCtx, NULL, format_name.toStdString().data(), &outputFormatCtx, NULL, format_name.toStdString().data(),
m_pushStreamIP.toUtf8().constData()); pushStreamIP.toUtf8().constData());
if (ret < 0) { if (ret < 0) {
showError(ret); showError(ret);
free(); free(outputFormatCtx);
qDebug() << "Could not create output context."; qDebug() << "Could not create output context.";
return false; return false;
} }
AVStream *m_ostream = nullptr;
// 复制流信息 // 复制流信息
for (unsigned int i = 0; i < inputFormatCtx->nb_streams; ++i) { for (unsigned int i = 0; i < inputFormatCtx->nb_streams; ++i) {
// AVStream *stream = inputFormatCtx->streams[i]; // AVStream *stream = inputFormatCtx->streams[i];
if (inputFormatCtx->streams[i]->codecpar->codec_type == if (inputFormatCtx->streams[i]->codecpar->codec_type ==
AVMEDIA_TYPE_VIDEO) { AVMEDIA_TYPE_VIDEO) {
m_istream = inputFormatCtx->streams[i]; m_istream = inputFormatCtx->streams[i];
m_ostream = avformat_new_stream(m_outputFormatCtx, nullptr); m_ostream = avformat_new_stream(outputFormatCtx, nullptr);
if (!m_ostream) { if (!m_ostream) {
qDebug() << "Failed allocating output stream.\n"; qDebug() << "Failed allocating output stream.\n";
free(); free(outputFormatCtx);
return false; return false;
} }
// 复制编解码器参数 // 复制编解码器参数
@ -63,7 +75,7 @@ bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) {
m_istream->codecpar); m_istream->codecpar);
if (ret < 0) { if (ret < 0) {
showError(ret); showError(ret);
free(); free(outputFormatCtx);
qWarning() << "avcodec_parameters_from_context Failed"; qWarning() << "avcodec_parameters_from_context Failed";
return false; return false;
} }
@ -78,22 +90,22 @@ bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) {
// 打开输出文件 // 打开输出文件
if (!(m_outputFormatCtx->flags & AVFMT_NOFILE)) { if (!(outputFormatCtx->flags & AVFMT_NOFILE)) {
if (ret = avio_open(&m_outputFormatCtx->pb, if (ret = avio_open(&outputFormatCtx->pb,
m_pushStreamIP.toUtf8().constData(), pushStreamIP.toUtf8().constData(),
AVIO_FLAG_WRITE) < 0) { AVIO_FLAG_WRITE) < 0) {
showError(ret); showError(ret);
free(); free(outputFormatCtx);
qDebug() << "Could not open output file.\n"; qDebug() << "Could not open output file.\n";
return false; return false;
} }
} }
// 写入头文件 // 写入头文件
ret = avformat_write_header(m_outputFormatCtx, NULL); ret = avformat_write_header(outputFormatCtx, NULL);
if (ret < 0) { if (ret < 0) {
showError(ret); showError(ret);
free(); free(outputFormatCtx);
qDebug() << "Error occurred when write_header into output file.\n"; qDebug() << "Error occurred when write_header into output file.\n";
return false; return false;
} }
@ -108,57 +120,63 @@ bool PushStream::openNetworkStream(AVFormatContext *inputFormatCtx) {
return true; return true;
} }
int PushStream::reconnect(int ret) { int PushStream::reconnect(int ret,int id) {
if(!m_queueManager->m_isPullReconnect) return 0; if(!m_queueManager->m_isPullReconnect) return 0;
if (ret == -10053 || ret == -10054) { if (ret == -10053 || ret == -10054) {
m_queueManager->m_isPushReconnect = true; m_queueManager->m_isPushReconnect = true;
m_end = false; m_end = false;
// qDebug() << "m_end:" << m_end; // qDebug() << "m_end:" << m_end;
AVFormatContext* outputFormatCtx = m_outputFormatCtxArray.at(id);
QString message;
for (int nRetryCount = 0; nRetryCount < MAXCONNECT; ++nRetryCount) { for (int nRetryCount = 0; nRetryCount < MAXCONNECT; ++nRetryCount) {
if(m_queueManager->m_isPullReconnect) { if(m_queueManager->m_isPullReconnect) {
break; break;
} }
// 关闭输出 // 关闭输出
if (m_outputFormatCtx && if (outputFormatCtx &&
!(m_outputFormatCtx->flags & AVFMT_NOFILE)) { !(outputFormatCtx->flags & AVFMT_NOFILE)) {
avio_close(m_outputFormatCtx->pb); avio_close(outputFormatCtx->pb);
} }
ret = ret =
avio_open(&m_outputFormatCtx->pb, avio_open(&outputFormatCtx->pb,
m_pushStreamIP.toUtf8().constData(), AVIO_FLAG_WRITE); m_pushStreamIPs[id].toUtf8().constData(), AVIO_FLAG_WRITE);
if (ret < 0) { if (ret < 0) {
showError(ret); showError(ret);
qDebug() << "Failed to reconnect" qDebug() << "Failed to reconnect"
<< QString::number(nRetryCount + 1); << QString::number(nRetryCount + 1);
QString str = message =
QString("网络中断,尝试重连第%1次!").arg(nRetryCount + 1); QString("推流地址%1网络中断尝试重连第%2次!").arg(id+1,nRetryCount + 1);
emit sendErrorMessageSignal(str, 3); emit sendErrorMessageSignal(message, 3);
if (m_end) break; if (m_end) break;
// av_usleep(5 * 1000000); // av_usleep(5 * 1000000);
continue; continue;
} }
// Try to reconnect // Try to reconnect
ret = avformat_write_header(m_outputFormatCtx, nullptr); ret = avformat_write_header(outputFormatCtx, nullptr);
if (ret < 0) { if (ret < 0) {
m_initStatus[id] = false;
showError(ret); showError(ret);
// free(); // free();
qDebug() << "Failed to reconnect" qDebug() << "Failed to reconnect"
<< QString::number(nRetryCount + 1); << QString::number(nRetryCount + 1);
QString str = message =
QString("网络中断,尝试重连第%1次!").arg(nRetryCount + 1); QString("推流地址%1网络中断尝试重连第%2次!").arg(id+1,nRetryCount + 1);
emit sendErrorMessageSignal(str, 3); emit sendErrorMessageSignal(message, 3);
if (m_end) break; if (m_end) break;
// nRetryCount++; // nRetryCount++;
// av_usleep(5 * 1000000); // av_usleep(5 * 1000000);
} else { } else {
m_initStatus[id] = true;
m_start = true; m_start = true;
m_firstPts = AV_NOPTS_VALUE; m_firstPts = AV_NOPTS_VALUE;
m_frm_cnt = 0; m_frm_cnt = 0;
m_bwriteHeader = true; m_bwriteHeader = true;
emit sendErrorMessageSignal("重连成功!", 1); message =
QString("推流地址%1重连成功!").arg(id+1);
emit sendErrorMessageSignal(message, 1);
m_queueManager->clearPushQueue(); m_queueManager->clearPushQueue();
qDebug() << "重连成功!"; qDebug() << message;
m_queueManager->m_isPushReconnect = false; m_queueManager->m_isPushReconnect = false;
return ret; return ret;
@ -167,8 +185,16 @@ int PushStream::reconnect(int ret) {
if (m_end) break; if (m_end) break;
} }
m_start = false; m_start = false;
for (int i = 0; i < m_initStatus.size(); ++i) {
if(m_initStatus.at(i)){
m_start = true;
break;
}
}
m_bwriteHeader = false; m_bwriteHeader = false;
emit sendErrorMessageSignal("重连失败,推流停止!", 2); message =
QString("推流地址%1重连失败推流停止!").arg(id+1);
emit sendErrorMessageSignal(message, 2);
m_queueManager->m_isPushReconnect = false; m_queueManager->m_isPushReconnect = false;
return -1; return -1;
} }
@ -254,29 +280,49 @@ void PushStream::pushStream(int64_t startTime) {
// 向推流服务器推送流数据 // 向推流服务器推送流数据
m_frm_cnt++; m_frm_cnt++;
int ret = int ret;
av_interleaved_write_frame(m_outputFormatCtx, inputPacket);
for (int i = 0; i < m_outputFormatCtxArray.size(); ++i) {
if(!m_initStatus.at(i)) continue;
AVFormatContext* outputFormatCtx = m_outputFormatCtxArray.at(i);
if(outputFormatCtx!=nullptr){
AVPacket* clonePacket = nullptr;
if(i!= m_outputFormatCtxArray.size()-1){
clonePacket = av_packet_clone(inputPacket);
}else{
clonePacket = inputPacket;
}
ret =
av_interleaved_write_frame(outputFormatCtx, clonePacket);
if (ret < 0) { if (ret < 0) {
av_packet_unref(inputPacket); av_packet_unref(clonePacket);
showError(ret); showError(ret);
// if (ret == -10053) { // if (ret == -10053) {
// qDebug() << "网络不稳定"; // qDebug() << "网络不稳定";
// } // }
if (reconnect(ret) < 0) { if (reconnect(ret,i) < 0) {
break; // break;
}; };
continue; continue;
} }
// 数据包写入成功现在可以释放pkt // 数据包写入成功现在可以释放pkt
av_packet_unref(inputPacket); av_packet_unref(clonePacket);
av_packet_free(&inputPacket); av_packet_free(&clonePacket);
}
}
} else { } else {
// QThread::usleep(1000); // QThread::usleep(1000);
av_usleep(1000); av_usleep(1000);
} }
} }
if (m_bwriteHeader) av_write_trailer(m_outputFormatCtx); if (m_bwriteHeader) {
for (auto& outputFormatCtx : m_outputFormatCtxArray) {
if(outputFormatCtx!=nullptr){
av_write_trailer(outputFormatCtx);
}
}
}
free(); free();
m_queueManager->m_isStreamPusherEnd = true; m_queueManager->m_isStreamPusherEnd = true;
m_queueManager->wakeStreamPusher(); m_queueManager->wakeStreamPusher();
@ -286,13 +332,27 @@ void PushStream::pushStream(int64_t startTime) {
} }
void PushStream::free() { void PushStream::free() {
m_start = false; for (auto &outputFormatCtx : m_outputFormatCtxArray) {
// 关闭输出
if (outputFormatCtx && !(outputFormatCtx->flags & AVFMT_NOFILE)) {
avio_close(outputFormatCtx->pb);
}
if (outputFormatCtx) {
avformat_free_context(outputFormatCtx);
outputFormatCtx = nullptr;
}
}
m_outputFormatCtxArray.clear();
}
void PushStream::free(AVFormatContext *&outputFormatCtx)
{
// 关闭输出 // 关闭输出
if (m_outputFormatCtx && !(m_outputFormatCtx->flags & AVFMT_NOFILE)) { if (outputFormatCtx && !(outputFormatCtx->flags & AVFMT_NOFILE)) {
avio_close(m_outputFormatCtx->pb); avio_close(outputFormatCtx->pb);
} }
if (m_outputFormatCtx) { if (outputFormatCtx) {
avformat_free_context(m_outputFormatCtx); avformat_free_context(outputFormatCtx);
m_outputFormatCtx = nullptr; outputFormatCtx = nullptr;
} }
} }

@ -17,7 +17,7 @@ public:
* @brief * @brief
* @param url * @param url
*/ */
void setRemoteIP(QString url); void setRemoteIP(QStringList& urls);
void close(); void close();
public slots: public slots:
@ -29,22 +29,24 @@ signals:
void sendErrorMessageSignal(QString message, int type); void sendErrorMessageSignal(QString message, int type);
void initStreamPusherSignal(AVFormatContext *inputFormatCtx,AVPacketQueueManager *queueManager); void initStreamPusherSignal(AVFormatContext *inputFormatCtx,AVPacketQueueManager *queueManager);
private: private:
bool openNetworkStream(AVFormatContext *inputFormatCtx); bool openNetworkStream(AVFormatContext *&inputFormatCtx,AVFormatContext *&outputFormatCtx,QString pushStreamIP);
int reconnect(int ret); int reconnect(int ret,int id=0);
void free(); void free();
void free(AVFormatContext *&outputFormatCtx);
public: public:
bool initStatus = false; bool initStatus = false;
private: private:
// QQueue<AVPacket *> *m_pusherQueue = nullptr; // QQueue<AVPacket *> *m_pusherQueue = nullptr;
AVFormatContext *m_inputFormatCtx = nullptr; // AVFormatContext *m_inputFormatCtx = nullptr; //
AVFormatContext *m_outputFormatCtx = NULL; // AVFormatContext *m_outputFormatCtx = nullptr; //
AVFormatContext *m_outputFormatCtx2 = nullptr; //推流2
QVector<AVFormatContext *>m_outputFormatCtxArray;
AVStream *m_istream = nullptr; AVStream *m_istream = nullptr;
AVStream *m_ostream = nullptr; // AVStream *m_ostream = nullptr;
bool m_bwriteHeader = false; bool m_bwriteHeader = false;
int m_videoIndex = -1; int m_videoIndex = -1;
QString m_pushStreamIP; // 推流地址 QStringList m_pushStreamIPs; // 推流地址
std::atomic<bool> m_start = false; std::atomic<bool> m_start = false;
std::atomic<bool> m_end = false; std::atomic<bool> m_end = false;
int64_t m_startTime; int64_t m_startTime;
@ -58,6 +60,8 @@ private:
AVRational m_inputTimeBase; AVRational m_inputTimeBase;
AVRational m_inputFrameRate; AVRational m_inputFrameRate;
AVRational m_outputTimeBase; AVRational m_outputTimeBase;
QVector<bool> m_initStatus;
}; };
#endif // PUSHSTREAM_H #endif // PUSHSTREAM_H

@ -171,8 +171,8 @@ void VideoWidget::stopPlay() {
} }
// 推流 // 推流
bool VideoWidget::pushStream(const QString &url) { bool VideoWidget::pushStream(QStringList &urls) {
if (url.isEmpty()) { if (urls.isEmpty()) {
return false; return false;
} else { } else {
// 先拉流 // 先拉流
@ -192,7 +192,7 @@ bool VideoWidget::pushStream(const QString &url) {
connect(streamPusher, &PushStream::sendErrorMessageSignal, this, connect(streamPusher, &PushStream::sendErrorMessageSignal, this,
&VideoWidget::receiveErrorMessage, Qt::UniqueConnection); &VideoWidget::receiveErrorMessage, Qt::UniqueConnection);
} }
streamPusher->setRemoteIP(url); streamPusher->setRemoteIP(urls);
streamPusher->moveToThread(&pushStreamThread); streamPusher->moveToThread(&pushStreamThread);
pushStreamThread.start(); pushStreamThread.start();
@ -203,7 +203,7 @@ bool VideoWidget::pushStream(const QString &url) {
return false; return false;
} }
m_pushURL = url; m_pushURLs = urls;
m_pushFlag = true; m_pushFlag = true;
return true; return true;
} }
@ -230,8 +230,8 @@ void VideoWidget::setPullURL(const QString &url) {
m_pullURL = url; m_pullURL = url;
} }
void VideoWidget::setPushURL(const QString &url) { void VideoWidget::setPushURL(QStringList &urls) {
m_pushURL = url; m_pushURLs = urls;
} }
void VideoWidget::setVedioSaveFileDirPath(const QString &dirPath) { void VideoWidget::setVedioSaveFileDirPath(const QString &dirPath) {

@ -46,10 +46,10 @@ public:
bool play(const QString &url, bool bSave = true); bool play(const QString &url, bool bSave = true);
bool udpPlay(QString ip, int port); bool udpPlay(QString ip, int port);
void stopPlay(); void stopPlay();
bool pushStream(const QString &url); bool pushStream(QStringList &urls);
void stopPushStream(); void stopPushStream();
void setPullURL(const QString &url); void setPullURL(const QString &url);
void setPushURL(const QString &url); void setPushURL(QStringList &urls);
void setVedioSaveFileDirPath(const QString &dirPath); void setVedioSaveFileDirPath(const QString &dirPath);
bool getPlayStatus(); bool getPlayStatus();
double getVideoDuration(); double getVideoDuration();
@ -98,7 +98,7 @@ private:
private: private:
QString m_pullURL; QString m_pullURL;
QString m_pushURL; QStringList m_pushURLs;
bool m_playFlag = false; bool m_playFlag = false;
bool m_pullFlag = false; bool m_pullFlag = false;
bool m_pushFlag = false; bool m_pushFlag = false;

Loading…
Cancel
Save