随笔-99  评论-133  文章-4  trackbacks-0
基础搞明白了,那么live555的RTSP服务器,又是如何创建、启动,如何和Source和Sink建立联系的呢?

主程序中会调用类似下面的代码,创建RTSP服务器:
  // Create the RTSP server:
  RTSPServer
* rtspServer = RTSPServer::createNew(*env, 554, authDB);
  
if (rtspServer == NULL) {
    
*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    
exit(1);
  }

RTSPServer的构造函数会先构建父类GenericMediaServer,由它承担后续基础的服务:
// Constructs the RTSP server. The socket, port and reclamationSeconds are
// forwarded to the GenericMediaServer base class; the remaining members are
// given their initial values here. The constructor body itself is empty.
RTSPServer::RTSPServer(UsageEnvironment& env,
                       int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationSeconds)
  : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds),
    fHTTPServerSocket(-1),
    fHTTPServerPort(0),
    fClientConnectionsForHTTPTunneling(NULL), // will get created if needed
    fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)),
    fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
    fRegisterOrDeregisterRequestCounter(0),
    fAuthDB(authDatabase),
    fAllowStreamingRTPOverTCP(True) {
}

GenericMediaServer会创建一个后台任务,用于监听Client的连接:
// Constructs the generic media server: stores the server socket/port and
// reclamationSeconds, creates the three lookup tables, and registers a
// background read handler on the server socket.
GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocket, Port ourPort,
                                       unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket(ourSocket),
    fServerPort(ourPort),
    fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
  // Clients on the same host that get killed must not also kill us:
  ignoreSigPipeOnSocket(fServerSocket);

  // Background task that watches the server socket for clients' connect
  // attempts; incomingConnectionHandler is invoked with 'this' as its data:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, incomingConnectionHandler, this);
}

收到Client的连接请求后,incomingConnectionHandler函数会被调用,它最终会调用到下面的incomingConnectionHandlerOnSocket:

void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen 
= sizeof clientAddr;
  
int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  
if (clientSocket < 0) {
    
int err = envir().getErrno();
    
if (err != EWOULDBLOCK) {
      envir().setResultErrMsg(
"accept() failed: ");
    }
    return;
  }

  
// Create a new object for handling this connection:
  (void)createNewClientConnection(clientSocket, clientAddr);
}

live555会为每个连接调用createNewClientConnection创建一个ClientConnection(对RTSP服务器而言即RTSPClientConnection):

// Constructs the per-connection RTSP state.
// The GenericMediaServer::ClientConnection base is initialized with
// (ourServer, clientSocket, clientAddr). Both the input and the output socket
// start out as the connection's own socket (fOurSocket), the connection is
// marked active, and the request buffer is reset.
RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr),
    fOurRTSPServer(ourServer), fClientInputSocket(fOurSocket), fClientOutputSocket(fOurSocket),
    fIsActive(True), fRecursionCount(0), fOurSessionCookie(NULL) {
  resetRequestBuffer();
}

GenericMediaServer::ClientConnection会创建一个后台任务,用于监听connect上的Request:
// Constructs a client connection: remembers the owning server, the connected
// socket and the client's address, registers itself in the server's
// connection table, and starts watching the socket for incoming requests.
GenericMediaServer::ClientConnection
::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer),
    fOurSocket(clientSocket),
    fClientAddr(clientAddr) {
  // Register in the server's 'client connections' table, keyed by our own address:
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Background task that watches this connection for requests (readable or
  // exceptional conditions); incomingRequestHandler is invoked with 'this':
  resetRequestBuffer();
  envir().taskScheduler().setBackgroundHandling(fOurSocket,
                                                SOCKET_READABLE|SOCKET_EXCEPTION,
                                                incomingRequestHandler, this);
}

当Connect上收到来自Client的请求后,incomingRequestHandler函数会被调用。

void GenericMediaServer::ClientConnection::incomingRequestHandler() {
  struct sockaddr_in dummy; 
// 'from' address, meaningless in this case

  
int bytesRead = readSocket(envir(), fOurSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
  handleRequestBytes(bytesRead);
}

void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
.
    
Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer,    
                            cmdName, sizeof cmdName,
                            urlPreSuffix, sizeof urlPreSuffix,
                            urlSuffix, sizeof urlSuffix,
                            cseq, sizeof cseq,
                            sessionIdStr, sizeof sessionIdStr,
                            contentLength);

.
      } 
else if (strcmp(cmdName, "DESCRIBE"== 0) {    //处理DESCRIBE request
        handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char 
const*)fRequestBuffer);
      } 
