diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index f2c94ad..a2ebf29 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -74,6 +74,13 @@ static ngx_command_t ngx_rtmp_live_commands[] = { offsetof(ngx_rtmp_live_app_conf_t, interleave), NULL }, + { ngx_string("wait_key"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_live_app_conf_t, wait_key), + NULL }, + ngx_null_command }; @@ -123,6 +130,7 @@ ngx_rtmp_live_create_app_conf(ngx_conf_t *cf) lacf->sync = NGX_CONF_UNSET; lacf->atc = NGX_CONF_UNSET; lacf->interleave = NGX_CONF_UNSET; + lacf->wait_key = NGX_CONF_UNSET; return lacf; } @@ -141,6 +149,7 @@ ngx_rtmp_live_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_msec_value(conf->sync, prev->sync, 0); ngx_conf_merge_value(conf->atc, prev->atc, 0); ngx_conf_merge_value(conf->interleave, prev->interleave, 0); + ngx_conf_merge_value(conf->wait_key, prev->wait_key, 0); conf->pool = ngx_create_pool(4096, &cf->cycle->new_log); if (conf->pool == NULL) { @@ -318,7 +327,7 @@ next: static ngx_int_t ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) + ngx_chain_t *in) { ngx_rtmp_live_ctx_t *ctx, *pctx; ngx_rtmp_codec_ctx_t *codec_ctx; @@ -327,6 +336,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_live_app_conf_t *lacf; ngx_rtmp_session_t *ss; ngx_rtmp_header_t ch, lh; + ngx_int_t rc; ngx_uint_t prio; ngx_uint_t peers; ngx_uint_t header_version, meta_version; @@ -344,12 +354,12 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_ERROR; } + if (!lacf->live || in == NULL || in->buf == NULL) { + return NGX_OK; + } + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); - if (!lacf->live || - in == NULL || in->buf == NULL || - ctx == NULL || ctx->stream == NULL || - (h->type != NGX_RTMP_MSG_VIDEO && h->type != NGX_RTMP_MSG_AUDIO)) - { + if (ctx == NULL || ctx->stream == NULL) { return NGX_OK; } @@ -363,22 +373,28 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, "live: %s packet timestamp=%uD timeshift=%uD", type_s, h->timestamp, h->timeshift); + peers = 0; + header = NULL; + pheader = NULL; + header_version = 0; + meta = NULL; + meta_version = 0; + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO); + hvidx = (h->type == NGX_RTMP_MSG_VIDEO); + cs = &ctx->cs[csidx]; + + ngx_memzero(&ch, sizeof(ch)); + timestamp = h->timestamp; if (lacf->atc == 0) { timestamp += h->timeshift; } - /* prepare output headers */ - ngx_memzero(&ch, sizeof(ch)); - ngx_memzero(&lh, sizeof(lh)); - - cs = &ctx->cs[csidx]; - ch.timestamp = timestamp; ch.msid = NGX_RTMP_MSID; ch.csid = cs->csid; @@ -399,29 +415,28 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, delta = ch.timestamp - lh.timestamp; out = ngx_rtmp_append_shared_bufs(cscf, NULL, in); - ngx_rtmp_prepare_message(s, &ch, &lh, out); - peers = 0; - header = NULL; - pheader = NULL; - header_version = 0; - meta = NULL; - meta_version = 0; + ngx_rtmp_prepare_message(s, &ch, &lh, out); codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module); if (codec_ctx) { + if (h->type == NGX_RTMP_MSG_AUDIO) { + if (codec_ctx->aac_header) { header = codec_ctx->aac_header; header_version = codec_ctx->aac_version; } + } else { + if (codec_ctx->avc_header) { header = codec_ctx->avc_header; header_version = codec_ctx->avc_version; } } + if (lacf->meta && codec_ctx->meta) { meta = codec_ctx->meta; meta_version = codec_ctx->meta_version; @@ -429,6 +444,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } /* broadcast to all subscribers */ + for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) { if (pctx == ctx) { continue; @@ -446,6 +462,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, cs = &pctx->cs[csidx]; /* send metadata if newer exists */ + if (meta && meta_version != pctx->meta_version) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: sending metadata"); @@ -455,54 +472,86 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } } - /* send absolute frame */ + /* first packet in stream */ + if (!cs->active) { - /* packet from the past for the peer */ - if (lacf->atc == 0 && timestamp < (uint32_t) ss->epoch) { - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: %s packet from the past %uD < %uD", - type_s, timestamp, (uint32_t)ss->epoch); + if (lacf->wait_key && h->type == NGX_RTMP_MSG_VIDEO && + prio != NGX_RTMP_VIDEO_KEY_FRAME) + { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: skipping non-key frame"); continue; } if (header) { + + /* send absolute codec header */ + + if (lacf->atc == 0 && lh.timestamp & (1 << 31)) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: abs %s header from the past", type_s); + continue; + } + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: abs %s header timestamp=%uD", type_s, lh.timestamp); pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header); + ngx_rtmp_prepare_message(s, &lh, NULL, pkt); - if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) { - cs->timestamp = lh.timestamp; - cs->active = 1; - pctx->header_versions[hvidx] = header_version; - } + rc = ngx_rtmp_send_message(ss, pkt, 0); ngx_rtmp_free_shared_chain(cscf, pkt); + if (rc != NGX_OK) { + continue; + } + + cs->timestamp = lh.timestamp; + cs->active = 1; + + pctx->header_versions[hvidx] = header_version; + } else { + + /* send absolute packet */ + + if (lacf->atc == 0 && ch.timestamp & (1 << 31)) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: abs %s packet from the past", type_s); + continue; + } + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: abs %s frame timestamp=%uD", type_s, ch.timestamp); pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in); + ngx_rtmp_prepare_message(s, &ch, NULL, pkt); - if (ngx_rtmp_send_message(ss, pkt, prio) == NGX_OK) { - cs->timestamp = ch.timestamp; - cs->active = 1; - ++peers; - } + rc = ngx_rtmp_send_message(ss, pkt, prio); ngx_rtmp_free_shared_chain(cscf, pkt); + + if (rc != NGX_OK) { + continue; + } + + cs->timestamp = ch.timestamp; + cs->active = 1; + + ++peers; continue; } } - /* send absolute frame to sync stream */ + /* send empty absolute frame to sync stream */ + if (!lacf->interleave && lacf->sync && cs->timestamp + lacf->sync < lh.timestamp) { @@ -511,14 +560,22 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, type_s, (ngx_int_t) (lh.timestamp - cs->timestamp)); pkt = ngx_rtmp_alloc_shared_buf(cscf); + ngx_rtmp_prepare_message(s, &lh, NULL, pkt); - if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) { - cs->timestamp = lh.timestamp; - } + + rc = ngx_rtmp_send_message(ss, pkt, 0); + ngx_rtmp_free_shared_chain(cscf, pkt); + + if (rc != NGX_OK) { + continue; + } + + cs->timestamp = lh.timestamp; } /* send codec header if newer header has arrived */ + if (header && pctx->header_versions[hvidx] != header_version) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: new %s header", type_s); @@ -533,11 +590,11 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } } - /* push buffered data */ + /* send frame */ + if (ngx_rtmp_send_message(ss, out, prio) == NGX_OK) { cs->timestamp += delta; ++peers; - continue; } } diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h index 3be4524..525a78c 100644 --- a/ngx_rtmp_live_module.h +++ b/ngx_rtmp_live_module.h @@ -59,6 +59,7 @@ typedef struct { ngx_msec_t sync; ngx_flag_t atc; ngx_flag_t interleave; + ngx_flag_t wait_key; ngx_msec_t buflen; ngx_pool_t *pool; ngx_rtmp_live_stream_t *free_streams; diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index 03f3a5c..8abfdf1 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -309,11 +309,16 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, "%uz", ctx->dropped) - buf); NGX_RTMP_STAT_L(""); - /*NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), - "%L", (int64_t)ctx->last_audio - - ctx->last_video) - buf); - NGX_RTMP_STAT_L("");*/ + if (ctx->cs[0].active && ctx->cs[1].active) { + + NGX_RTMP_STAT_L(""); + + NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), + "%L", (int64_t) ctx->cs[0].timestamp - + (int64_t) ctx->cs[1].timestamp) - buf); + + NGX_RTMP_STAT_L(""); + } if (s->flashver.len) { NGX_RTMP_STAT_L("");