基于librtmp的RTMP代理服务器(附代码)

本文是基于librtmp的rtmp代理服务器,目的是在客户端拉流的过程中能够抓取到客户端拉取流的数据,然后将数据做处理操作,处理完发送给客户端或挪作他用。


以上为整体介绍,若跟你的意图大相径庭,后边的文字就没必要深入细读了


整体概述:
本文主要对rtmp代理服务器的整体流程以及实现方式做了一个大体概述,对抓取的数据如何处理或转发没有做出任何解释;代码只是部分重要代码;另外,本文没有对rtmp协议有任何讲解,若想实现rtmp代理请先了解rtmp协议,有关rtmp协议详情请自行搜索;推荐阅读带你吃透rtmp,链接:http://blog.csdn.net/shangmingyang/article/details/50837852


请求流程:
客户端->代理服务器->rtmp服务器


数据流程:
rtmp服务器->代理服务器->客户端


主要步骤:
1)新建一个tcp的服务,用来接收来自客户端的请求;

2)提取接收到请求客户端的socket,然后进行rtmp的握手操作,注意,这里你需要两个rtmp的句柄(librtmp中的RTMP结构体),一个用来与客户端通信,一个用来与服务端通信,简单说来就是:与客户端通信的句柄接收客户端的请求,然后将请求发送给服务器,与服务器通信的句柄接收服务器的数据,然后将数据发给客户端;

RTMP_Init(&m_sRtmpClient->rs); //rs为与客户端通信的句柄,相对于客户端,这里属于服务端,所以用rs来命名 RTMP_Init(&m_sRtmpClient->rc); //rc为与服务端通信的句柄,相对于服务端,这里属于客户端,所以用rc来命名 pThis->m_sRtmpClient->rs.m_sb.sb_socket = sockfd; //sockfd为接收到的客户端的请求,将这个socket赋值给与客户端通信的句柄 if (!RTMP_Serve(&m_sRtmpClient->rs))//启动一个rtmp的server,此处说白了就是进行了rtmp的握手操作 { RTMP_Log(RTMP_LOGERROR, "Handshake failed"); return; }


3)处理双方的数据,确定双方已全部连接,则建立连接成功,开始之后的操作


while (RTMP_IsConnected(&m_sRtmpClient->rs) && RTMP_ReadPacket(&m_sRtmpClient->rs, &ps)) { if (!RTMPPacket_IsReady(&ps)) { continue; } ServePacket(m_sRtmpClient, 0, &ps); //此处为对客户端的参数解析,详情请看步骤4 RTMPPacket_Free(&ps); if (RTMP_IsConnected(&m_sRtmpClient->rc)) { break; } }



4)接收来自客户端的数据包,然后对数据包类型进行判断,对不同的类型做不通的处理;

//ServerPacket函数中主要操作为对接收到的Packet类型做判断,然后做相应的处理 int ServePacket(STREAMING_SERVER *server, int which, RTMPPacket *packet) { int ret = 0; switch (packet->m_packetType) { case RTMP_PACKET_TYPE_CHUNK_SIZE: { // chunk size //HandleChangeChunkSize(r, packet); break; } case RTMP_PACKET_TYPE_BYTES_READ_REPORT: { // bytes read report break; } case RTMP_PACKET_TYPE_CONTROL: { // ctrl //HandleCtrl(r, packet); break; } case RTMP_PACKET_TYPE_SERVER_BW: { // server bw //HandleServerBW(r, packet); break; } case RTMP_PACKET_TYPE_CLIENT_BW: { // client bw //HandleClientBW(r, packet); break; } case RTMP_PACKET_TYPE_AUDIO: { // audio data //RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); break; } case RTMP_PACKET_TYPE_VIDEO: { // video data //RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); break; } case RTMP_PACKET_TYPE_FLEX_STREAM_SEND: { // flex stream send break; } case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT: { // flex shared object break; } case RTMP_PACKET_TYPE_FLEX_MESSAGE: // flex message { ret = ServeInvoke(server, which, packet, packet->m_body + 1); break; } case RTMP_PACKET_TYPE_INFO: { // metadata (notify) break; } case RTMP_PACKET_TYPE_SHARED_OBJECT: { /* shared object */ break; } case RTMP_PACKET_TYPE_INVOKE: { // invoke ret = ServeInvoke(server, which, packet, packet->m_body); break; } case RTMP_PACKET_TYPE_FLASH_VIDEO: { /* flv */ break; } default: { RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,packet->m_packetType); } } return ret; }