else if (strcmp(cmdName, "SETUP"== 0) {   //处理SETUP request
            clientSession 
= (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
      }
        
if (clientSession != NULL) {
          clientSession
->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
        }
      } 
else if (strcmp(cmdName, "TEARDOWN"== 0
         || strcmp(cmdName, 
"PLAY"== 0
         || strcmp(cmdName, 
"PAUSE"== 0
         || strcmp(cmdName, 
"GET_PARAMETER"== 0
         || strcmp(cmdName, 
"SET_PARAMETER"== 0) {
        
if (clientSession != NULL) {
          clientSession
->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);  //处理PLAY请求等

        }
.
}

具体RTSP协议下的Request和Response不作详细介绍,这里只关注流程部分:
1、先看handleCmd_SETUP
void RTSPServer::RTSPClientSession
::handleCmd_SETUP(RTSPServer::RTSPClientConnection
* ourClientConnection,
          char 
const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {

fstreamStates = new struct streamState[fNumStreamStates];  //每个会话的状态,用streamState来管理
....
    subsession
->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,
                    clientRTPPort, clientRTCPPort,
                    fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId,
                    destinationAddress, destinationTTL, fIsMulticast,
                    serverRTPPort, serverRTCPPort,
                    fStreamStates[trackNum].streamToken);
.
}

getStreamParameters这个函数很重要,它将完成source、RTPSink的创建工作,并将其与客户端建立联系:
// Excerpt (lines marked "." are elided): creates the media source and the
// RTP sink for a client session, bundles them into a new StreamState, and
// returns that object to the caller through the streamToken out-parameter.
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
              netAddressBits clientAddress,
              Port 
const& clientRTPPort,
              Port 
const& clientRTCPPort,
              
int tcpSocketNum,
              unsigned char rtpChannelId,
              unsigned char rtcpChannelId,
              netAddressBits
& destinationAddress,
              u_int8_t
& /*destinationTTL*/,
              
Boolean& isMulticast,
              Port
& serverRTPPort,
              Port
& serverRTCPPort,
              void
*& streamToken) {
.
    // Create the source that will supply the media frames for this session:
    FramedSource* mediaSource
      
= createNewStreamSource(clientSessionId, streamBitrate);

.
    // Create the RTP sink that will be fed from mediaSource:
    rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);

.
    // Bundle source, sinks, ports and groupsocks into a StreamState; it is
    // remembered in fLastStreamToken and also handed back via streamToken:
    streamToken = fLastStreamToken
      
= new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink, 
            streamBitrate, mediaSource,
            rtpGroupsock, rtcpGroupsock);}

.

2、再来看handleCmd_PLAY
// Excerpt (lines marked "." are elided): handles an RTSP PLAY request.
// Walks over all stream states and calls startStream() on each one selected:
// every stream when 'subsession' is NULL, otherwise only the stream whose
// subsession matches. Entries with no subsession are skipped.
void RTSPServer::RTSPClientSession
::handleCmd_PLAY(RTSPServer::RTSPClientConnection
* ourClientConnection,
         ServerMediaSubsession
* subsession, char const* fullRequestStr) {
.
  
// Now, start streaming:
  
for (i = 0; i < fNumStreamStates; ++i) {
    
if (subsession == NULL /* means: aggregated operation */
    || subsession 
== fStreamStates[i].subsession) {
      // Passed to startStream() by reference; both start at 0 here:
      unsigned short rtpSeqNum 
= 0;
      unsigned rtpTimestamp 
= 0;
      
if (fStreamStates[i].subsession == NULL) continue;
      // noteClientLiveness (with 'this') is passed as the RTCP "RR" handler;
      // handleAlternativeRequestByte (with the connection) is passed as the
      // alternative request-byte handler:
      fStreamStates[i].subsession
->startStream(fOurSessionId, 
                           fStreamStates[i].streamToken,
                           (TaskFunc
*)noteClientLiveness, this,
                           rtpSeqNum, rtpTimestamp,
                           RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
.
}

// Excerpt (lines marked "." are elided): starts streaming for one client
// session by forwarding the destinations, session id and the two handler /
// client-data pairs to StreamState::startPlaying(), provided that a stream
// state exists.
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                        void
* streamToken,
                        TaskFunc
* rtcpRRHandler,
                        void
* rtcpRRHandlerClientData,
                        unsigned short
& rtpSeqNum,
                        unsigned
& rtpTimestamp,
                        ServerRequestAlternativeByteHandler
* serverRequestAlternativeByteHandler,
                        void
* serverRequestAlternativeByteHandlerClientData) {
.
  
// Nothing is started when there is no stream state:
if (streamState != NULL) {
    streamState
->startPlaying(destinations, clientSessionId,   
                  rtcpRRHandler, rtcpRRHandlerClientData,
                  serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
.
}

// Excerpt (lines marked "." are elided): if the stream is not already playing
// and has a media source, starts the RTP sink (when there is one) playing
// from that source and marks the stream as currently playing.
void StreamState
::startPlaying(Destinations
* dests, unsigned clientSessionId,
           TaskFunc
* rtcpRRHandler, void* rtcpRRHandlerClientData,
           ServerRequestAlternativeByteHandler
* serverRequestAlternativeByteHandler,
           void
* serverRequestAlternativeByteHandlerClientData) {
.
  
if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
    
if (fRTPSink != NULL) {
      // afterPlayingStreamState (with 'this') is passed as the sink's
      // 'after playing' callback:
      fRTPSink
->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying 
= True;
.
}

// Excerpt (lines marked "." are elided): starts this sink playing from
// 'source'. After the elided set-up, the result is whatever
// continuePlaying() returns.
Boolean MediaSink::startPlaying(MediaSource& source,
                afterPlayingFunc
* afterFunc,
                void
* afterClientData) {
.
return continuePlaying();
}

Boolean H264or5VideoRTPSink::continuePlaying() {
  
// First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now:
  
if (fOurFragmenter == NULL) {
    fOurFragmenter 
= new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,   
                       ourMaxPacketSize() 
- 12/*RTP hdr size*/);
  } 
else {
    fOurFragmenter
->reassignInputSource(fSource);
  }
  fSource 
= fOurFragmenter;

  
// Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying();
}

Boolean MultiFramedRTPSink::continuePlaying() {
  
// Send the first packet.
  
// (This will also schedule any future sends.)
  buildAndSendPacket(
True);
  return 
True;
}

绕了好大一个圈,终于到达MultiFramedRTPSink的continuePlaying了。从现在开始,它将循环地获取RTSP服务器需要的RTP数据包,直到收到停止命令。
posted on 2017-02-08 16:47 lfc 阅读(130) 评论(0)  编辑 收藏 引用
只有注册用户登录后才能发表评论。