implemented shared output chains

This commit is contained in:
Roman Arutyunyan
2012-04-18 16:37:18 +04:00
parent 201d1b28ae
commit 40684b12b4
7 changed files with 105 additions and 203 deletions

View File

@ -48,9 +48,9 @@ ngx_rtmp_message_type(uint8_t type) {
"amf3_meta",
"amf3_shared",
"amd3_cmd",
"amf0_meta",
"amf0_shared",
"amf0_cmd",
"amf_meta",
"amf_shared",
"amf_cmd",
"?",
"aggregate"
};
@ -233,6 +233,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
return;
}
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
* cscf->max_streams);
if (s->in_streams == NULL) {
@ -241,8 +242,19 @@ ngx_rtmp_init_session(ngx_connection_t *c)
}
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
s->out_start = ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue);
if (s->out_start == NULL) {
ngx_rtmp_close_connection(c);
return;
}
s->out_pos = s->out_last = s->out_start;
s->out_end = s->out_start + cscf->max_queue;
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
/* start handshake */
b = &s->hs_in_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
@ -790,10 +802,7 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *out, *l, *cl;
u_char *p;
off_t limit;
size_t n;
ngx_int_t n;
c = wev->data;
s = c->data;
@ -815,78 +824,43 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_del_timer(wev);
}
while (s->out) {
p = s->out->buf->pos;
while (s->out_chain) {
n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos);
/* send_chain calls writev for output.
* It uses mixed allocation model for
* for iovecs passed to writev. Only 64
* structs fit into stack. When writing more
* memory is allocated from c->pool and
* **NEVER EVER** returned back.
* IOV_MAX=1024 on Linux.
*
* The only way to escape allocation is
* limiting the number of output data blocks
* being written at once with NGX_HEADERS
* (64 by default).
*
* FIXME: NGINX
* Unfortunately NGINX API does not allow
* us to specify max number of such blocks
* but only size limit. We're left with
* limiting by size which leads to extra
* loop here to find size of first 64
* blocks in output.
* */
limit = 0;
n = 0;
cl = s->out;
while (cl && n < 64) {
++n;
limit += cl->buf->last - cl->buf->pos;
cl = cl->next;
}
out = c->send_chain(c, s->out, limit);
if (out == NGX_CHAIN_ERROR) {
if (n == NGX_ERROR) {
ngx_rtmp_finalize_session(s);
return;
}
if (out == s->out && out->buf->pos == p) {
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (n == NGX_AGAIN || n == 0) {
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
return;
}
return;
}
while (s->out) {
l = s->out;
if (l->buf->pos < l->buf->last) {
break;
}
s->out = s->out->next;
l->next = NULL;
l->next = s->out_free_chains;
s->out_free_chains = l;
ngx_rtmp_free_shared_buf(cscf, l->buf);
if (s->out == out) {
break;
s->out_bpos += n;
if (s->out_bpos == s->out_chain->buf->last) {
s->out_chain = s->out_chain->next;
if (s->out_chain == NULL) {
ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
++s->out_pos;
if (s->out_pos == s->out_end) {
s->out_pos = s->out_start;
}
if (s->out_pos == s->out_last) {
break;
}
s->out_chain = *s->out_pos;
}
s->out_bpos = s->out_chain->buf->pos;
}
}
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
if (wev->active) {
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
}
}
@ -1038,68 +1012,42 @@ ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_uint_t priority)
{
ngx_chain_t *l, **ll;
ngx_connection_t *c;
ngx_buf_t *b;
ngx_rtmp_core_srv_conf_t *cscf;
size_t nbytes, nbufs, qbytes, qbufs;
size_t nmsg;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
qbytes = 0;
qbufs = 0;
nbytes = 0;
nbufs = 0;
for(ll = &s->out; *ll; ll = &(*ll)->next) {
qbytes += (*ll)->buf->last - (*ll)->buf->pos;
++qbufs;
}
nmsg = (s->out_pos <= s->out_last)
? s->out_last - s->out_pos
: (s->out_end - s->out_pos) + (s->out_last - s->out_start);
++nmsg;
/* drop packet? */
if (qbytes > cscf->max_buf / (priority + 1)) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0,
"drop message bytes=%uz, bufs=%uz priority=%ui",
qbytes, qbufs, priority);
/* drop packet?
* Note we always leave 1 slot free */
if (nmsg >= (s->out_end - s->out_start) / (priority + 1)) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP drop message bufs=%ui, priority=%ui",
nmsg, priority);
return NGX_AGAIN;
}
/* append locally-linked chain of shared buffers */
for(l = out; l; l = l->next) {
ngx_rtmp_acquire_shared_chain(out);
if (s->out_free_chains) {
*ll = s->out_free_chains;
s->out_free_chains = (*ll)->next;
} else {
*ll = ngx_alloc_chain_link(c->pool);
if (*ll == NULL) {
break;
}
(*ll)->buf = ngx_calloc_buf(c->pool);
if ((*ll)->buf == NULL) {
ngx_free_chain(c->pool, (*ll));
break;
}
}
b = (*ll)->buf;
*b = *l->buf;
ngx_rtmp_acquire_shared_buf(b);
ll = &(*ll)->next;
nbytes += (b->last - b->pos);
++nbufs;
*s->out_last++ = out;
if (s->out_last >= s->out_end) {
s->out_last = s->out_start;
}
*ll = NULL;
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send bytes=%uz+%uz, bufs=%uz+%uz, priority=%ui, "
"ready=%d, active=%d",
qbytes, nbytes, qbufs, nbufs, priority,
c->write->ready, c->write->active);
if (s->out_chain == NULL) {
s->out_chain = out;
s->out_bpos = out->buf->pos;
}
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send nmsg=%ui, priority=%ui, ready=%d, active=%d",
nmsg, priority, c->write->ready, c->write->active);
if (!c->write->active) {
ngx_rtmp_send(c->write);
@ -1306,10 +1254,12 @@ ngx_rtmp_close_session_handler(ngx_event_t *e)
}
}
/* release only buffers, links are local
* and will be released as part of pool */
for (; s->out; s->out = s->out->next) {
ngx_rtmp_free_shared_buf(cscf, s->out->buf);
while (s->out_pos != s->out_last) {
ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
++s->out_pos;
if (s->out_pos == s->out_end) {
s->out_pos = s->out_start;
}
}
ngx_rtmp_close_connection(c);