diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c
index 5ff6c1a..f2c94ad 100644
--- a/ngx_rtmp_live_module.c
+++ b/ngx_rtmp_live_module.c
@@ -248,6 +248,9 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
if (lacf->buflen) {
s->out_buffer = 1;
}
+
+ ctx->cs[0].csid = NGX_RTMP_MSG_AUDIO;
+ ctx->cs[1].csid = NGX_RTMP_MSG_VIDEO;
}
@@ -277,8 +280,6 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: leave '%s'", ctx->stream->name);
- ctx->msg_mask = 0;
-
if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING
&& ctx->flags & NGX_RTMP_LIVE_PUBLISHING)
{
@@ -313,7 +314,7 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
next:
return next_close_stream(s, v);
}
-
+
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
@@ -321,116 +322,103 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_rtmp_codec_ctx_t *codec_ctx;
- ngx_chain_t *out, *peer_out, *header_out,
- *pheader_out, *meta;
+ ngx_chain_t *out, *pkt, *header, *pheader, *meta;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch, lh;
- ngx_uint_t prio, peer_prio;
- ngx_uint_t peers, dropped_peers;
- size_t header_offset, last_offset;
+ ngx_uint_t prio;
+ ngx_uint_t peers;
ngx_uint_t header_version, meta_version;
- ngx_int_t diff_timestamp;
- uint32_t *last, timestamp;
+ ngx_uint_t csidx, hvidx;
+ uint32_t timestamp, delta;
+ ngx_rtmp_live_chunk_stream_t *cs;
+#ifdef NGX_DEBUG
+ const char *type_s;
+
+ type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
+#endif
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
- ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
- "live: NULL application");
return NGX_ERROR;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
- if (!lacf->live
- || in == NULL || in->buf == NULL
- || ctx == NULL || ctx->stream == NULL
- || (h->type != NGX_RTMP_MSG_VIDEO
- && h->type != NGX_RTMP_MSG_AUDIO))
+ if (!lacf->live ||
+ in == NULL || in->buf == NULL ||
+ ctx == NULL || ctx->stream == NULL ||
+ (h->type != NGX_RTMP_MSG_VIDEO && h->type != NGX_RTMP_MSG_AUDIO))
{
return NGX_OK;
}
if ((ctx->flags & NGX_RTMP_LIVE_PUBLISHING) == 0) {
- ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
- "live: received audio/video from non-publisher");
+ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
+ "live: %s from non-publisher", type_s);
return NGX_OK;
}
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
- "live: av: %s timestamp=%uD timeshift=%uD",
- h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
- h->timestamp, h->timeshift);
+ "live: %s packet timestamp=%uD timeshift=%uD",
+ type_s, h->timestamp, h->timeshift);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
-
+
+ csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
+ hvidx = (h->type == NGX_RTMP_MSG_VIDEO);
+
timestamp = h->timestamp;
if (lacf->atc == 0) {
timestamp += h->timeshift;
}
- /* prepare output header */
+ /* prepare output headers */
ngx_memzero(&ch, sizeof(ch));
ngx_memzero(&lh, sizeof(lh));
+
+ cs = &ctx->cs[csidx];
+
ch.timestamp = timestamp;
ch.msid = NGX_RTMP_MSID;
+ ch.csid = cs->csid;
ch.type = h->type;
- lh.msid = ch.msid;
- if (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO) {
- last_offset = offsetof(ngx_rtmp_live_ctx_t, last_video);
- } else {
- last_offset = offsetof(ngx_rtmp_live_ctx_t, last_audio);
+ lh = ch;
+
+ if (cs->active) {
+ lh.timestamp = cs->timestamp;
}
- last = (uint32_t *)((u_char *)ctx + last_offset);
- lh.timestamp = *last;
- *last = ch.timestamp;
+ cs->active = 1;
+ cs->timestamp = ch.timestamp;
- if (h->type == NGX_RTMP_MSG_VIDEO) {
- prio = ngx_rtmp_get_video_frame_type(in);
- ch.csid = NGX_RTMP_CSID_VIDEO;
- } else {
- prio = 0;
- ch.csid = NGX_RTMP_CSID_AUDIO;
- }
+ prio = (h->type == NGX_RTMP_MSG_VIDEO ?
+ ngx_rtmp_get_video_frame_type(in) : 0);
- if (lacf->interleave) {
- ch.csid = NGX_RTMP_CSID_VIDEO;
- }
-
- if ((ctx->msg_mask & (1 << ch.csid)) == 0) {
- lh.timestamp = ch.timestamp;
- ctx->msg_mask |= (1 << ch.csid);
- }
-
- lh.csid = ch.csid;
- diff_timestamp = ch.timestamp - lh.timestamp;
+ delta = ch.timestamp - lh.timestamp;
out = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, &lh, out);
peers = 0;
- dropped_peers = 0;
-
- codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
- header_out = NULL;
- pheader_out = NULL;
- header_offset = 0;
+ header = NULL;
+ pheader = NULL;
header_version = 0;
meta = NULL;
meta_version = 0;
+
+ codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
+
if (codec_ctx) {
if (h->type == NGX_RTMP_MSG_AUDIO) {
if (codec_ctx->aac_header) {
- header_out = codec_ctx->aac_header;
- header_offset = offsetof(ngx_rtmp_live_ctx_t, aac_version);
+ header = codec_ctx->aac_header;
header_version = codec_ctx->aac_version;
}
} else {
if (codec_ctx->avc_header) {
- header_out = codec_ctx->avc_header;
- header_offset = offsetof(ngx_rtmp_live_ctx_t, avc_version);
+ header = codec_ctx->avc_header;
header_version = codec_ctx->avc_version;
}
}
@@ -445,116 +433,122 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (pctx == ctx) {
continue;
}
- ++peers;
+
ss = pctx->session;
- last = (uint32_t *)((u_char *)pctx + last_offset);
ch.timestamp = timestamp;
if (lacf->atc == 0) {
- ch.timestamp -= (uint32_t)ss->epoch;
+ ch.timestamp -= (uint32_t) ss->epoch;
}
- lh.timestamp = ch.timestamp - diff_timestamp;
+ lh.timestamp = ch.timestamp - delta;
- /* send absolute frame */
- if ((pctx->msg_mask & (1 << ch.csid)) == 0) {
-
- /* packet from the past for the peer */
- if (lacf->atc == 0 && timestamp < (uint32_t)ss->epoch) {
- ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
- "live: av: %s packet from the past %uD < %uD",
- h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
- timestamp, (uint32_t)ss->epoch);
- continue;
- }
-
- ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
- "live: av: abs %s timestamp=%uD",
- h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
- ch.timestamp);
-
- /* send codec header as abs frame if any */
- peer_out = ngx_rtmp_append_shared_bufs(cscf, NULL,
- header_out ? header_out : in);
- ngx_rtmp_prepare_message(s, &ch, NULL, peer_out);
- if (ngx_rtmp_send_message(ss, peer_out, prio) == NGX_OK) {
- pctx->msg_mask |= (1 << ch.csid);
- if (header_out) {
- *(ngx_uint_t *)((u_char *)pctx + header_offset)
- = header_version;
- *last = ch.timestamp;
- }
- }
-
- ngx_rtmp_free_shared_chain(cscf, peer_out);
- continue;
- }
-
- /* send AVC/H264 header if newer header has arrived */
- if (header_out && *(ngx_uint_t *)((u_char *)pctx + header_offset)
- != header_version)
- {
- ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
- "live: sending codec header");
-
- if (pheader_out == NULL) {
- pheader_out =
- ngx_rtmp_append_shared_bufs(cscf, NULL, header_out);
- ngx_rtmp_prepare_message(s, &ch, &ch, pheader_out);
- }
-
- if (ngx_rtmp_send_message(ss, pheader_out, 0) == NGX_OK) {
- *(ngx_uint_t *)((u_char *)pctx + header_offset)
- = header_version;
- }
- }
+ cs = &pctx->cs[csidx];
/* send metadata if newer exists */
if (meta && meta_version != pctx->meta_version) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
- "live: sending metadata");
- if (ngx_rtmp_send_message(ss, meta, prio) == NGX_OK) {
+ "live: sending metadata");
+
+ if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
pctx->meta_version = meta_version;
}
}
+ /* send absolute frame */
+ if (!cs->active) {
+
+ /* packet from the past for the peer */
+ if (lacf->atc == 0 && timestamp < (uint32_t) ss->epoch) {
+ ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
+ "live: %s packet from the past %uD < %uD",
+ type_s, timestamp, (uint32_t)ss->epoch);
+ continue;
+ }
+
+ if (header) {
+ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
+ "live: abs %s header timestamp=%uD",
+ type_s, lh.timestamp);
+
+ pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
+ ngx_rtmp_prepare_message(s, &lh, NULL, pkt);
+
+ if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) {
+ cs->timestamp = lh.timestamp;
+ cs->active = 1;
+ pctx->header_versions[hvidx] = header_version;
+ }
+
+ ngx_rtmp_free_shared_chain(cscf, pkt);
+
+ } else {
+ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
+ "live: abs %s frame timestamp=%uD",
+ type_s, ch.timestamp);
+
+ pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
+ ngx_rtmp_prepare_message(s, &ch, NULL, pkt);
+
+ if (ngx_rtmp_send_message(ss, pkt, prio) == NGX_OK) {
+ cs->timestamp = ch.timestamp;
+ cs->active = 1;
+ ++peers;
+ }
+
+ ngx_rtmp_free_shared_chain(cscf, pkt);
+
+ continue;
+ }
+ }
+
/* send absolute frame to sync stream */
if (!lacf->interleave && lacf->sync &&
- *last + lacf->sync < lh.timestamp)
+ cs->timestamp + lacf->sync < lh.timestamp)
{
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
- "live: av: sync %s: %i",
- h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
- (ngx_int_t) (lh.timestamp - *last));
+ "live: abs sync %s: %i",
+ type_s, (ngx_int_t) (lh.timestamp - cs->timestamp));
- peer_out = ngx_rtmp_alloc_shared_buf(cscf);
- ngx_rtmp_prepare_message(s, &lh, NULL, peer_out);
- if (ngx_rtmp_send_message(ss, peer_out, 0) == NGX_OK) {
- *last = lh.timestamp;
+ pkt = ngx_rtmp_alloc_shared_buf(cscf);
+ ngx_rtmp_prepare_message(s, &lh, NULL, pkt);
+ if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) {
+ cs->timestamp = lh.timestamp;
+ }
+ ngx_rtmp_free_shared_chain(cscf, pkt);
+ }
+
+ /* send codec header if newer header has arrived */
+ if (header && pctx->header_versions[hvidx] != header_version) {
+ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
+ "live: new %s header", type_s);
+
+ if (pheader == NULL) {
+ pheader = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
+ ngx_rtmp_prepare_message(s, &ch, &ch, pheader);
+ }
+
+ if (ngx_rtmp_send_message(ss, pheader, 0) == NGX_OK) {
+ pctx->header_versions[hvidx] = header_version;
}
- ngx_rtmp_free_shared_chain(cscf, peer_out);
}
/* push buffered data */
- peer_prio = prio;
- if (ngx_rtmp_send_message(ss, out, peer_prio) != NGX_OK) {
- ++pctx->dropped;
- ++dropped_peers;
+ if (ngx_rtmp_send_message(ss, out, prio) == NGX_OK) {
+ cs->timestamp += delta;
+ ++peers;
continue;
}
-
- *last += diff_timestamp;
-
}
+
ngx_rtmp_free_shared_chain(cscf, out);
- if (pheader_out) {
- ngx_rtmp_free_shared_chain(cscf, pheader_out);
+ if (pheader) {
+ ngx_rtmp_free_shared_chain(cscf, pheader);
}
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
- ngx_rtmp_update_bandwidth(&ctx->stream->bw_out,
- h->mlen * (peers - dropped_peers));
+ ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
return NGX_OK;
}
diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h
index f1255a5..3be4524 100644
--- a/ngx_rtmp_live_module.h
+++ b/ngx_rtmp_live_module.h
@@ -21,23 +21,22 @@ typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t;
typedef struct ngx_rtmp_live_stream_s ngx_rtmp_live_stream_t;
+typedef struct {
+ unsigned active:1;
+ uint32_t timestamp;
+ uint32_t csid;
+} ngx_rtmp_live_chunk_stream_t;
+
+
struct ngx_rtmp_live_ctx_s {
ngx_rtmp_session_t *session;
ngx_rtmp_live_stream_t *stream;
ngx_rtmp_live_ctx_t *next;
ngx_uint_t flags;
- ngx_uint_t msg_mask;
ngx_uint_t dropped;
- uint32_t csid;
- uint32_t last_audio;
- uint32_t last_video;
- ngx_uint_t aac_version;
- ngx_uint_t avc_version;
+ ngx_rtmp_live_chunk_stream_t cs[2];
+ ngx_uint_t header_versions[2];
ngx_uint_t meta_version;
-
- /* last stream timestamps */
- uint32_t last[2];
- uint32_t *plast[2];
};
diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c
index e77884f..03f3a5c 100644
--- a/ngx_rtmp_stat_module.c
+++ b/ngx_rtmp_stat_module.c
@@ -309,11 +309,11 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll,
"%uz", ctx->dropped) - buf);
NGX_RTMP_STAT_L("");
- NGX_RTMP_STAT_L("");
+ /*NGX_RTMP_STAT_L("");
NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf),
"%L", (int64_t)ctx->last_audio
- ctx->last_video) - buf);
- NGX_RTMP_STAT_L("");
+ NGX_RTMP_STAT_L("");*/
if (s->flashver.len) {
NGX_RTMP_STAT_L("");