mirror of
https://github.com/arut/nginx-rtmp-module.git
synced 2025-08-06 15:00:18 +08:00
implemented broadcast
This commit is contained in:
@ -20,6 +20,8 @@ static void ngx_rtmp_handshake_send(ngx_event_t *rev);
|
||||
|
||||
static void ngx_rtmp_recv(ngx_event_t *rev);
|
||||
static void ngx_rtmp_send(ngx_event_t *rev);
|
||||
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_header_t *h, ngx_chain_t *in);
|
||||
|
||||
static void ngx_rtmp_close_connection(ngx_connection_t *c);
|
||||
|
||||
@ -196,7 +198,6 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
||||
{
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
ngx_bufs_t bufs;
|
||||
ngx_buf_t *b;
|
||||
size_t size;
|
||||
|
||||
@ -211,13 +212,13 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
||||
}
|
||||
|
||||
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
|
||||
* cmcf->max_streams);
|
||||
* cscf->max_streams);
|
||||
if (s->in_streams == NULL) {
|
||||
ngx_rtmp_close_session(s);
|
||||
return;
|
||||
}
|
||||
|
||||
s->chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
|
||||
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
|
||||
s->in_pool = ngx_create_pool(NGX_RTMP_HANDSHAKE_SIZE + 1
|
||||
+ sizeof(ngx_pool_t), c->log);
|
||||
|
||||
@ -373,14 +374,13 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
ngx_connection_t *c;
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
u_char *p;
|
||||
u_char *p, *pp;
|
||||
uint32_t timestamp;
|
||||
size_t size;
|
||||
ngx_chain_t *in;
|
||||
ngx_rtmp_header_t *h;
|
||||
ngx_rtmp_header_t *h;
|
||||
uint8_t fmt;
|
||||
uint32_t csid;
|
||||
ngx_rtmp_stream_t *st, st0;
|
||||
ngx_rtmp_stream_t *st, *st0;
|
||||
ngx_chain_t *in, *head;
|
||||
ngx_buf_t *b;
|
||||
|
||||
@ -391,11 +391,11 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
|
||||
for(;;) {
|
||||
|
||||
st = &s->in_streams[s->csid];
|
||||
st = &s->in_streams[s->in_csid];
|
||||
|
||||
if (st->in == NULL) {
|
||||
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
|
||||
|| (sin->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
|
||||
|| (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||
"chain alloc failed");
|
||||
@ -411,7 +411,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||
"buf alloc failed");
|
||||
ngx_rtmp_close_session(s);
|
||||
return NULL;
|
||||
return;
|
||||
}
|
||||
st->in->buf->flush = 1;
|
||||
}
|
||||
@ -421,10 +421,10 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
|
||||
/* anything remained from last iteration? */
|
||||
if (b != NULL && b->recycled && b->pos < b->last) {
|
||||
s->in->buf->last = ngx_movemem(s->in->buf->start, b->pos,
|
||||
st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos,
|
||||
b->last - b->pos);
|
||||
b->recycled = 0;
|
||||
s->in->buf->flush = 0;
|
||||
st->in->buf->flush = 0;
|
||||
}
|
||||
|
||||
b = in->buf;
|
||||
@ -485,14 +485,14 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
}
|
||||
|
||||
/* link orphan */
|
||||
if (s->csid == 0) {
|
||||
if (s->in_csid == 0) {
|
||||
|
||||
/* unlink from stream #0 */
|
||||
st->in = st->in->next;
|
||||
|
||||
/* link to new stream */
|
||||
s->csid = csid;
|
||||
st = s->in_streams[csid];
|
||||
s->in_csid = csid;
|
||||
st = &s->in_streams[csid];
|
||||
if (st->in == NULL) {
|
||||
in->next = in;
|
||||
} else {
|
||||
@ -518,7 +518,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
pp[0] = *p++;
|
||||
pp[3] = 0;
|
||||
|
||||
if (mt <= 1) {
|
||||
if (fmt <= 1) {
|
||||
if (b->last - p < 4)
|
||||
continue;
|
||||
/* size:
|
||||
@ -554,7 +554,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
pp[2] = *p++;
|
||||
pp[1] = *p++;
|
||||
pp[0] = *p++;
|
||||
} else if (h->fmt) {
|
||||
} else if (fmt) {
|
||||
h->timestamp += timestamp;
|
||||
} else {
|
||||
h->timestamp = timestamp;
|
||||
@ -573,7 +573,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
|
||||
size = b->last - b->pos;
|
||||
|
||||
if (size < (ngx_int_t)ngx_min(h->mlen, s->chunk_size))
|
||||
if (size < ngx_min(h->mlen, s->in_chunk_size))
|
||||
continue;
|
||||
|
||||
/* buffer is ready */
|
||||
@ -600,18 +600,18 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||
st0->in->next = head;
|
||||
}
|
||||
|
||||
s->csid = 0;
|
||||
s->in_csid = 0;
|
||||
b->recycled = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#define ngx_rtmp_buf_addref(b) \
|
||||
(++(ngx_int_t)(b)->tag)
|
||||
(++*(int*)&(b)->tag)
|
||||
|
||||
|
||||
#define ngx_rtmp_buf_release(b) \
|
||||
(--(ngx_int_t)(b)->tag)
|
||||
(--*(int*)&(b)->tag)
|
||||
|
||||
|
||||
static void
|
||||
@ -666,8 +666,8 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||
|
||||
/* return buffer to core */
|
||||
ln = l->next;
|
||||
l->next = cscf->free;
|
||||
cscf->free = l;
|
||||
l->next = cscf->out_free;
|
||||
cscf->out_free = l;
|
||||
l = ln;
|
||||
}
|
||||
}
|
||||
@ -723,7 +723,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||
uint8_t fmt)
|
||||
{
|
||||
ngx_chain_t *l;
|
||||
u_char *p;
|
||||
u_char *p, *pp;
|
||||
ngx_int_t hsize, thsize, nbufs;
|
||||
uint32_t mlen, timestamp, ext_timestamp;
|
||||
static uint8_t hdrsize[] = { 12, 8, 4, 1 };
|
||||
@ -735,13 +735,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||
mlen += (out->buf->last - l->buf->pos);
|
||||
++nbufs;
|
||||
}
|
||||
|
||||
/*
|
||||
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP send %s (%d) csid=%D timestamp=%D "
|
||||
"mlen=%D msid=%D nbufs=%d",
|
||||
ngx_rtmp_packet_type(h->type), (int)h->type,
|
||||
h->csid, h->timestamp, mlen, h->msid, nbufs);
|
||||
|
||||
*/
|
||||
/* determine initial header size */
|
||||
hsize = hdrsize[fmt];
|
||||
|
||||
@ -778,7 +778,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||
*p++ = (uint8_t)((h->csid - 64) >> 8);
|
||||
}
|
||||
|
||||
thsize = p - b->pos;
|
||||
thsize = p - out->buf->pos;
|
||||
|
||||
/* message header */
|
||||
if (fmt <= 2) {
|
||||
@ -813,9 +813,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||
|
||||
/* use the smallest fmt (3) for
|
||||
* trailing fragments */
|
||||
p = out->buf->pos;
|
||||
for(out = out->next; out; out = out->next) {
|
||||
out->pos -= hsize;
|
||||
ngx_memcpy(out->pos, b->pos, thsize);
|
||||
out->buf->pos -= hsize;
|
||||
ngx_memcpy(out->buf->pos, p, thsize);
|
||||
}
|
||||
}
|
||||
|
||||
@ -834,7 +835,7 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
|
||||
for(ll = &s->out; *ll; ll = &(*ll)->next);
|
||||
*ll = out;
|
||||
|
||||
ngx_rtmp_send(c->write);
|
||||
ngx_rtmp_send(s->connection->write);
|
||||
}
|
||||
|
||||
|
||||
@ -845,8 +846,10 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_core_main_conf_t *cmcf;
|
||||
ngx_array_t *evhs;
|
||||
size_t n;
|
||||
ngx_rtmp_event_handler_pt evh;
|
||||
ngx_rtmp_event_handler_pt *evh;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
#ifdef NGX_DEBUG
|
||||
@ -854,7 +857,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||
int nbufs;
|
||||
ngx_chain_t *ch;
|
||||
|
||||
for(nbufs = 1, ch = l;
|
||||
for(nbufs = 1, ch = in;
|
||||
ch->next;
|
||||
ch = ch->next, ++nbufs);
|
||||
|
||||
@ -873,13 +876,13 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||
}
|
||||
|
||||
evhs = &cmcf->events[h->type];
|
||||
for(n = 0; n < evhs->nelts; ++n) {
|
||||
evh = evh->elts[n];
|
||||
evh = evhs->elts;
|
||||
for(n = 0; n < evhs->nelts; ++n, ++evh) {
|
||||
if (!evh) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (evh(s, h, in) != NGX_OK) {
|
||||
if ((*evh)(s, h, in) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
}
|
||||
@ -897,8 +900,8 @@ ngx_rtmp_close_session(ngx_rtmp_session_t *s)
|
||||
|
||||
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
for(n = 0; n < cmcf->disconnect.nelts; ++n) {
|
||||
h = &cmcf->disconnect.elts[n];
|
||||
h = cmcf->disconnect.elts;
|
||||
for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) {
|
||||
if (*h) {
|
||||
(*h)(s);
|
||||
}
|
||||
|
Reference in New Issue
Block a user