From 65cc92e7ca86bf6b116d78de4c6285fd1b97ff03 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 15 Mar 2012 20:43:03 +0400 Subject: [PATCH] improved publishing & subscribing: now subscriber receives some data which looks like real video --- TODO | 2 + ngx_rtmp.h | 8 +- ngx_rtmp_broadcast_module.c | 52 ++++++++++--- ngx_rtmp_core_module.c | 9 +++ ngx_rtmp_handler.c | 146 +++++++++++++++++++++--------------- 5 files changed, 146 insertions(+), 71 deletions(-) diff --git a/TODO b/TODO index 6350959..9d265a9 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,7 @@ - implement chain-reuse for output +- get rid of greedy send_chain + - remove macros hell from ngx_rtmp_send.c - packet dropping diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 4c3d6c5..a453274 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -176,7 +176,9 @@ typedef struct ngx_rtmp_session_s { ngx_str_t *addr_text; - ngx_buf_t hs_buf; + /* TODO: allocate this bufs from shared pool */ + ngx_buf_t hs_in_buf; + ngx_buf_t hs_out_buf; ngx_uint_t hs_stage; /* connection timestamps */ @@ -190,6 +192,8 @@ typedef struct ngx_rtmp_session_s { uint32_t in_csid; ngx_uint_t in_chunk_size; ngx_pool_t *in_pool; + uint32_t in_bytes; + uint32_t in_last_ack; ngx_chain_t *out; } ngx_rtmp_session_t; @@ -217,6 +221,8 @@ typedef struct { ngx_msec_t timeout; ngx_flag_t so_keepalive; ngx_int_t max_streams; + + ngx_uint_t ack_window; ngx_int_t out_chunk_size; ngx_pool_t *out_pool; diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index fdd3d86..3a4a0f0 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -90,6 +90,7 @@ ngx_module_t ngx_rtmp_broadcast_module = { #define NGX_RTMP_BROADCAST_PUBLISHER 0x01 #define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02 +#define NGX_RTMP_BROADCAST_WANT_KEYFRAME 0x04 typedef struct ngx_rtmp_broadcast_ctx_s { @@ -223,6 +224,18 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) } +#define NGX_RTMP_VIDEO_KEY_FRAME 1 +#define NGX_RTMP_VIDEO_INTER_FRAME 2 +#define NGX_RTMP_VIDEO_DISPOSABLE_FRAME 3 + + +static ngx_int_t +ngx_rtmp_get_video_frame_type(ngx_chain_t *in) +{ + return (in->buf->pos[0] & 0xf0) >> 4; +} + + static ngx_int_t ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) @@ -232,13 +245,16 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *out, *l, **ll; u_char *p; size_t nsubs, size; + ngx_int_t vftype; c = s->connection; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + /* h->timestamp -= s->peer_epoch; h->timestamp += s->epoch; + */ if (ctx == NULL || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) @@ -252,6 +268,11 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_OK; } + vftype = 0; + if (h->type == NGX_RTMP_MSG_VIDEO) { + vftype = ngx_rtmp_get_video_frame_type(in); + } + /* copy data to output stream */ out = NULL; ll = &out; @@ -297,6 +318,14 @@ done: && !ngx_strncmp(cctx->stream.data, ctx->stream.data, ctx->stream.len)) { + if (h->type == NGX_RTMP_MSG_VIDEO + && cctx->flags & NGX_RTMP_BROADCAST_WANT_KEYFRAME) + { + if (vftype && vftype != NGX_RTMP_VIDEO_KEY_FRAME) { + continue; + } + cctx->flags &= ~NGX_RTMP_BROADCAST_WANT_KEYFRAME; + } if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) { return NGX_ERROR; } @@ -319,11 +348,13 @@ static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in) { - static double trans; - static u_char app[1024]; - static u_char url[1024]; - static u_char acodecs[1024]; - static ngx_str_t app_str; + ngx_rtmp_core_srv_conf_t *cscf; + + static double trans; + static u_char app[1024]; + static u_char url[1024]; + static u_char acodecs[1024]; + static ngx_str_t app_str; static ngx_rtmp_amf0_elt_t in_cmd[] = { { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, @@ -354,6 +385,8 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, return NGX_ERROR; } + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + trans = in_trans; ngx_str_set(&out_inf[0], "NetConnection.Connect.Success"); ngx_str_set(&out_inf[1], "status"); @@ -373,8 +406,8 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, */ ngx_rtmp_broadcast_join(s, &app_str, 0); - return ngx_rtmp_send_ack_size(s, 2500000) - || ngx_rtmp_send_bandwidth(s, 2500000, NGX_RTMP_LIMIT_DYNAMIC) + return ngx_rtmp_send_ack_size(s, cscf->ack_window) + || ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_SOFT) || ngx_rtmp_send_user_stream_begin(s, 0) || ngx_rtmp_send_amf0(s, 3, 0, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) @@ -498,10 +531,11 @@ ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, double in_trans, } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "publish() called; pubName='%s' pubType='%s'", + "play() called; pubName='%s' pubType='%s'", pub_name, pub_type); - ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER); + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER + | NGX_RTMP_BROADCAST_WANT_KEYFRAME); trans = in_trans; ngx_str_set(&out_inf[0], "NetStream.Play.Start"); diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index 178de04..0f475dc 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -63,6 +63,13 @@ static ngx_command_t ngx_rtmp_core_commands[] = { offsetof(ngx_rtmp_core_srv_conf_t, max_streams), NULL }, + { ngx_string("ack_window"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_RTMP_SRV_CONF_OFFSET, + offsetof(ngx_rtmp_core_srv_conf_t, ack_window), + NULL }, + { ngx_string("out_chunk_size"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, @@ -141,6 +148,7 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf) conf->so_keepalive = NGX_CONF_UNSET; conf->max_streams = NGX_CONF_UNSET; conf->out_chunk_size = NGX_CONF_UNSET; + conf->ack_window = NGX_CONF_UNSET; return conf; } @@ -157,6 +165,7 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0); ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16); ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 128); + ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000); if (prev->out_pool == NULL) { prev->out_pool = ngx_create_pool(4096, cf->log); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 9096028..939bc55 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -24,10 +24,6 @@ static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -static char handshake_text[] = - "This RTMP handshake is generated by nginx-rtmp-module. "; - - #ifdef NGX_DEBUG static char* ngx_rtmp_packet_type(uint8_t type) { @@ -222,13 +218,18 @@ ngx_rtmp_init_session(ngx_connection_t *c) } s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; - s->in_pool = ngx_create_pool(NGX_RTMP_HANDSHAKE_SIZE + 1 + s->in_pool = ngx_create_pool(2 * (NGX_RTMP_HANDSHAKE_SIZE + 1) + sizeof(ngx_pool_t), c->log); /* start handshake */ size = NGX_RTMP_HANDSHAKE_SIZE + 1; - b = &s->hs_buf; + b = &s->hs_in_buf; + b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); + b->end = b->start + size; + b->temporary = 1; + + b = &s->hs_out_buf; b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); b->end = b->start + size; b->temporary = 1; @@ -244,15 +245,11 @@ uint32_t ngx_rtmp_get_timestamp() { ngx_time_t *tod; - uint32_t t; tod = ngx_timeofday(); - t = tod->sec; - t *= 1e6; - t += tod->msec; - - return t; + return (uint32_t)tod->sec * 1e3 + + (uint32_t)tod->msec / 1e3; } @@ -285,7 +282,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) ngx_del_timer(rev); } - b = &s->hs_buf; + b = (s->hs_stage == NGX_RTMP_HS_READ_DATA) + ? &s->hs_in_buf + : &s->hs_out_buf; while (b->last != b->end) { @@ -304,16 +303,6 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) return; } - if (b->last == b->start - && s->hs_stage == NGX_RTMP_HS_READ_DATA - && *b->last != NGX_RTMP_VERSION) - { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, - "invalid handshake signature"); - ngx_rtmp_close_connection(c); - return; - } - b->last += n; } @@ -322,18 +311,49 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) ++s->hs_stage; if (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) { + + if (*b->pos != NGX_RTMP_VERSION) { + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, + "invalid handshake signature"); + ngx_rtmp_close_connection(c); + return; + } + + /* version is never needed anymore */ + ++b->pos; + + /* store current time as our epoch */ s->epoch = ngx_rtmp_get_timestamp(); + + /* read client epoch */ p = (u_char*)&s->peer_epoch; *p++ = b->pos[3]; *p++ = b->pos[2]; *p++ = b->pos[1]; *p++ = b->pos[0]; + + /* prepare output signature: + * set version, set epoch, fill zeroes */ p = (u_char*)&s->epoch; - b->pos[7] = *p++; - b->pos[6] = *p++; - b->pos[5] = *p++; + b = &s->hs_out_buf; + b->pos[0] = NGX_RTMP_VERSION; b->pos[4] = *p++; + b->pos[3] = *p++; + b->pos[2] = *p++; + b->pos[1] = *p++; + b->pos[5] = b->pos[6] = b->pos[7] = b->pos[8] = 0; + for(b->last = b->pos + 9, n = 1; + b->last < b->end; + ++b->last, ++n) + { + *b->last = (u_char)(n & 0xff); + } + + /* reply timestamp is the same as out epoch */ + ngx_memcpy(s->hs_in_buf.pos + 4, b->pos + 1, 4); + ngx_rtmp_handshake_send(c->write); + return; } @@ -344,7 +364,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) c->write->handler = ngx_rtmp_send; ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP handshake done; epoch=%D peer_epoch=%d", + "RTMP handshake done; epoch=%uD peer_epoch=%uD", s->epoch, s->peer_epoch); ngx_rtmp_recv(rev); @@ -359,8 +379,6 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; ngx_buf_t *b; - size_t offs; - static u_char zeroes[4]; c = wev->data; s = c->data; @@ -382,55 +400,41 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) ngx_del_timer(wev); } - b = &s->hs_buf; - restart: + b = (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) + ? &s->hs_out_buf + : &s->hs_in_buf; + while(b->pos != b->last) { - if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) { - n = c->send(c, b->pos, b->last - b->pos); - - } else if (b->pos - b->start < 4) { - /* use the timestamp from echo packet */ - n = c->send(c, b->pos + 4, 4 - (b->pos - b->start)); - - } else if (b->pos - b->start < 8) { - n = c->send(c, zeroes, 8 - (b->pos - b->start)); - - } else { - offs = (b->pos - b->start - 4) % sizeof(handshake_text); - n = c->send(c, (u_char*)handshake_text + offs, - ngx_min(b->last - b->pos, - (ngx_int_t)(sizeof(handshake_text) - offs))); - } - - if (n > 0) { - b->pos += n; - } + n = c->send(c, b->pos, b->last - b->pos); if (n == NGX_ERROR) { ngx_rtmp_close_connection(c); return; } - if (n == NGX_AGAIN) { + if (n == NGX_AGAIN || n == 0) { ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_rtmp_close_connection(c); return; } } + + b->pos += n; } ++s->hs_stage; if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) { - b->pos = b->start + 1; goto restart; } ngx_del_event(wev, NGX_WRITE_EVENT, 0); + + b = &s->hs_out_buf; b->pos = b->last = b->start + 1; ngx_rtmp_handshake_recv(c->read); } @@ -526,6 +530,20 @@ ngx_rtmp_recv(ngx_event_t *rev) } b->last += n; + s->in_bytes += n; + + if (s->in_bytes - s->in_last_ack >= cscf->ack_window) { + + s->in_last_ack = s->in_bytes; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "sending RTMP ACK(%D)", s->in_bytes); + + if (ngx_rtmp_send_ack(s, s->in_bytes)) { + ngx_rtmp_close_connection(c); + return; + } + } } old_pos = NULL; @@ -737,7 +755,7 @@ ngx_rtmp_send(ngx_event_t *wev) } if (out == s->out - && s->out->buf->pos == s->out->buf->last) + && out->buf->pos == out->buf->last) { cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ngx_add_timer(c->write, cscf->timeout); @@ -964,12 +982,14 @@ ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) { ngx_chain_t *l, **ll; - size_t nbytes, nbufs; + size_t nbytes, nbufs, noutbytes, noutbufs; ngx_connection_t *c; c = s->connection; nbytes = 0; nbufs = 0; + noutbytes = 0; + noutbufs = 0; for(l = out; l; l = l->next) { ngx_rtmp_buf_addref(l->buf); @@ -977,16 +997,20 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) ++nbufs; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "RTMP send nbytes=%d, nbufs=%d", - nbytes, nbufs); - /* TODO: optimize lookup */ /* TODO: implement dropper */ - for(ll = &s->out; *ll; ll = &(*ll)->next); + for(ll = &s->out; *ll; ll = &(*ll)->next) { + noutbytes += (*ll)->buf->last - (*ll)->buf->pos; + ++noutbufs; + } + *ll = out; - ngx_rtmp_send(s->connection->write); + ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP send nbytes=%d (%d), nbufs=%d (%d) ready=%d; active=%d", + nbytes, noutbytes, nbufs, noutbufs, c->write->ready, c->write->active); + + ngx_rtmp_send(c->write); return c->destroyed ? NGX_ERROR : NGX_OK; }