前言

RTSP服务器是一个网络服务器,它使用实时流媒体传输协议(RTSP)提供对媒体流的控制。RTSP是一种应用层协议,用于控制多媒体会话,包括控制媒体会话中的媒体流的传输。这种控制包括播放、暂停、前进、后退等操作。

RTSP服务器通常用于流媒体系统,如网络电视直播,视频监控等。它可以将媒体数据流动态的发送给客户端,而无需将整个媒体文件先发给客户端。

RTSP服务器能够处理和响应客户端发送的RTSP请求,并管理媒体流的传输。它还可以动态地为每个客户端会话选择传输协议(如RTP、UDP或TCP)以及数据格式(如MPEG、AVI或QuickTime)。

一、需求

功能需求

  1. RTSP服务器需要要有UDP和TCP两种连接方式,即能够通过以下两种方式来获取服务器的媒体数据。
    1
    2
    3
    ffplay -i  rtsp://127.0.0.1:8554

    ffplay -i -rtsp_transport tcp rtsp://127.0.0.1:8554
  2. 服务器能够上传视频 (H264) 和音频 (AAC),且用户在获取媒体数据时,音画需要同步

    性能需求

    支持多用户 (5+) 同时访问

二、架构

想要实现高性能RTSP服务器,好的架构是后续工作顺利开展的基础。

架构如下:

RTSP服务器

1.线程池

2.事件管理

  • select 轮询器(Poller) - 处理 IO 事件
  • 定时器(Timer)管理 - 处理定时器事件
  • 触发器(Trigger)管理 - 处理触发器事件

3.媒体节点管理

  • 媒体节点 1 - 视频
  • 媒体节点 2 - 音频

4.RTSP连接管理

  • UDP管理
  • TCP管理

三、事件

事件是贯穿RTSP服务器的核心思想,事件共分为以下三类:

->右侧代表更高级的管理器

=>右侧代表最高级的管理器

事件携带其他组件定义的回调函数被赋予不同的功能,

在事件被添加到事件管理器之后,遍历事件列表逐个调用回调函数,

这无疑给予了事件足够的灵活性,这与线程池的Task有异曲同工之妙。

RTSP服务器所需事件共4个:

  1. 关闭连接触发器事件:用于关闭与客户端的连接
  2. 消费者定时器事件:用于让媒体消费者发送帧数据
  3. 接受连接IO事件:用于与客户端建立连接
  4. 客户端IO事件:用于读取客户端发送的指令

四、媒体生产者(Source)与消费者(Sink)

媒体生产者和消费者是RTSP服务器与视频音频文件打交道的重要组件。

生产者

生产者用于读取媒体文件中的帧信息,并保存到帧队列中。

每次读取文件都需要生成一个Task,加入线程池中处理。

使用两个队列,输入队列和输出队列

输入队列

初始化为4个空帧(停-等窗口)。

从文件中读出的帧不直接进入输入序列,输入序列仅接受使用完的帧。

当读取过快导致输入队列为空,就需要等待消费者发送完帧后创建新的Task。

消费者创建Task的同时,向输入队列中添加使用完的帧。

输出队列

初始化为空。

直接接受文件的输出帧,消费者获取输出队列帧来发送。

消费者

消费者用于获取生产者的输出队列中的帧,并将数据包装成RTP数据包发送给客户端。

同时处理数据包发送时的格式数据。

五、协议与格式

格式在网络数据传输中占据着及其重要的地位。

[数据头]+[数据] 的格式占据了数据传输格式的大部分,下述的四种格式也不例外。

RTP与RTCP

RTP 是包装音视频数据的协议
RTP与RTCP

H264

H264 是包装视频数据的格式
H264中的NAL、NALU,以及RTP封装方式

AAC

AAC 是包装音频数据的格式
AAC

RTSP

RTSP是一种双向实时数据传输协议,负责服务器与客户端的沟通。
RTSP与SDP

六、实现

有了基础知识,我们来尝试与实践相结合。

