From 18e4762db29139ada9c8be4b36070005e8d01efc Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 29 Nov 2012 23:28:56 +0400 Subject: [PATCH] improved dropper in control module; now dropper support dropping all clients with the same address; dropping all subscribers is supported as well --- ngx_rtmp_control_module.c | 252 ++++++++++++++++++++++++++++++++------ 1 file changed, 212 insertions(+), 40 deletions(-) diff --git a/ngx_rtmp_control_module.c b/ngx_rtmp_control_module.c index f76639e..3b968ec 100644 --- a/ngx_rtmp_control_module.c +++ b/ngx_rtmp_control_module.c @@ -35,6 +35,20 @@ typedef struct { #define NGX_RTMP_CONTROL_DROP 0x02 +enum { + NGX_RTMP_CONTROL_DROP_PUBLISHER, + NGX_RTMP_CONTROL_DROP_SUBSCRIBER, + NGX_RTMP_CONTROL_DROP_CLIENT, +}; + + +typedef struct { + ngx_uint_t method; + ngx_str_t addr; + ngx_uint_t ndropped; +} ngx_rtmp_control_drop_t; + + typedef struct { ngx_uint_t control; } ngx_rtmp_control_loc_conf_t; @@ -193,6 +207,10 @@ ngx_rtmp_control_parse_live(ngx_http_request_t *r, ngx_memzero(&name, sizeof(name)); ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name); + if (name.len == 0) { + return NGX_CONF_OK; + } + live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index]; /* find live stream by name */ @@ -245,6 +263,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) goto error; } + ngx_memzero(&live, sizeof(live)); msg = ngx_rtmp_control_parse_live(r, &core, &live); if (msg != NGX_CONF_OK) { goto error; @@ -328,68 +347,221 @@ error: } +static const char * +ngx_rtmp_control_drop_session(ngx_http_request_t *r, + ngx_rtmp_control_drop_t *drop, + ngx_rtmp_live_ctx_t *lctx) +{ + ngx_rtmp_session_t *s; + ngx_str_t *paddr; + + s = lctx->session; + + if (s == NULL || s->connection == NULL) + { + return NGX_CONF_OK; + } + + if (drop->addr.len) { + paddr = &s->connection->addr_text; + if (paddr->len != drop->addr.len || + ngx_strncmp(paddr->data, drop->addr.data, drop->addr.len)) + { + return NGX_CONF_OK; + } + } + + switch (drop->method) { + case NGX_RTMP_CONTROL_DROP_PUBLISHER: + if (!lctx->publishing) { + return NGX_CONF_OK; + } + + case NGX_RTMP_CONTROL_DROP_SUBSCRIBER: + if (lctx->publishing) { + return NGX_CONF_OK; + } + + case NGX_RTMP_CONTROL_DROP_CLIENT: + break; + } + + ngx_rtmp_finalize_session(s); + ++drop->ndropped; + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_drop_stream(ngx_http_request_t *r, + ngx_rtmp_control_drop_t *drop, + ngx_rtmp_live_stream_t *ls) +{ + ngx_rtmp_live_ctx_t *lctx; + const char *s; + + for (lctx = ls->ctx; lctx; lctx = lctx->next) { + s = ngx_rtmp_control_drop_session(r, drop, lctx); + if (s != NGX_CONF_OK) { + return s; + } + } + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_drop_app(ngx_http_request_t *r, + ngx_rtmp_control_drop_t *drop, + ngx_rtmp_core_app_conf_t *cacf) +{ + ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_live_stream_t *ls; + ngx_str_t name; + const char *s; + size_t len; + ngx_uint_t n; + + ngx_memzero(&name, sizeof(name)); + ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name); + + lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index]; + + if (name.len == 0) { + for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) { + for (ls = lacf->streams[n]; ls; ls = ls->next) + { + s = ngx_rtmp_control_drop_stream(r, drop, ls); + if (s != NGX_CONF_OK) { + return s; + } + } + } + + return NGX_CONF_OK; + } + + for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets]; + ls; ls = ls->next) + { + len = ngx_strlen(ls->name); + if (name.len != len || ngx_strncmp(name.data, ls->name, name.len)) { + continue; + } + + s = ngx_rtmp_control_drop_stream(r, drop, ls); + if (s != NGX_CONF_OK) { + return s; + } + } + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_drop_srv(ngx_http_request_t *r, + ngx_rtmp_control_drop_t *drop, + ngx_rtmp_core_srv_conf_t *cscf) +{ + ngx_rtmp_core_app_conf_t **pcacf; + ngx_str_t app; + ngx_uint_t n; + const char *s; + + ngx_memzero(&app, sizeof(app)); + ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app); + + pcacf = cscf->applications.elts; + + for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) { + if (app.len && ((*pcacf)->name.len != app.len || + ngx_strncmp((*pcacf)->name.data, app.data, app.len))) + { + continue; + } + + s = ngx_rtmp_control_drop_app(r, drop, *pcacf); + if (s != NGX_CONF_OK) { + return s; + } + } + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_drop_main(ngx_http_request_t *r, + ngx_rtmp_control_drop_t *drop, + ngx_rtmp_core_main_conf_t *cmcf) +{ + ngx_rtmp_core_srv_conf_t **pcscf; + ngx_str_t srv; + ngx_uint_t sn; + + sn = 0; + if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) { + sn = ngx_atoi(srv.data, srv.len); + } + + if (sn >= cmcf->servers.nelts) { + return "Server index out of range"; + } + + pcscf = cmcf->servers.elts; + pcscf += sn; + + return ngx_rtmp_control_drop_srv(r, drop, *pcscf); +} + + static ngx_int_t ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) { - ngx_rtmp_control_core_t core; - ngx_rtmp_control_live_t live; - ngx_rtmp_live_ctx_t *lctx; - ngx_str_t addr, *paddr; - const char *msg; - ngx_uint_t ndropped; + ngx_rtmp_control_drop_t drop; size_t len; u_char *p; ngx_buf_t *b; ngx_chain_t cl; + const char *msg; - msg = ngx_rtmp_control_parse_core(r, &core); - if (msg != NGX_CONF_OK) { + if (ngx_rtmp_core_main_conf == NULL) { + msg = "Empty main conf"; goto error; } - msg = ngx_rtmp_control_parse_live(r, &core, &live); - if (msg != NGX_CONF_OK) { - goto error; - } - - ndropped = 0; + ngx_memzero(&drop, sizeof(drop)); if (method->len == sizeof("publisher") - 1 && - ngx_strncmp(method->data, "publisher", method->len) == 0) + ngx_memcmp(method->data, "publisher", method->len) == 0) { - for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { - if (lctx->publishing) { - ngx_rtmp_finalize_session(lctx->session); - ++ndropped; - break; - } - } + drop.method = NGX_RTMP_CONTROL_DROP_PUBLISHER; + + } else if (method->len == sizeof("subscriber") - 1 && + ngx_memcmp(method->data, "subscriber", method->len) == 0) + { + drop.method = NGX_RTMP_CONTROL_DROP_SUBSCRIBER; } else if (method->len == sizeof("client") - 1 && - ngx_strncmp(method->data, "client", method->len) == 0) + ngx_memcmp(method->data, "client", method->len) == 0) { - ngx_memzero(&addr, sizeof(addr)); - ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr); - - for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { - if (addr.len && lctx->session && lctx->session->connection) { - paddr = &lctx->session->connection->addr_text; - if (paddr->len != addr.len || - ngx_strncmp(paddr->data, addr.data, addr.len)) - { - continue; - } - } - - ngx_rtmp_finalize_session(lctx->session); - ++ndropped; - } + drop.method = NGX_RTMP_CONTROL_DROP_CLIENT; } else { msg = "Undefined method"; goto error; } + ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &drop.addr); + + msg = ngx_rtmp_control_drop_main(r, &drop, ngx_rtmp_core_main_conf); + if (msg != NGX_CONF_OK) { + goto error; + } + /* output ndropped */ len = NGX_OFF_T_LEN; @@ -399,7 +571,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) return NGX_ERROR; } - len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p); + len = (size_t) (ngx_snprintf(p, len, "%ui", drop.ndropped) - p); r->headers_out.status = NGX_HTTP_OK; r->headers_out.content_length_n = len;