From 284e521e82ba577f8feca23eb931504dd094cf87 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Sat, 27 Oct 2012 17:39:04 +0400 Subject: [PATCH] refactored --- ngx_rtmp_live_module.c | 270 ++++++++++++++++++++--------------------- ngx_rtmp_live_module.h | 19 ++- ngx_rtmp_stat_module.c | 4 +- 3 files changed, 143 insertions(+), 150 deletions(-) diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 5ff6c1a..f2c94ad 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -248,6 +248,9 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, if (lacf->buflen) { s->out_buffer = 1; } + + ctx->cs[0].csid = NGX_RTMP_MSG_AUDIO; + ctx->cs[1].csid = NGX_RTMP_MSG_VIDEO; } @@ -277,8 +280,6 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "live: leave '%s'", ctx->stream->name); - ctx->msg_mask = 0; - if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING && ctx->flags & NGX_RTMP_LIVE_PUBLISHING) { @@ -313,7 +314,7 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) next: return next_close_stream(s, v); } - + static ngx_int_t ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, @@ -321,116 +322,103 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, { ngx_rtmp_live_ctx_t *ctx, *pctx; ngx_rtmp_codec_ctx_t *codec_ctx; - ngx_chain_t *out, *peer_out, *header_out, - *pheader_out, *meta; + ngx_chain_t *out, *pkt, *header, *pheader, *meta; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_live_app_conf_t *lacf; ngx_rtmp_session_t *ss; ngx_rtmp_header_t ch, lh; - ngx_uint_t prio, peer_prio; - ngx_uint_t peers, dropped_peers; - size_t header_offset, last_offset; + ngx_uint_t prio; + ngx_uint_t peers; ngx_uint_t header_version, meta_version; - ngx_int_t diff_timestamp; - uint32_t *last, timestamp; + ngx_uint_t csidx, hvidx; + uint32_t timestamp, delta; + ngx_rtmp_live_chunk_stream_t *cs; +#ifdef NGX_DEBUG + const char *type_s; + + type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio"); +#endif lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); if (lacf == NULL) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: NULL application"); return NGX_ERROR; } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); - if (!lacf->live - || in == NULL || in->buf == NULL - || ctx == NULL || ctx->stream == NULL - || (h->type != NGX_RTMP_MSG_VIDEO - && h->type != NGX_RTMP_MSG_AUDIO)) + if (!lacf->live || + in == NULL || in->buf == NULL || + ctx == NULL || ctx->stream == NULL || + (h->type != NGX_RTMP_MSG_VIDEO && h->type != NGX_RTMP_MSG_AUDIO)) { return NGX_OK; } if ((ctx->flags & NGX_RTMP_LIVE_PUBLISHING) == 0) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: received audio/video from non-publisher"); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: %s from non-publisher", type_s); return NGX_OK; } ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: av: %s timestamp=%uD timeshift=%uD", - h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", - h->timestamp, h->timeshift); + "live: %s packet timestamp=%uD timeshift=%uD", + type_s, h->timestamp, h->timeshift); cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - + + csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO); + hvidx = (h->type == NGX_RTMP_MSG_VIDEO); + timestamp = h->timestamp; if (lacf->atc == 0) { timestamp += h->timeshift; } - /* prepare output header */ + /* prepare output headers */ ngx_memzero(&ch, sizeof(ch)); ngx_memzero(&lh, sizeof(lh)); + + cs = &ctx->cs[csidx]; + ch.timestamp = timestamp; ch.msid = NGX_RTMP_MSID; + ch.csid = cs->csid; ch.type = h->type; - lh.msid = ch.msid; - if (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO) { - last_offset = offsetof(ngx_rtmp_live_ctx_t, last_video); - } else { - last_offset = offsetof(ngx_rtmp_live_ctx_t, last_audio); + lh = ch; + + if (cs->active) { + lh.timestamp = cs->timestamp; } - last = (uint32_t *)((u_char *)ctx + last_offset); - lh.timestamp = *last; - *last = ch.timestamp; + cs->active = 1; + cs->timestamp = ch.timestamp; - if (h->type == NGX_RTMP_MSG_VIDEO) { - prio = ngx_rtmp_get_video_frame_type(in); - ch.csid = NGX_RTMP_CSID_VIDEO; - } else { - prio = 0; - ch.csid = NGX_RTMP_CSID_AUDIO; - } + prio = (h->type == NGX_RTMP_MSG_VIDEO ? + ngx_rtmp_get_video_frame_type(in) : 0); - if (lacf->interleave) { - ch.csid = NGX_RTMP_CSID_VIDEO; - } - - if ((ctx->msg_mask & (1 << ch.csid)) == 0) { - lh.timestamp = ch.timestamp; - ctx->msg_mask |= (1 << ch.csid); - } - - lh.csid = ch.csid; - diff_timestamp = ch.timestamp - lh.timestamp; + delta = ch.timestamp - lh.timestamp; out = ngx_rtmp_append_shared_bufs(cscf, NULL, in); ngx_rtmp_prepare_message(s, &ch, &lh, out); peers = 0; - dropped_peers = 0; - - codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module); - header_out = NULL; - pheader_out = NULL; - header_offset = 0; + header = NULL; + pheader = NULL; header_version = 0; meta = NULL; meta_version = 0; + + codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module); + if (codec_ctx) { if (h->type == NGX_RTMP_MSG_AUDIO) { if (codec_ctx->aac_header) { - header_out = codec_ctx->aac_header; - header_offset = offsetof(ngx_rtmp_live_ctx_t, aac_version); + header = codec_ctx->aac_header; header_version = codec_ctx->aac_version; } } else { if (codec_ctx->avc_header) { - header_out = codec_ctx->avc_header; - header_offset = offsetof(ngx_rtmp_live_ctx_t, avc_version); + header = codec_ctx->avc_header; header_version = codec_ctx->avc_version; } } @@ -445,116 +433,122 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (pctx == ctx) { continue; } - ++peers; + ss = pctx->session; - last = (uint32_t *)((u_char *)pctx + last_offset); ch.timestamp = timestamp; if (lacf->atc == 0) { - ch.timestamp -= (uint32_t)ss->epoch; + ch.timestamp -= (uint32_t) ss->epoch; } - lh.timestamp = ch.timestamp - diff_timestamp; + lh.timestamp = ch.timestamp - delta; - /* send absolute frame */ - if ((pctx->msg_mask & (1 << ch.csid)) == 0) { - - /* packet from the past for the peer */ - if (lacf->atc == 0 && timestamp < (uint32_t)ss->epoch) { - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: av: %s packet from the past %uD < %uD", - h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", - timestamp, (uint32_t)ss->epoch); - continue; - } - - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: av: abs %s timestamp=%uD", - h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", - ch.timestamp); - - /* send codec header as abs frame if any */ - peer_out = ngx_rtmp_append_shared_bufs(cscf, NULL, - header_out ? header_out : in); - ngx_rtmp_prepare_message(s, &ch, NULL, peer_out); - if (ngx_rtmp_send_message(ss, peer_out, prio) == NGX_OK) { - pctx->msg_mask |= (1 << ch.csid); - if (header_out) { - *(ngx_uint_t *)((u_char *)pctx + header_offset) - = header_version; - *last = ch.timestamp; - } - } - - ngx_rtmp_free_shared_chain(cscf, peer_out); - continue; - } - - /* send AVC/H264 header if newer header has arrived */ - if (header_out && *(ngx_uint_t *)((u_char *)pctx + header_offset) - != header_version) - { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: sending codec header"); - - if (pheader_out == NULL) { - pheader_out = - ngx_rtmp_append_shared_bufs(cscf, NULL, header_out); - ngx_rtmp_prepare_message(s, &ch, &ch, pheader_out); - } - - if (ngx_rtmp_send_message(ss, pheader_out, 0) == NGX_OK) { - *(ngx_uint_t *)((u_char *)pctx + header_offset) - = header_version; - } - } + cs = &pctx->cs[csidx]; /* send metadata if newer exists */ if (meta && meta_version != pctx->meta_version) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: sending metadata"); - if (ngx_rtmp_send_message(ss, meta, prio) == NGX_OK) { + "live: sending metadata"); + + if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) { pctx->meta_version = meta_version; } } + /* send absolute frame */ + if (!cs->active) { + + /* packet from the past for the peer */ + if (lacf->atc == 0 && timestamp < (uint32_t) ss->epoch) { + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: %s packet from the past %uD < %uD", + type_s, timestamp, (uint32_t)ss->epoch); + continue; + } + + if (header) { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: abs %s header timestamp=%uD", + type_s, lh.timestamp); + + pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header); + ngx_rtmp_prepare_message(s, &lh, NULL, pkt); + + if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) { + cs->timestamp = lh.timestamp; + cs->active = 1; + pctx->header_versions[hvidx] = header_version; + } + + ngx_rtmp_free_shared_chain(cscf, pkt); + + } else { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: abs %s frame timestamp=%uD", + type_s, ch.timestamp); + + pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in); + ngx_rtmp_prepare_message(s, &ch, NULL, pkt); + + if (ngx_rtmp_send_message(ss, pkt, prio) == NGX_OK) { + cs->timestamp = ch.timestamp; + cs->active = 1; + ++peers; + } + + ngx_rtmp_free_shared_chain(cscf, pkt); + + continue; + } + } + /* send absolute frame to sync stream */ if (!lacf->interleave && lacf->sync && - *last + lacf->sync < lh.timestamp) + cs->timestamp + lacf->sync < lh.timestamp) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: av: sync %s: %i", - h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", - (ngx_int_t) (lh.timestamp - *last)); + "live: abs sync %s: %i", + type_s, (ngx_int_t) (lh.timestamp - cs->timestamp)); - peer_out = ngx_rtmp_alloc_shared_buf(cscf); - ngx_rtmp_prepare_message(s, &lh, NULL, peer_out); - if (ngx_rtmp_send_message(ss, peer_out, 0) == NGX_OK) { - *last = lh.timestamp; + pkt = ngx_rtmp_alloc_shared_buf(cscf); + ngx_rtmp_prepare_message(s, &lh, NULL, pkt); + if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) { + cs->timestamp = lh.timestamp; + } + ngx_rtmp_free_shared_chain(cscf, pkt); + } + + /* send codec header if newer header has arrived */ + if (header && pctx->header_versions[hvidx] != header_version) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: new %s header", type_s); + + if (pheader == NULL) { + pheader = ngx_rtmp_append_shared_bufs(cscf, NULL, header); + ngx_rtmp_prepare_message(s, &ch, &ch, pheader); + } + + if (ngx_rtmp_send_message(ss, pheader, 0) == NGX_OK) { + pctx->header_versions[hvidx] = header_version; } - ngx_rtmp_free_shared_chain(cscf, peer_out); } /* push buffered data */ - peer_prio = prio; - if (ngx_rtmp_send_message(ss, out, peer_prio) != NGX_OK) { - ++pctx->dropped; - ++dropped_peers; + if (ngx_rtmp_send_message(ss, out, prio) == NGX_OK) { + cs->timestamp += delta; + ++peers; continue; } - - *last += diff_timestamp; - } + ngx_rtmp_free_shared_chain(cscf, out); - if (pheader_out) { - ngx_rtmp_free_shared_chain(cscf, pheader_out); + if (pheader) { + ngx_rtmp_free_shared_chain(cscf, pheader); } ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen); - ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, - h->mlen * (peers - dropped_peers)); + ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers); return NGX_OK; } diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h index f1255a5..3be4524 100644 --- a/ngx_rtmp_live_module.h +++ b/ngx_rtmp_live_module.h @@ -21,23 +21,22 @@ typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t; typedef struct ngx_rtmp_live_stream_s ngx_rtmp_live_stream_t; +typedef struct { + unsigned active:1; + uint32_t timestamp; + uint32_t csid; +} ngx_rtmp_live_chunk_stream_t; + + struct ngx_rtmp_live_ctx_s { ngx_rtmp_session_t *session; ngx_rtmp_live_stream_t *stream; ngx_rtmp_live_ctx_t *next; ngx_uint_t flags; - ngx_uint_t msg_mask; ngx_uint_t dropped; - uint32_t csid; - uint32_t last_audio; - uint32_t last_video; - ngx_uint_t aac_version; - ngx_uint_t avc_version; + ngx_rtmp_live_chunk_stream_t cs[2]; + ngx_uint_t header_versions[2]; ngx_uint_t meta_version; - - /* last stream timestamps */ - uint32_t last[2]; - uint32_t *plast[2]; }; diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index e77884f..03f3a5c 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -309,11 +309,11 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, "%uz", ctx->dropped) - buf); NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT_L(""); + /*NGX_RTMP_STAT_L(""); NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), "%L", (int64_t)ctx->last_audio - ctx->last_video) - buf); - NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_L("");*/ if (s->flashver.len) { NGX_RTMP_STAT_L("");