5)解析客户端的发送参数,然后连接真实的rtmp服务器;


//ServeInvoke主要是解析来自客户端的请求,解析其中的参数,然后根据参数连接rtmp服务器,若此处需要做一个重定向,可以获取rtmp的连接地址,然后做一个更改 int ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *body) { int nRes; int nBodySize = pack->m_nBodySize; AVal method; AMFObject obj; if (body > pack->m_body) { nBodySize--; } if (body[0] != 0x02)// make sure it is a string method name we start with { RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",__FUNCTION__); return -1; } nRes = AMF_Decode(&obj, body, nBodySize, FALSE); if (nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return -1; } AMF_Dump(&obj); AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method); RTMP_Log(RTMP_LOGDEBUG, "%s, %s invoking <%s>", __FUNCTION__, cst[which], method.av_val); RTMP_LogPrintf("%s, %s invoking <%s>", __FUNCTION__, cst[which], method.av_val); if (AVMATCH(&method, &av_connect)) { AMFObject cobj; AVal pname, pval; int i; AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &cobj); for (i=0; irc.Link.app = pval; pval.av_val = NULL; } else if (AVMATCH(&pname, &av_flashVer)) { server->rc.Link.flashVer = pval; pval.av_val = NULL; } else if (AVMATCH(&pname, &av_swfUrl)) { #ifdef CRYPTO if (pval.av_val) { RTMP_HashSWF(pval.av_val, &server->rc.Link.SWFSize,(unsigned char *)server->rc.Link.SWFHash, 30); } #endif server->rc.Link.swfUrl = pval; pval.av_val = NULL; } else if (AVMATCH(&pname, &av_tcUrl)) { char *r1 = NULL, *r2; int len; server->rc.Link.tcUrl = pval; if ((pval.av_val[0] | 0x40) == 'r' &&(pval.av_val[1] | 0x40) == 't' &&(pval.av_val[2] | 0x40) == 'm' &&(pval.av_val[3] | 0x40) == 'p') { if (pval.av_val[4] == ':') { server->rc.Link.protocol = RTMP_PROTOCOL_RTMP; r1 = pval.av_val+7; } else if ((pval.av_val[4] | 0x40) == 'e' && pval.av_val[5] == ':') { server->rc.Link.protocol = RTMP_PROTOCOL_RTMPE; r1 = pval.av_val+8; } r2 = strchr(r1, '/'); if (r2) { len = r2 - r1; } else { len = pval.av_len - (r1 - pval.av_val); } r2 = (char*)malloc(len+1); memcpy(r2, r1, len); r2[len] = '\0'; server->rc.Link.hostname.av_val = r2; r1 = strrchr(r2, ':'); if (r1) { server->rc.Link.hostname.av_len = r1 - r2; *r1++ = '\0'; server->rc.Link.port = 1935; //atoi(r1); } else { server->rc.Link.hostname.av_len = len; server->rc.Link.port = 1935; } //20171009 截取rtmp地址,以正确获取rtmp数据 memset(server->rc.Link.hostname.av_val,0,server->rc.Link.hostname.av_len); sprintf(server->rc.Link.hostname.av_val,"%s",m_strServerIP); len = server->rc.Link.hostname.av_len = strlen(m_strServerIP); } pval.av_val = NULL; } else if (AVMATCH(&pname, &av_pageUrl)) { server->rc.Link.pageUrl = pval; pval.av_val = NULL; } else if (AVMATCH(&pname, &av_audioCodecs)) { server->rc.m_fAudioCodecs = cobj.o_props[i].p_vu.p_number; } else if (AVMATCH(&pname, &av_videoCodecs)) { server->rc.m_fVideoCodecs = cobj.o_props[i].p_vu.p_number; } else if (AVMATCH(&pname, &av_objectEncoding)) { server->rc.m_fEncoding = cobj.o_props[i].p_vu.p_number; server->rc.m_bSendEncoding = TRUE; } else { printf("%s\n",pname.av_val); }/* Dup'd a string we didn't recognize? */ if (pval.av_val) { free(pval.av_val); } }//endfor if (obj.o_num > 3) { if (AMFProp_GetBoolean(&obj.o_props[3])) { server->rc.Link.lFlags |= RTMP_LF_AUTH; } if (obj.o_num > 4) { AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth); } } //连接服务器 if (!RTMP_Connect(&server->rc, pack)) { return -1; } server->rc.m_bSendCounter = FALSE; } else if (AVMATCH(&method, &av_play)) { AVal av; char *p, *q; char flvHeader[] = { 'F', 'L', 'V', 0x01, 0x05,// video + audio, we finalize later if the value is different 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00// first prevTagSize=0 }; int count = 0, flen; server->rc.m_stream_id = pack->m_nInfoField2; AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av); server->rc.Link.playpath = av; if (!av.av_val) { return -1; }printf("Receive Play Method,Play Path is %s : %s\n",server->rc.Link.playpath.av_val,m_strEndPoint.c_str()); /* strip trailing URL parameters */ q = (char*)memchr(av.av_val, '?', av.av_len); if (q) { if (q == av.av_val) { av.av_val++; av.av_len--; } else { av.av_len = q - av.av_val; } }/* strip leading slash components */ for (p=av.av_val+av.av_len-1; p>=av.av_val; p--) { if (*p == '/') { p++; av.av_len -= p - av.av_val; av.av_val = p; break; } } /* skip leading dot */ if (av.av_val[0] == '.') { av.av_val++; av.av_len--; } flen = av.av_len; /* hope there aren't more than 255 dups */ if (count) { flen += 2; }{ av = server->rc.Link.playpath; } m_eStatus = RTMPSTATUS_PLAYING; } else if (AVMATCH(&method, &av_onStatus)) { AMFObject obj2; AVal code, level; AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2); AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code); AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level); RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); if (AVMATCH(&code, &av_NetStream_Failed)|| AVMATCH(&code, &av_NetStream_Play_Failed)|| AVMATCH(&code, &av_NetStream_Play_StreamNotFound)|| AVMATCH(&code, &av_NetConnection_Connect_InvalidApp)) { return -1; } if (AVMATCH(&code, &av_NetStream_Play_Start)) { server->rc.m_bPlaying = TRUE; }// Return 1 if this is a Play.Complete or Play.Stop if (AVMATCH(&code, &av_NetStream_Play_Complete)|| AVMATCH(&code, &av_NetStream_Play_Stop)) { m_eStatus = RTMPSTATUS_STOPPED; } } else if (AVMATCH(&method, &av_closeStream)) { m_eStatus = RTMPSTATUS_STOPPED; } else if (AVMATCH(&method, &av_close)) { m_eStatus = RTMPSTATUS_STOPPED; } else if(AVMATCH(&method, &av_seek)) {if (m_bPause) { WARN("Mustn't Seek When Pausing"); RTMP_Pause(&server->rc,0); uint64_t newPos = RTMP_GetTime() - m_dwPauseTime; m_pFlvParser->SetSeekPos(-1 * newPos); m_bPause = false; m_eStatus = RTMPSTATUS_PLAYING; }//m_bSeek = true; AMFObjectProperty *prop = AMF_GetProp(&obj, NULL, 3); uint64_t newPos = AMFProp_GetNumber(prop); int bForwardSeek = newPos > m_nTimeStamp ? 1:-1; int64_t pos; if(newPos > m_nTimeStamp) { pos = newPos - m_nTimeStamp; } else { pos = m_nTimeStamp-newPos; }printf("pos is %d\n",pos); } else if(AVMATCH(&method, &av_pauseRaw)) { if(!m_bPause) { m_bPause = true; m_dwPauseTime = RTMP_GetTime(); m_eStatus = RTMPSTATUS_PAUSING; } else { uint64_t newPos = RTMP_GetTime() - m_dwPauseTime; m_pFlvParser->SetSeekPos(-1 * newPos); m_bPause = false; m_eStatus = RTMPSTATUS_PLAYING; } } else { printf("接收到其他RTMP指令 : %s\n",method.av_val); } //RTMP_SendPacket(&server->rc,pack,1); return 0; }



【基于librtmp的RTMP代理服务器(附代码)】6)数据交换。



总结:以上的重点在于代理服务器接收客户端的数据包并解析客户端的参数,然后根据解析的参数连接rtmp服务器;至于数据交换,只不过是接收服务端的packet,然后将packet发送给客户端,此处就没什么可以多说的了。



注意:
1)如果客户端为网页端,在编译librtmp时需要加入openssl的支持,否则网页的播放器中不会显示任何东西;
2)rtmp代理服务器不支持客户端在暂停状态下进行seek操作,否则会出现卡死的问题,原因为:客户端在等待服务端的数据,而服务端在等待客户端的请求,进入了双向等待的状态;


如对本文有任何问题或不清楚的地方,可以加入qq群445236076一起学习

    推荐阅读