线程池

与一般服务器线程池无异,实现如下几个功能:

  • 创建
  • 创建线程
  • 销毁线程
  • 添加任务
  • 运行(执行任务)

事件管理

为什要做事件管理?

每次事件在队列中需要被唤醒时,都会创建一个对应的Task,加载到线程池中进行调用。

我们不能直接将所有任务整合到线程池中,这不符合低耦合的要求。

因此,我们需要使用事件管理器来管理事件的增删改用。

IO事件

IO事件当前的状态一共四种-NONE、READ、WRITE、ERROR

除了要提供外部访问控制状态的接口之外,还要提供挂载3种状态处理回调函数的接口。

select 轮询器(Poller) - 处理 IO 事件

通过维护一个<fd, IOEvent>Map,来实现对IO事件的增删改用。

  • fd - 文件描述符 - IOEvent的唯一标识

整个类围绕事件的展开,即

1
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);  

之后,对准备好的事件进行调用。

  • nfds 需要监控的文件描述符集合中文件描述符的最大值加一
  • readfds 读描述符集合
  • writefds 写描述符集合
  • exceptfds 错误描述符集合
  • timeout select()会一直等待,直到有文件描述符准备好I/O操作或者达到超时时间。如果超时参数被设为NULL, select()函数会无限等待。

readfds/writefds/exceptfds - 被select()修改后可使用FD_ISSET查询fd是否已经准备好

有了select()的基础,接下来的三个函数也不难理解了

FD_CLR() - 清空fd_set的设置

FD_SET() - 设置某个fd为就绪状态

FD_ISSET() - 查询某个fd是否就绪

  • 增 - 和 改 相同
