随笔-118  评论-133  文章-4  trackbacks-0
基础搞明白了,那么live555的RTSP服务器,又是如何创建、启动,如何和Source和Sink建立联系的呢?

主程序中会调用类似下面的代码,创建RTSP服务器:
  // Create the RTSP server:
  RTSPServer
* rtspServer = RTSPServer::createNew(*env, 554, authDB);
  
if (rtspServer == NULL) {
    
*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    
exit(1);
  }

父类GenericMediaServer被构建,后续承担基础的服务:
RTSPServer::RTSPServer(UsageEnvironment& env,
               
int ourSocket, Port ourPort,
               UserAuthenticationDatabase
* authDatabase,
               unsigned reclamationSeconds)
  : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds),
    fHTTPServerSocket(
-1), fHTTPServerPort(0),
    fClientConnectionsForHTTPTunneling(
NULL), // will get created if needed
    fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)),
    fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
    fRegisterOrDeregisterRequestCounter(
0), fAuthDB(authDatabase), fAllowStreamingRTPOverTCP(True) {
}

GenericMediaServer会创建一个后台任务,由于监听Client的连接:
GenericMediaServer
::GenericMediaServer(UsageEnvironment
& env, int ourSocket, Port ourPort,
             unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket(ourSocket), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
  ignoreSigPipeOnSocket(fServerSocket); 
// so that clients on the same host that are killed don't also kill us

  
// Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, 
incomingConnectionHandler, this); //创建后台任务用于监听client的connect
}

收到Client的连接请求后,incomingConnectionHandler函数会被调用。

void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen 
= sizeof clientAddr;
  
int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  
if (clientSocket < 0) {
    
int err = envir().getErrno();
    
if (err != EWOULDBLOCK) {
      envir().setResultErrMsg(
"accept() failed: ");
    }
    return;
  }

  
// Create a new object for handling this connection:
  (void)createNewClientConnection(clientSocket, clientAddr);
}

live555会为每个连接调用createNewClientConnection创建一个ClientConnection

RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer
& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : 
GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr),
    fOurRTSPServer(ourServer), fClientInputSocket(fOurSocket), fClientOutputSocket(fOurSocket),

    fIsActive(True), fRecursionCount(0), fOurSessionCookie(NULL) {
  resetRequestBuffer();
}

GenericMediaServer::ClientConnection会创建一个后台任务,由于监听connect上的Request:
GenericMediaServer::ClientConnection
::ClientConnection(GenericMediaServer
& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr) {
  
// Add ourself to our 'client connections' table:
  fOurServer.fClientConnections->Add((char const*)this, this);

  
// Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler()
    .setBackgroundHandling(fOurSocket, SOCKET_READABLE|SOCKET_EXCEPTION, 
incomingRequestHandler, this); //创建后台任务用于监听connect上的Request
}

当Connect上收到来自Client的请求后,incomingRequestHandler函数会被调用。

void GenericMediaServer::ClientConnection::incomingRequestHandler() {
  struct sockaddr_in dummy; 
// 'from' address, meaningless in this case

  
int bytesRead = readSocket(envir(), fOurSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
  handleRequestBytes(bytesRead);
}

void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
.
    
Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer,    
                            cmdName, sizeof cmdName,
                            urlPreSuffix, sizeof urlPreSuffix,
                            urlSuffix, sizeof urlSuffix,
                            cseq, sizeof cseq,
                            sessionIdStr, sizeof sessionIdStr,
                            contentLength);

.
      } 
else if (strcmp(cmdName, "DESCRIBE"== 0) {    //处理DESCRIBE request
        handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char 
const*)fRequestBuffer);
      } 
