From d8386ce7e01c3459075adccbda95c057678f49cd Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 1 Nov 2012 01:02:10 +0400 Subject: [PATCH] a new semi-atc live streamer --- ngx_rtmp.h | 1 - ngx_rtmp_handler.c | 1 - ngx_rtmp_live_module.c | 203 +++++++++++++++++++++++++---------------- 3 files changed, 123 insertions(+), 82 deletions(-) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 94095b9..95f87b2 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -151,7 +151,6 @@ typedef struct { typedef struct { uint32_t csid; /* chunk stream id */ uint32_t timestamp; /* timestamp (delta) */ - uint32_t timeshift; /* clock - timestamp */ uint32_t mlen; /* message length */ uint8_t type; /* message type id */ uint32_t msid; /* message stream id */ diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 8e448e2..1dd2781 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -403,7 +403,6 @@ ngx_rtmp_recv(ngx_event_t *rev) st->dtime = timestamp; } else { h->timestamp = timestamp; - h->timeshift = (uint32_t) ngx_current_msec - timestamp; st->dtime = 0; } } diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 1a97da2..1853b06 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -60,13 +60,6 @@ static ngx_command_t ngx_rtmp_live_commands[] = { offsetof(ngx_rtmp_live_app_conf_t, sync), NULL }, - { ngx_string("atc"), - NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, - ngx_conf_set_flag_slot, - NGX_RTMP_APP_CONF_OFFSET, - offsetof(ngx_rtmp_live_app_conf_t, atc), - NULL }, - { ngx_string("interleave"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, ngx_conf_set_flag_slot, @@ -128,7 +121,6 @@ ngx_rtmp_live_create_app_conf(ngx_conf_t *cf) lacf->nbuckets = NGX_CONF_UNSET; lacf->buflen = NGX_CONF_UNSET; lacf->sync = NGX_CONF_UNSET; - lacf->atc = NGX_CONF_UNSET; lacf->interleave = NGX_CONF_UNSET; lacf->wait_key = NGX_CONF_UNSET; @@ -147,7 +139,6 @@ ngx_rtmp_live_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->nbuckets, prev->nbuckets, 1024); ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 0); ngx_conf_merge_msec_value(conf->sync, prev->sync, 0); - ngx_conf_merge_value(conf->atc, prev->atc, 0); ngx_conf_merge_value(conf->interleave, prev->interleave, 0); ngx_conf_merge_value(conf->wait_key, prev->wait_key, 0); @@ -263,6 +254,93 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, } +static ngx_int_t +ngx_rtmp_live_sync_streams(ngx_rtmp_session_t *s) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_live_ctx_t *ctx, *pctx; + ngx_rtmp_session_t *ss; + ngx_rtmp_header_t ch, lh; + ngx_chain_t *pkt; + ngx_uint_t csidx; + + /* Stream synchronization: + * + * - both streams active + * advance lagging stream + * + * - one stream actvie + * add absolute frame for inactive stream + * (the active stream became active + * during current publisher session) + * + * - none active + * quit + */ + + lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); + + if (lacf->interleave) { + return NGX_OK; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + + if (ctx->cs[0].active == 0 && ctx->cs[1].active == 0) { + return NGX_OK; + } + + if (ctx->cs[0].active == 0) { + csidx = 0; + } else if (ctx->cs[1].active == 0) { + csidx = 1; + } else { + csidx = (ctx->cs[0].timestamp > ctx->cs[1].timestamp); + } + + ngx_memzero(&ch, sizeof(ch)); + + ch.msid = NGX_RTMP_MSID; + ch.type = NGX_RTMP_MSG_AMF_META; + ch.csid = ctx->cs[csidx].csid; + ch.timestamp = ctx->cs[1 - csidx].timestamp; + + lh = ch; + lh.timestamp = ctx->cs[csidx].timestamp; + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + pkt = ngx_rtmp_alloc_shared_buf(cscf); + if (pkt == NULL) { + return NGX_ERROR; + } + + ngx_rtmp_prepare_message(s, &ch, ctx->cs[csidx].active ? &lh : NULL, pkt); + + for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) { + if (pctx->cs[1 - csidx].active == 0) { + continue; + } + + ss = pctx->session; + + if (ngx_rtmp_send_message(ss, pkt, 0) != NGX_OK) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: sync failed"); + ngx_rtmp_finalize_session(ss); + } + + pctx->cs[csidx].timestamp += ch.timestamp; + pctx->cs[csidx].active = 1; + } + + ngx_rtmp_free_shared_chain(cscf, pkt); + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { @@ -302,6 +380,11 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) } } + if (ngx_rtmp_live_sync_streams(s) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "live: sync error"); + } + if (ctx->stream->ctx) { ctx->stream = NULL; goto next; @@ -332,18 +415,18 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_live_ctx_t *ctx, *pctx; ngx_rtmp_codec_ctx_t *codec_ctx; ngx_chain_t *header, *meta, - *apkt, *rpkt, - *aheader, *rheader; + *apkt, *rpkt/*, + *aheader, *rheader*/; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_live_app_conf_t *lacf; ngx_rtmp_session_t *ss; ngx_rtmp_header_t ch, lh; - ngx_int_t rc, sync; + ngx_int_t rc; ngx_uint_t prio; ngx_uint_t peers; ngx_uint_t header_version, meta_version; ngx_uint_t csidx, hvidx; - uint32_t timestamp, delta; + uint32_t delta; ngx_rtmp_live_chunk_stream_t *cs; #ifdef NGX_DEBUG const char *type_s; @@ -371,14 +454,13 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_OK; } - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: %s packet timestamp=%uD timeshift=%uD", - type_s, h->timestamp, h->timeshift); + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: %s packet timestamp=%uD", + type_s, h->timestamp); - sync = 0; peers = 0; - aheader = NULL; - rheader = NULL; + /*aheader = NULL;*/ + /*rheader = NULL;*/ apkt = NULL; header = NULL; header_version = 0; @@ -398,12 +480,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_memzero(&ch, sizeof(ch)); - timestamp = h->timestamp; - if (lacf->atc == 0) { - timestamp += h->timeshift; - } - - ch.timestamp = timestamp; + ch.timestamp = h->timestamp; ch.msid = NGX_RTMP_MSID; ch.csid = cs->csid; ch.type = h->type; @@ -412,8 +489,6 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (cs->active) { lh.timestamp = cs->timestamp; - } else { - sync = 1; } cs->active = 1; @@ -421,6 +496,13 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, delta = ch.timestamp - lh.timestamp; + if (delta >> 31) { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: non-monotonical timestamp %uD->%uD", + lh.timestamp, ch.timestamp); + return NGX_OK; + } + rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in); ngx_rtmp_prepare_message(s, &ch, &lh, rpkt); @@ -473,10 +555,12 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, /* sync stream */ - if (cs->active && (sync || (lacf->sync && cs->dropped > lacf->sync))) { + if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: sync %s dropped=%uD", type_s, cs->dropped); - cs->active = 0; + + /*TODO send empty delta packet */ + cs->dropped = 0; } @@ -490,51 +574,22 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: skip non-key"); continue; - } - ch.timestamp = timestamp; - if (lacf->atc == 0) { - ch.timestamp -= (uint32_t) ss->epoch; - } - - lh.timestamp = ch.timestamp - delta; - - /* - if (ngx_rtmp_send_user_stream_eof(ss, NGX_RTMP_MSID) != NGX_OK) { - continue; - } - - if (ngx_rtmp_send_user_stream_begin(ss, NGX_RTMP_MSID) != NGX_OK) { - continue; - }*/ - if (header) { /* send absolute codec header */ - if (lacf->atc == 0 && (int32_t) lh.timestamp < 0) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: abs %s header from the past", type_s); - continue; - } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: abs %s header timestamp=%uD", type_s, lh.timestamp); - if (aheader == NULL) { - aheader = ngx_rtmp_append_shared_bufs(cscf, NULL, header); - ngx_rtmp_prepare_message(s, &lh, NULL, aheader); - } - - rc = ngx_rtmp_send_message(ss, aheader, 0); - - if (!lacf->atc) { - ngx_rtmp_free_shared_chain(cscf, aheader); - aheader = NULL; + if (apkt == NULL) { + apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header); + ngx_rtmp_prepare_message(s, &lh, NULL, apkt); } + rc = ngx_rtmp_send_message(ss, apkt, 0); if (rc != NGX_OK) { continue; } @@ -548,12 +603,6 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, /* send absolute packet */ - if (lacf->atc == 0 && (int32_t) ch.timestamp < 0) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: abs %s packet from the past", type_s); - continue; - } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: abs %s packet timestamp=%uD", type_s, ch.timestamp); @@ -564,12 +613,6 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } rc = ngx_rtmp_send_message(ss, apkt, prio); - - if (!lacf->atc) { - ngx_rtmp_free_shared_chain(cscf, apkt); - apkt = NULL; - } - if (rc != NGX_OK) { continue; } @@ -585,7 +628,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, /* send codec header if newer header has arrived */ - if (header && pctx->header_versions[hvidx] != header_version) { +/* if (header && pctx->header_versions[hvidx] != header_version) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: new %s header", type_s); @@ -598,7 +641,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, pctx->header_versions[hvidx] = header_version; } } - +*/ /* send relative packet */ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, @@ -622,14 +665,14 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (apkt) { ngx_rtmp_free_shared_chain(cscf, apkt); } - +/* if (rheader) { ngx_rtmp_free_shared_chain(cscf, rheader); } - - if (aheader) { +*/ + /*if (aheader) { ngx_rtmp_free_shared_chain(cscf, aheader); - } + }*/ ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen); ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);