From 90f985fa2f015e8bdfe92a9b57a5d04b779d7be1 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Sun, 8 Apr 2012 01:44:57 +0400 Subject: [PATCH] implemented deferred session close --- ngx_rtmp.h | 3 +- ngx_rtmp_handler.c | 98 +++++++++++++++++++++++++-------------- ngx_rtmp_netcall_module.c | 2 +- 3 files changed, 66 insertions(+), 37 deletions(-) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 9a7f426..69953cf 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -175,6 +175,7 @@ typedef struct { uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */ ngx_connection_t *connection; + ngx_event_t close; void **ctx; void **main_conf; @@ -334,7 +335,7 @@ char* ngx_rtmp_user_message_type(uint16_t evt); #endif void ngx_rtmp_init_connection(ngx_connection_t *c); -void ngx_rtmp_close_connection(ngx_connection_t *c); +void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); uint32_t ngx_rtmp_get_timestamp(); ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index e0944da..6b916ee 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -14,6 +14,7 @@ static void ngx_rtmp_init_session(ngx_connection_t *c); +static void ngx_rtmp_close_connection(ngx_connection_t *c); static void ngx_rtmp_handshake_recv(ngx_event_t *rev); static void ngx_rtmp_handshake_send(ngx_event_t *rev); @@ -241,10 +242,7 @@ ngx_rtmp_init_session(ngx_connection_t *c) size = NGX_RTMP_HANDSHAKE_SIZE + 1; ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE); -/* - s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; - s->in_pool = ngx_create_pool(4096, c->log); -*/ + /* start handshake */ b = &s->hs_in_buf; b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); @@ -267,7 +265,7 @@ ngx_rtmp_init_session(ngx_connection_t *c) for(n = 0; n < ch->nelts; ++n, ++h) { if (*h) { if ((*h)(s, NULL, NULL) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } } @@ -310,7 +308,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -327,14 +325,14 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } if (n == NGX_AGAIN) { ngx_add_timer(rev, cscf->timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); } return; } @@ -351,7 +349,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) if (*b->pos != NGX_RTMP_VERSION) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "invalid handshake signature"); - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -428,7 +426,7 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -447,14 +445,14 @@ restart: n = c->send(c, b->pos, b->last - b->pos); if (n == NGX_ERROR) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } if (n == NGX_AGAIN || n == 0) { ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } } @@ -540,7 +538,7 @@ ngx_rtmp_recv(ngx_event_t *rev) if (st->in == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "in buf alloc failed"); - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } } @@ -570,13 +568,13 @@ ngx_rtmp_recv(ngx_event_t *rev) n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); } return; } @@ -592,7 +590,7 @@ ngx_rtmp_recv(ngx_event_t *rev) "sending RTMP ACK(%D)", s->in_bytes); if (ngx_rtmp_send_ack(s, s->in_bytes)) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } } @@ -631,7 +629,7 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "RTMP in chunk stream too big: %D >= %D", csid, cscf->max_streams); - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -733,7 +731,7 @@ ngx_rtmp_recv(ngx_event_t *rev) if (h->mlen > cscf->max_message) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "too big message: %uz", cscf->max_message); - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } } @@ -763,7 +761,7 @@ ngx_rtmp_recv(ngx_event_t *rev) st->len = 0; if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -809,7 +807,7 @@ ngx_rtmp_send(ngx_event_t *wev) ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -854,7 +852,7 @@ ngx_rtmp_send(ngx_event_t *wev) out = c->send_chain(c, s->out, limit); if (out == NGX_CHAIN_ERROR) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -862,7 +860,7 @@ ngx_rtmp_send(ngx_event_t *wev) cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); } return; } @@ -913,7 +911,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "RTMP out chunk stream too big: %D >= %D", h->csid, cscf->max_streams); - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); return; } @@ -1257,28 +1255,37 @@ ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s) } -void +static void ngx_rtmp_close_connection(ngx_connection_t *c) { - ngx_rtmp_session_t *s; ngx_pool_t *pool; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); + + pool = c->pool; + ngx_close_connection(c); + ngx_destroy_pool(pool); +} + + +static void +ngx_rtmp_close_session_handler(ngx_event_t *e) +{ + ngx_rtmp_session_t *s; + ngx_connection_t *c; ngx_rtmp_core_main_conf_t *cmcf; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_handler_pt *h; ngx_array_t *dh; size_t n; - if (c->destroyed) { - return; - } + s = e->data; + c = s->connection; - c->destroyed = 1; - - s = c->data; cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session"); if (s) { dh = &cmcf->events[NGX_RTMP_DISCONNECT]; @@ -1305,9 +1312,30 @@ ngx_rtmp_close_connection(ngx_connection_t *c) ngx_rtmp_free_shared_buf(cscf, s->out->buf); } - pool = c->pool; - ngx_close_connection(c); - ngx_destroy_pool(pool); + ngx_rtmp_close_connection(c); +} + + +void +ngx_rtmp_finalize_session(ngx_rtmp_session_t *s) +{ + ngx_event_t *e; + ngx_connection_t *c; + + /* deferred session finalize; + * schedule handler here */ + + c = s->connection; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "finalize session"); + + c->destroyed = 1; + e = &s->close; + e->data = s; + e->handler = ngx_rtmp_close_session_handler; + e->log = c->log; + + ngx_post_event(e, &ngx_posted_events); } diff --git a/ngx_rtmp_netcall_module.c b/ngx_rtmp_netcall_module.c index 2106673..3964d44 100644 --- a/ngx_rtmp_netcall_module.c +++ b/ngx_rtmp_netcall_module.c @@ -293,7 +293,7 @@ ngx_rtmp_netcall_close(ngx_connection_t *cc) if (cs->handle && cs->handle(s, cs->arg, cs->in) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_rtmp_finalize_session(s); } }