else if (strcmp(cmdName, "SETUP"== 0) {   //处理SETUP request
            clientSession 
= (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
      }
        
if (clientSession != NULL) {
          clientSession
->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
        }
      } 
else if (strcmp(cmdName, "TEARDOWN"== 0
         || strcmp(cmdName, 
"PLAY"== 0
         || strcmp(cmdName, 
"PAUSE"== 0
         || strcmp(cmdName, 
"GET_PARAMETER"== 0
         || strcmp(cmdName, 
"SET_PARAMETER"== 0) {
        
if (clientSession != NULL) {
          clientSession
->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);  //处理PLAY请求等

        }
.
}

具体RTSP协议下的Request和Respond不作详细介绍,这里只关注流程部分:
1、先看handleCmd_SETUP
void RTSPServer::RTSPClientSession
::handleCmd_SETUP(RTSPServer::RTSPClientConnection
* ourClientConnection,
          char 
const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {

fstreamStates = new struct streamState[fNumStreamStates];  //每个会话的状态,用streamState来管理
....
    subsession
->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,
                    clientRTPPort, clientRTCPPort,
                    fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId,
                    destinationAddress, destinationTTL, fIsMulticast,
                    serverRTPPort, serverRTCPPort,
                    fStreamStates[trackNum].streamToken);
.
}

getStreamParmeters这个函数很重要,它将完成source,RTPSink的创建工作,并将其与客户端建立联系
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
              netAddressBits clientAddress,
              Port 
const& clientRTPPort,
              Port 
const& clientRTCPPort,
              
int tcpSocketNum,
              unsigned char rtpChannelId,
              unsigned char rtcpChannelId,
              netAddressBits
& destinationAddress,
              u_int8_t
& /*destinationTTL*/,
              
Boolean& isMulticast,
              Port
& serverRTPPort,
              Port
& serverRTCPPort,
              void
*& streamToken) {

    FramedSource* mediaSource
      
= createNewStreamSource(clientSessionId, streamBitrate);
    // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
    // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
    
// (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
    NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
    
for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
      struct in_addr dummyAddr; dummyAddr.s_addr 
= 0;

      serverRTPPort 
= serverPortNum;
      rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort); //创建RTP传输用socket
      
if (rtpGroupsock->socketNum() < 0) {
        delete rtpGroupsock;
        continue; 
// try again
      }

      
if (fMultiplexRTCPWithRTP) {
        
// Use the RTP 'groupsock' object for RTCP as well:
        serverRTCPPort = serverRTPPort;
        rtcpGroupsock 
= rtpGroupsock;
      } 
else {
        
// Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
        serverRTCPPort = ++serverPortNum;
      
  rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
        
if (rtcpGroupsock->socketNum() < 0) {
          delete rtpGroupsock;
          delete rtcpGroupsock;
          continue; 
// try again
        }
      }
   rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
  streamToken = fLastStreamToken
      
= new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink, 
            streamBitrate, mediaSource,
            rtpGroupsock, rtcpGroupsock);}

2、再来看handleCmd_PLAY
void RTSPServer::RTSPClientSession
::handleCmd_PLAY(RTSPServer::RTSPClientConnection
* ourClientConnection,
         ServerMediaSubsession
* subsession, char const* fullRequestStr) {
.
  
// Now, start streaming:
  
for (i = 0; i < fNumStreamStates; ++i) {
    
if (subsession == NULL /* means: aggregated operation */
    || subsession 
== fStreamStates[i].subsession) {
      unsigned short rtpSeqNum 
= 0;
      unsigned rtpTimestamp 
= 0;
      
if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession
->startStream(fOurSessionId, 
                           fStreamStates[i].streamToken,
                           (TaskFunc
*)noteClientLiveness, this,
                           rtpSeqNum, rtpTimestamp,
                           RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
.
}

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                        void
* streamToken,
                        TaskFunc
* rtcpRRHandler,
                        void
* rtcpRRHandlerClientData,
                        unsigned short
& rtpSeqNum,
                        unsigned
& rtpTimestamp,
                        ServerRequestAlternativeByteHandler
* serverRequestAlternativeByteHandler,
                        void
* serverRequestAlternativeByteHandlerClientData) {
.
  
if (streamState != NULL) {
    streamState
->startPlaying(destinations, clientSessionId,   
                  rtcpRRHandler, rtcpRRHandlerClientData,
                  serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
.
}

void StreamState
::startPlaying(Destinations
* dests, unsigned clientSessionId,
           TaskFunc
* rtcpRRHandler, void* rtcpRRHandlerClientData,
           ServerRequestAlternativeByteHandler
* serverRequestAlternativeByteHandler,
           void
* serverRequestAlternativeByteHandlerClientData) {
.
  
if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
    
if (fRTPSink != NULL) {
      fRTPSink
->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying 
= True;
.
}

Boolean MediaSink::startPlaying(MediaSource& source,
                afterPlayingFunc
* afterFunc,
                void
* afterClientData) {
.
return continuePlaying();
}

Boolean H264or5VideoRTPSink::continuePlaying() {
  
// First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now:
  
if (fOurFragmenter == NULL) {
    fOurFragmenter 
= new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,   
                       ourMaxPacketSize() 
- 12/*RTP hdr size*/);
  } 
else {
    fOurFragmenter
->reassignInputSource(fSource);
  }
  fSource 
= fOurFragmenter;

  
// Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying();
}

Boolean MultiFramedRTPSink::continuePlaying() {
  
// Send the first packet.
  
// (This will also schedule any future sends.)
  buildAndSendPacket(
True);
  return 
True;
}

绕了好大一个圈,终于到达MultiFrameRTPSink的continuePlaying了,从现在开始,它将循环的获取RTSP服务器需要的RTP数据包,直到收到停止命令。 
MultiFramedRTPSink是与帧有关的类,其实它要求每次必须从source获得一个帧的数据。
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) 
{
    
//此函数中主要是准备rtp包的头,为一些需要跟据实际数据改变的字段留出位置。
    fIsFirstPacket 
= isFirstPacket;
 
    
// Set up the RTP header:
    unsigned rtpHdr 
= 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
    rtpHdr |= (fRTPPayloadType << 16);
    rtpHdr |
= fSeqNo; // sequence number
    fOutBuf
->enqueueWord(rtpHdr);//向包中加入一个字
 
 
    
// Note where the RTP timestamp will go.
    
// (We can't fill this in until we start packing payload frames.)
    fTimestampPosition = fOutBuf->curPacketSize();
    fOutBuf
->skipBytes(4); // leave a hole for the timestamp 在缓冲中空出时间戳的位置
 
 
    fOutBuf
->enqueueWord(SSRC()); 
 
 
    
// Allow for a special, payload-format-specific header following the
    
// RTP header:
    fSpecialHeaderPosition 
= fOutBuf->curPacketSize();
    fSpecialHeaderSize 
= specialHeaderSize();
    fOutBuf
->skipBytes(fSpecialHeaderSize);
 
 
    
// Begin packing as many (complete) frames into the packet as we can:
    fTotalFrameSpecificHeaderSizes 
= 0;
    fNoFramesLeft 
= False;
    fNumFramesUsedSoFar 
= 0;
   

    packFrame(); 
//头准备好了,再打包帧数据}

void MultiFramedRTPSink::packFrame()
{
    
// First, see if we have an overflow frame that was too big for the last pkt
    
if (fOutBuf->haveOverflowData()) {
        
//如果有帧数据,则使用之。OverflowData是指上次打包时剩下的帧数据,因为一个包可能容纳不了一个帧。
        
// Use this frame before reading a new one from the source
        unsigned frameSize 
= fOutBuf->overflowDataSize();
        struct timeval presentationTime 
= fOutBuf->overflowPresentationTime();
        unsigned durationInMicroseconds 
=fOutBuf->overflowDurationInMicroseconds();
        fOutBuf
->useOverflowData();
 
 
        afterGettingFrame1(frameSize, 
0, presentationTime,durationInMicroseconds);
    } 
else {
        
//否则,跟source要。
        
// Normal case: we need to read a new frame from the source
        
if (fSource == NULL)
            return;
 
 
        
//更新缓冲中的一些位置
        fCurFrameSpecificHeaderPosition 
= fOutBuf->curPacketSize();
        fCurFrameSpecificHeaderSize 
= frameSpecificHeaderSize();
        fOutBuf
->skipBytes(fCurFrameSpecificHeaderSize);
        fTotalFrameSpecificHeaderSizes 
+= fCurFrameSpecificHeaderSize;
 
 
        
//从source获取下一帧
        
fSource->getNextFrame(fOutBuf->curPtr(),
                fOutBuf
->totalBytesAvailable(),
                afterGettingFrame,

                this,
                ourHandleClosure,

                this);
    }
}
source从文件(或某个设备)中读取一帧数据,读完后通过回调函数afterGettingFrame返回给Sink。
void MultiFramedRTPSink::afterGettingFrame(void* clientData,
        unsigned numBytesRead, unsigned numTruncatedBytes,
        struct timeval presentationTime, unsigned durationInMicroseconds)
{
    MultiFramedRTPSink
* sink = (MultiFramedRTPSink*) clientData;
    sink
->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime,
            durationInMicroseconds);
}
void MultiFramedRTPSink::afterGettingFrame1(
        unsigned frameSize,
        unsigned numTruncatedBytes,
        struct timeval presentationTime,
        unsigned durationInMicroseconds)
{
    
if (fIsFirstPacket) {
        
// Record the fact that we're starting to play now:
        gettimeofday(&fNextSendTime, NULL); 
    }

    unsigned curFragmentationOffset 
= fCurFragmentationOffset;
    unsigned numFrameBytesToUse 
= frameSize;
    unsigned overflowBytes 
= 0;
 
 
    
//如果包只已经打入帧数据了,并且不能再向这个包中加数据了,则把新获得的帧数据保存下来。
    
// If we have already packed one or more frames into this packet,
    
// check whether this new frame is eligible to be packed after them.
    
// (This is independent of whether the packet has enough room for this
    
// new frame; that check comes later.)
    
if (fNumFramesUsedSoFar > 0) {
        
//如果包中已有了一个帧,并且不允许再打入新的帧了,则只记录下新的帧。
        
if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(fOutBuf
->curPtr(), frameSize))
        {
            
// Save away this frame for next time:
            numFrameBytesToUse 
= 0;
            fOutBuf
->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                    presentationTime, durationInMicroseconds);
        }
    }
    
    
//表示当前打入的是否是上一个帧的最后一块数据。
    fPreviousFrameEndedFragmentation 
= False;
 
 
    
//下面是计算获取的帧中有多少数据可以打到当前包中,剩下的数据就作为overflow数据保存下来。
    
if (numFrameBytesToUse > 0) {
        
// Check whether this frame overflows the packet
        
if (fOutBuf->wouldOverflow(frameSize)) {
            
// Don't use this frame now; instead, save it as overflow data, and
            // send it in the next packet instead.  However, if the frame is too
            
// big to fit in a packet by itself, then we need to fragment it (and
            
// use some of it in this packet, if the payload format permits this.)
            
if (isTooBigForAPacket(frameSize)
                    
&& (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
                
// We need to fragment this frame, and use some of it now:
                overflowBytes 
= computeOverflowForNewFrame(frameSize);
                numFrameBytesToUse 
-= overflowBytes;
                fCurFragmentationOffset 
+= numFrameBytesToUse;
            } 
else {
                
// We don't use any of this frame now:
                overflowBytes = frameSize;
                numFrameBytesToUse 
= 0;
            }
            fOutBuf
->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                    overflowBytes, presentationTime, durationInMicroseconds);
        } 
else if (fCurFragmentationOffset > 0) {
            
// This is the last fragment of a frame that was fragmented over
            
// more than one packet.  Do any special handling for this case:
            fCurFragmentationOffset 
= 0;
            fPreviousFrameEndedFragmentation 
= True;
        }
    }
 
 
    
    
if (numFrameBytesToUse == 0 && frameSize > 0) {
        
// Send our packet now, because we have filled it up:
        sendPacketIfNecessary();
    } 
else {
        
//需要向包中打入数据。
        
        
// Use this frame in our outgoing packet:
        unsigned char
* frameStart = fOutBuf->curPtr();
        fOutBuf
->increment(numFrameBytesToUse);
        
// do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
 
 
        
// Here's where any payload format specific processing gets done:
        doSpecialFrameHandling(curFragmentationOffset, frameStart,
                numFrameBytesToUse, presentationTime, overflowBytes);
 
 
        
++fNumFramesUsedSoFar;
 
        //更新fNextSendTime,后面delay queue需要用到。
        
// Update the time at which the next packet should be sent, based
        
// on the duration of the frame that we just packed into it.
        
// However, if this frame has overflow data remaining, then don't
        // count its duration yet.
        
if (overflowBytes == 0) {
            fNextSendTime.tv_usec += durationInMicroseconds;
            fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
            fNextSendTime.tv_usec %= 1000000;
        }
 
        //如果需要,就发出包,否则继续打入数据。
        
// Send our packet now if (i) it's already at our preferred size, or
        // (ii) (heuristic) another frame of the same size as the one we just
        
//      read would overflow the packet, or
        
// (iii) it contains the last fragment of a fragmented frame, and we
        
//      don't allow anything else to follow this or
        // (iv) one frame per packet is allowed:
        
if (fOutBuf->isPreferredSize()
                || fOutBuf
->wouldOverflow(numFrameBytesToUse)
                || (fPreviousFrameEndedFragmentation
                        
&& !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(
                        fOutBuf
->curPtr() - frameSize, frameSize)) {
            
// The packet is ready to be sent now
            
sendPacketIfNecessary();
        } 
else {
            
// There's room for more frames; try getting another:
            packFrame();
        }
    }
void MultiFramedRTPSink::sendPacketIfNecessary()
{

    
if (fNumFramesUsedSoFar > 0) {

        
++fPacketCount;
        fTotalOctetCount 
+= fOutBuf->curPacketSize();
        fOctetCount 
+= fOutBuf->curPacketSize() - rtpHeaderSize
                
- fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
 
 
        
++fSeqNo; // for next time
    }
 
 
    
//如果还有剩余数据,则调整缓冲区
    
if (fOutBuf->haveOverflowData()
            
&& fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
        
// Efficiency hack: Reset the packet start pointer to just in front of
        
// the overflow data (allowing for the RTP header and special headers),
        
// so that we probably don't have to "memmove()" the overflow data
        // into place when building the next packet:
        unsigned newPacketStart 
= fOutBuf->curPacketSize()- 
                (rtpHeaderSize 
+ fSpecialHeaderSize + frameSpecificHeaderSize());
        fOutBuf
->adjustPacketStart(newPacketStart);
    } 
else {
        
// Normal case: Reset the packet start pointer back to the start:
        fOutBuf
->resetPacketStart();
    }
    fOutBuf
->resetOffset();
    fNumFramesUsedSoFar 
= 0;
 
 
    
if (fNoFramesLeft) {
        
// We're done:
        onSourceClosure(this);
    } 
else {
        
//如果还有数据,则在下一次需要发送的时间再次打包发送。
        
// We have more frames left to send.  Figure out when the next frame
        
// is due to start playing, then make sure that we wait this long before
        
// sending the next packet.
        struct timeval timeNow;
        gettimeofday(
&timeNow, NULL);
        
int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
        int64_t uSecondsToGo = secsDiff * 1000000
                
+ (fNextSendTime.tv_usec - timeNow.tv_usec);
        
if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
            uSecondsToGo 
= 0;
        }
 
 
        
// Delay this amount of time:
        
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
                (TaskFunc*) sendNext, this);}
    }
// The following is called after each delay between packet sends:
void MultiFramedRTPSink::sendNext(void
* firstArg) {
  MultiFramedRTPSink
* sink = (MultiFramedRTPSink*)firstArg;
  
sink->buildAndSendPacket(False); //继续循环
}
posted on 2017-02-08 16:47 lfc 阅读(630) 评论(0)  编辑 收藏 引用
只有注册用户登录后才能发表评论。