From f65f07deb32565b144e22faece57638f8961d62f Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Tue, 23 Oct 2012 10:46:44 +0400 Subject: [PATCH] implemented connection dropped in control module --- ngx_rtmp_control_module.c | 342 ++++++++++++++++++++++++++++---------- 1 file changed, 251 insertions(+), 91 deletions(-) diff --git a/ngx_rtmp_control_module.c b/ngx_rtmp_control_module.c index 227b677..ff7bcd1 100644 --- a/ngx_rtmp_control_module.c +++ b/ngx_rtmp_control_module.c @@ -17,8 +17,22 @@ static char * ngx_rtmp_control_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); +typedef struct { + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_core_app_conf_t *cacf; +} ngx_rtmp_control_core_t; + + +typedef struct { + ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_live_stream_t *ls; +} ngx_rtmp_control_live_t; + + #define NGX_RTMP_CONTROL_ALL 0xff #define NGX_RTMP_CONTROL_RECORD 0x01 +#define NGX_RTMP_CONTROL_DROP 0x02 typedef struct { @@ -27,9 +41,10 @@ typedef struct { static ngx_conf_bitmask_t ngx_rtmp_control_masks[] = { - { ngx_string("all"), NGX_RTMP_CONTROL_ALL }, - { ngx_string("record"), NGX_RTMP_CONTROL_RECORD }, - { ngx_null_string, 0 } + { ngx_string("all"), NGX_RTMP_CONTROL_ALL }, + { ngx_string("record"), NGX_RTMP_CONTROL_RECORD }, + { ngx_string("drop"), NGX_RTMP_CONTROL_DROP }, + { ngx_null_string, 0 } }; @@ -77,6 +92,131 @@ ngx_module_t ngx_rtmp_control_module = { }; +static ngx_int_t +ngx_rtmp_control_output_error(ngx_http_request_t *r, const char *msg) +{ + size_t len; + ngx_buf_t *b; + ngx_chain_t cl; + + len = ngx_strlen(msg); + + r->headers_out.status = NGX_HTTP_BAD_REQUEST; + r->headers_out.content_length_n = len; + + b = ngx_calloc_buf(r->pool); + if (b == NULL) { + return NGX_ERROR; + } + + ngx_memzero(&cl, sizeof(cl)); + cl.buf = b; + + b->start = b->pos = (u_char *) msg; + b->end = b->last = (u_char *) msg + len; + b->memory = 1; + b->last_buf = 1; + + ngx_http_send_header(r); + + return ngx_http_output_filter(r, &cl); +} + + +static const char * +ngx_rtmp_control_parse_core(ngx_http_request_t *r, + ngx_rtmp_control_core_t *core) +{ + ngx_str_t srv, app; + ngx_uint_t sn, n; + ngx_rtmp_core_srv_conf_t **pcscf; + ngx_rtmp_core_app_conf_t **pcacf; + + + core->cmcf = ngx_rtmp_core_main_conf; + if (core->cmcf == NULL) { + return "Missing main RTMP conf"; + } + + /* find server */ + sn = 0; + + if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) { + sn = ngx_atoi(srv.data, srv.len); + } + + if (sn >= core->cmcf->servers.nelts) { + return "Server index out of range"; + } + + pcscf = core->cmcf->servers.elts; + pcscf += sn; + + core->cscf = *pcscf; + + /* find application */ + if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "rtmp_control: app not specified"); + return "Application not specified"; + } + + core->cacf = NULL; + + pcacf = core->cscf->applications.elts; + + for (n = 0; n < core->cscf->applications.nelts; ++n, ++pcacf) { + if ((*pcacf)->name.len == app.len && + ngx_strncmp((*pcacf)->name.data, app.data, app.len) == 0) + { + core->cacf = *pcacf; + break; + } + } + + if (core->cacf == NULL) { + return "Application not found"; + } + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_parse_live(ngx_http_request_t *r, + ngx_rtmp_control_core_t *core, + ngx_rtmp_control_live_t *live) +{ + ngx_str_t name; + size_t len; + + ngx_memzero(&name, sizeof(name)); + ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name); + + live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index]; + + /* find live stream by name */ + for (live->ls = live->lacf->streams[ngx_hash_key(name.data, name.len) % + live->lacf->nbuckets]; + live->ls; live->ls = live->ls->next) + { + len = ngx_strlen(live->ls->name); + + if (name.len == len && ngx_strncmp(name.data, live->ls->name, name.len) + == 0) + { + break; + } + } + + if (live->ls == NULL) { + return "Live stream not found"; + } + + return NGX_CONF_OK; +} + + /* /record arguments: * srv - server index (optional) * app - application name @@ -88,116 +228,55 @@ ngx_module_t ngx_rtmp_control_module = { static ngx_int_t ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) { + ngx_rtmp_control_core_t core; + ngx_rtmp_control_live_t live; ngx_rtmp_record_app_conf_t *racf; - ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_core_srv_conf_t **pcscf, *cscf; - ngx_rtmp_core_app_conf_t **pcacf, *cacf; - ngx_rtmp_live_app_conf_t *lacf; - ngx_rtmp_live_stream_t *ls; ngx_rtmp_live_ctx_t *lctx; ngx_rtmp_session_t *s; ngx_chain_t cl; - ngx_uint_t sn, rn, n; - ngx_str_t srv, app, rec, name, path; - ngx_str_t msg; + ngx_uint_t rn; + ngx_str_t rec, path; ngx_buf_t *b; ngx_int_t rc; - size_t len; + const char *msg; - sn = 0; - if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) { - sn = ngx_atoi(srv.data, srv.len); - } - - if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) { - ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, - "rtmp_control: app not specified"); - ngx_str_set(&msg, "Application not specified"); + msg = ngx_rtmp_control_parse_core(r, &core); + if (msg != NGX_CONF_OK) { goto error; } - ngx_memzero(&rec, sizeof(rec)); - ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec); - - ngx_memzero(&name, sizeof(name)); - ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name); - - cmcf = ngx_rtmp_core_main_conf; - if (cmcf == NULL) { - ngx_str_set(&msg, "Missing main RTMP conf"); - goto error; - } - - /* find server */ - if (sn >= cmcf->servers.nelts) { - ngx_str_set(&msg, "Server index out of range"); - goto error; - } - - pcscf = cmcf->servers.elts; - pcscf += sn; - cscf = *pcscf; - - /* find application */ - pcacf = cscf->applications.elts; - cacf = NULL; - - for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) { - if ((*pcacf)->name.len == app.len && - ngx_strncmp((*pcacf)->name.data, app.data, app.len) == 0) - { - cacf = *pcacf; - break; - } - } - - if (cacf == NULL) { - ngx_str_set(&msg, "Application not found"); - goto error; - } - - lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index]; - racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index]; - - /* find live stream by name */ - for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets]; - ls; ls = ls->next) - { - len = ngx_strlen(ls->name); - - if (name.len == len && ngx_strncmp(name.data, ls->name, name.len) - == 0) - { - break; - } - } - - if (ls == NULL) { - ngx_str_set(&msg, "Live stream not found"); + msg = ngx_rtmp_control_parse_live(r, &core, &live); + if (msg != NGX_CONF_OK) { goto error; } /* find publisher context */ - for (lctx = ls->ctx; lctx; lctx = lctx->next) { + for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) { break; } } if (lctx == NULL) { - ngx_str_set(&msg, "No publisher"); + msg = "No publisher"; goto error; } s = lctx->session; /* find recorder */ + ngx_memzero(&rec, sizeof(rec)); + ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec); + + racf = core.cacf->app_conf[ngx_rtmp_record_module.ctx_index]; + rn = ngx_rtmp_record_find(racf, &rec); if (rn == NGX_CONF_UNSET_UINT) { - ngx_str_set(&msg, "Recorder not found"); + msg = "Recorder not found"; goto error; } + /* call the method */ ngx_memzero(&path, sizeof(path)); if (method->len == sizeof("start") - 1 && @@ -211,12 +290,12 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) rc = ngx_rtmp_record_close(s, rn, &path); } else { - ngx_str_set(&msg, "Undefined method"); + msg = "Undefined method"; goto error; } if (rc == NGX_ERROR) { - ngx_str_set(&msg, "Recorder error"); + msg = "Recorder error"; goto error; } @@ -245,25 +324,105 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) return ngx_http_output_filter(r, &cl); error: - r->headers_out.status = NGX_HTTP_BAD_REQUEST; - r->headers_out.content_length_n = msg.len; + return ngx_rtmp_control_output_error(r, msg); +} + + +static ngx_int_t +ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) +{ + ngx_rtmp_control_core_t core; + ngx_rtmp_control_live_t live; + ngx_rtmp_live_ctx_t *lctx; + ngx_str_t addr, *paddr; + const char *msg; + ngx_uint_t ndropped; + size_t len; + u_char *p; + ngx_buf_t *b; + ngx_chain_t cl; + + msg = ngx_rtmp_control_parse_core(r, &core); + if (msg != NGX_CONF_OK) { + goto error; + } + + msg = ngx_rtmp_control_parse_live(r, &core, &live); + if (msg != NGX_CONF_OK) { + goto error; + } + + ndropped = 0; + + if (method->len == sizeof("publisher") - 1 && + ngx_strncmp(method->data, "publisher", method->len) == 0) + { + for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { + if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) { + ngx_rtmp_finalize_session(lctx->session); + ++ndropped; + break; + } + } + + } else if (method->len == sizeof("client") - 1 && + ngx_strncmp(method->data, "client", method->len) == 0) + { + ngx_memzero(&addr, sizeof(addr)); + ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr); + + for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { + if (addr.len && lctx->session && lctx->session->connection) { + paddr = &lctx->session->connection->addr_text; + if (paddr->len != addr.len || + ngx_strncmp(paddr->data, addr.data, addr.len)) + { + continue; + } + } + + ngx_rtmp_finalize_session(lctx->session); + ++ndropped; + } + + } else { + msg = "Undefined method"; + goto error; + } + + /* output ndropped */ + + len = NGX_OFF_T_LEN; + + p = ngx_palloc(r->connection->pool, len); + if (p == NULL) { + return NGX_ERROR; + } + + len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p); + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_length_n = len; b = ngx_calloc_buf(r->pool); if (b == NULL) { return NGX_ERROR; } + b->start = b->pos = p; + b->end = b->last = p + len; + b->temporary = 1; + b->last_buf = 1; + ngx_memzero(&cl, sizeof(cl)); cl.buf = b; - b->start = b->pos = msg.data; - b->end = b->last = msg.data + msg.len; - b->memory = 1; - b->last_buf = 1; - ngx_http_send_header(r); return ngx_http_output_filter(r, &cl); + +error: + return ngx_rtmp_control_output_error(r, msg); } @@ -315,6 +474,7 @@ ngx_rtmp_control_handler(ngx_http_request_t *r) } NGX_RTMP_CONTROL_SECTION(RECORD, record); + NGX_RTMP_CONTROL_SECTION(DROP, drop); #undef NGX_RTMP_CONTROL_SECTION