1
2
3
4
bool addIOEvent(IOEvent* event)
{
return updateIOEvent(event);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool removeIOEvent(IOEvent* event)
{
int fd = event->getFd();
if(fd < 0)
return false;
IOEventMap::iterator it = mEventMap.find(fd);
if(it != mEventMap.end()) // 之前存在
{
return false;
}
else
{
if(event->isReadHandling())
FD_SET(fd, &mReadSet);
if(event->isWriteHandling())
FD_SET(fd, &mWriteSet);
if(event->isExceptionHandling())
FD_SET(fd, &mExceptionSet);

mEventMap.insert(std::make_pair(fd, event));
}

if (mEventMap.empty())
mMaxNumSockets = 0;
else
mMaxNumSockets = mEventMap.rbegin()->first + 1;// 更新为最大文件描述符+1

return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
bool updateIOEvent(IOEvent* event)
{
int fd = event->getFd();
if(fd < 0)
return false;

FD_CLR(fd, &mReadSet);
FD_CLR(fd, &mWriteSet);
FD_CLR(fd, &mExceptionSet);

IOEventMap::iterator it = mEventMap.find(fd);
if(it != mEventMap.end()) // 之前存在
{
if (event->isReadHandling())
FD_SET(fd, &mReadSet);
if (event->isWriteHandling())
FD_SET(fd, &mWriteSet);
if (event->isExceptionHandling())
FD_SET(fd, &mExceptionSet);
}
else
{
if(event->isReadHandling())
FD_SET(fd, &mReadSet);
if(event->isWriteHandling())
FD_SET(fd, &mWriteSet);
if(event->isExceptionHandling())
FD_SET(fd, &mExceptionSet);

mEventMap.insert(std::make_pair(fd, event));
}

if (mEventMap.empty())
mMaxNumSockets = 0;
else
mMaxNumSockets = mEventMap.rbegin()->first + 1;// 更新为最大文件描述符+1

return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool handleIOEvent(IOEvent* event)
{
fd_set readSet = mReadSet;
fd_set writeSet = mWriteSet;
fd_set exceptionSet = mExceptionSet;
struct timeval timeout;
int ret;
int rEvent;

timeout.tv_sec = 1000;// 秒
timeout.tv_usec = 0;//微秒

ret = select(mMaxNumSockets, &readSet, &writeSet, &exceptionSet, &timeout);


if (ret < 0){
return;
}

for (IOEventMap::iterator it = mEventMap.begin(); it != mEventMap.end(); ++it)
{
rEvent = 0;
if (FD_ISSET(it->first, &readSet))
rEvent |= IOEvent::EVENT_READ;

if (FD_ISSET(it->first, &writeSet))
rEvent |= IOEvent::EVENT_WRITE;

if (FD_ISSET(it->first, &exceptionSet))
rEvent |= IOEvent::EVENT_ERROR;

if (rEvent != 0)
{
it->second->setREvent(rEvent);
mIOEvents.push_back(it->second);
}
}

for (auto& ioEvent : mIOEvents) {
ioEvent->handleEvent();
}

mIOEvents.clear();
}

定时器(Timer)事件

定时器事件就是在该事件在被挂载到事件队列一定时间后被调用,或者间隔一段时间循环调用的特殊事件。

定时器事件只负责挂载回调函数,以及控制回调函数是否执行。

定时器(Timer)

定时器就是定时器事件的载体。

被创建时赋予其创建时的时间戳,以及循环调用所间隔的时间。

类本身向外提供获取当前时间戳的方法。

定时器管理 - 处理定时器事件

由于定时器状态较少,且定时器的时间戳和循环时间不应被改变,所以定时器的管理仅需增删

由于定时器事件不是IO事件,所以不应使用文件标识符fd进行标识。因此采用定时器事件ID来唯一标识。

同时,因为既有一次性计时器,也有循环计时器,就导致我们需要两个Map来分别存放 - Timer本身的Timers - TimerEvent的Events

1
2
3
4
5
6
7
8
9
10
11
12
Timer::TimerId TimerManager::addTimer(TimerEvent* event, Timer::Timestamp timestamp, Timer::TimeInterval timeInterval)
{
++mLastTimerId;
Timer timer(event, timestamp, timeInterval,mLastTimerId);


mTimers.insert(std::make_pair(mLastTimerId, timer));
mEvents.insert(std::make_pair(timestamp, timer));
modifyTimeout();

return mLastTimerId;
}

删除计时器仅需删除计时器本身,已经搭载到计时器事件Map的对不用删除

1
2
3
4
5
6
7
8
9
10
bool TimerManager::removeTimer(Timer::TimerId timerId)
{
std::map<Timer::TimerId, Timer>::iterator it = mTimers.find(timerId);
if(it != mTimers.end())
{
mTimers.erase(timerId);
}

return true;
}

在调用计时器回调函数时,需要考虑循环计时器的情况。循环计时器在每次使用时,将计时器事件Map中的对删除后还需添加当前时间戳的对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void TimerManager::handleRead() {

Timer::Timestamp timestamp = Timer::getCurTime();
if (!mTimers.empty() && !mEvents.empty()) {

std::multimap<Timer::Timestamp, Timer>::iterator it = mEvents.begin();
Timer timer = it->second;
int expire = timer.mTimestamp - timestamp;

if (expire <= 0) {
bool timerEventIsStop = timer.handleEvent();
mEvents.erase(it);
if (timer.mRepeat) {
if (timerEventIsStop) {
mTimers.erase(timer.mTimerId);
}else {
timer.mTimestamp = timestamp + timer.mTimeInterval;
mEvents.insert(std::make_pair(timer.mTimestamp, timer));
}
}
else {
mTimers.erase(timer.mTimerId);
}
}
}
}

触发器(Trigger)事件

触发器事件是最简单的事件,没有过多的状态,没有时间戳,仅包含一个触发器回调函数。

触发器(Trigger)管理 - 处理触发器事件

相应的,触发器管理也同样简单,仅需增删用即可

事件管理器 - 统一管理三个管理器

事件管理器统一接受所有管理器的增删改操作,并向外提供相应接口。并提供启动循环操作,统一开启所有操作。

媒体节点管理

RTSP连接管理