mirror of
https://github.com/arut/nginx-rtmp-module.git
synced 2025-08-06 15:00:18 +08:00
Merge branch 'smart-drop'
This commit is contained in:
@ -36,6 +36,20 @@ typedef struct {
|
|||||||
#define NGX_RTMP_CONTROL_DROP 0x02
|
#define NGX_RTMP_CONTROL_DROP 0x02
|
||||||
|
|
||||||
|
|
||||||
|
enum {
|
||||||
|
NGX_RTMP_CONTROL_DROP_PUBLISHER,
|
||||||
|
NGX_RTMP_CONTROL_DROP_SUBSCRIBER,
|
||||||
|
NGX_RTMP_CONTROL_DROP_CLIENT,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ngx_uint_t method;
|
||||||
|
ngx_str_t addr;
|
||||||
|
ngx_uint_t ndropped;
|
||||||
|
} ngx_rtmp_control_drop_t;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
ngx_uint_t control;
|
ngx_uint_t control;
|
||||||
} ngx_rtmp_control_loc_conf_t;
|
} ngx_rtmp_control_loc_conf_t;
|
||||||
@ -194,6 +208,10 @@ ngx_rtmp_control_parse_live(ngx_http_request_t *r,
|
|||||||
ngx_memzero(&name, sizeof(name));
|
ngx_memzero(&name, sizeof(name));
|
||||||
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
|
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
|
||||||
|
|
||||||
|
if (name.len == 0) {
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
|
live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
|
||||||
|
|
||||||
/* find live stream by name */
|
/* find live stream by name */
|
||||||
@ -246,6 +264,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ngx_memzero(&live, sizeof(live));
|
||||||
msg = ngx_rtmp_control_parse_live(r, &core, &live);
|
msg = ngx_rtmp_control_parse_live(r, &core, &live);
|
||||||
if (msg != NGX_CONF_OK) {
|
if (msg != NGX_CONF_OK) {
|
||||||
goto error;
|
goto error;
|
||||||
@ -329,68 +348,221 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
ngx_rtmp_control_drop_session(ngx_http_request_t *r,
|
||||||
|
ngx_rtmp_control_drop_t *drop,
|
||||||
|
ngx_rtmp_live_ctx_t *lctx)
|
||||||
|
{
|
||||||
|
ngx_rtmp_session_t *s;
|
||||||
|
ngx_str_t *paddr;
|
||||||
|
|
||||||
|
s = lctx->session;
|
||||||
|
|
||||||
|
if (s == NULL || s->connection == NULL)
|
||||||
|
{
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (drop->addr.len) {
|
||||||
|
paddr = &s->connection->addr_text;
|
||||||
|
if (paddr->len != drop->addr.len ||
|
||||||
|
ngx_strncmp(paddr->data, drop->addr.data, drop->addr.len))
|
||||||
|
{
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (drop->method) {
|
||||||
|
case NGX_RTMP_CONTROL_DROP_PUBLISHER:
|
||||||
|
if (!lctx->publishing) {
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
case NGX_RTMP_CONTROL_DROP_SUBSCRIBER:
|
||||||
|
if (lctx->publishing) {
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
case NGX_RTMP_CONTROL_DROP_CLIENT:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_rtmp_finalize_session(s);
|
||||||
|
++drop->ndropped;
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
ngx_rtmp_control_drop_stream(ngx_http_request_t *r,
|
||||||
|
ngx_rtmp_control_drop_t *drop,
|
||||||
|
ngx_rtmp_live_stream_t *ls)
|
||||||
|
{
|
||||||
|
ngx_rtmp_live_ctx_t *lctx;
|
||||||
|
const char *s;
|
||||||
|
|
||||||
|
for (lctx = ls->ctx; lctx; lctx = lctx->next) {
|
||||||
|
s = ngx_rtmp_control_drop_session(r, drop, lctx);
|
||||||
|
if (s != NGX_CONF_OK) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
ngx_rtmp_control_drop_app(ngx_http_request_t *r,
|
||||||
|
ngx_rtmp_control_drop_t *drop,
|
||||||
|
ngx_rtmp_core_app_conf_t *cacf)
|
||||||
|
{
|
||||||
|
ngx_rtmp_live_app_conf_t *lacf;
|
||||||
|
ngx_rtmp_live_stream_t *ls;
|
||||||
|
ngx_str_t name;
|
||||||
|
const char *s;
|
||||||
|
size_t len;
|
||||||
|
ngx_uint_t n;
|
||||||
|
|
||||||
|
ngx_memzero(&name, sizeof(name));
|
||||||
|
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
|
||||||
|
|
||||||
|
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
|
||||||
|
|
||||||
|
if (name.len == 0) {
|
||||||
|
for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) {
|
||||||
|
for (ls = lacf->streams[n]; ls; ls = ls->next)
|
||||||
|
{
|
||||||
|
s = ngx_rtmp_control_drop_stream(r, drop, ls);
|
||||||
|
if (s != NGX_CONF_OK) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets];
|
||||||
|
ls; ls = ls->next)
|
||||||
|
{
|
||||||
|
len = ngx_strlen(ls->name);
|
||||||
|
if (name.len != len || ngx_strncmp(name.data, ls->name, name.len)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
s = ngx_rtmp_control_drop_stream(r, drop, ls);
|
||||||
|
if (s != NGX_CONF_OK) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
ngx_rtmp_control_drop_srv(ngx_http_request_t *r,
|
||||||
|
ngx_rtmp_control_drop_t *drop,
|
||||||
|
ngx_rtmp_core_srv_conf_t *cscf)
|
||||||
|
{
|
||||||
|
ngx_rtmp_core_app_conf_t **pcacf;
|
||||||
|
ngx_str_t app;
|
||||||
|
ngx_uint_t n;
|
||||||
|
const char *s;
|
||||||
|
|
||||||
|
ngx_memzero(&app, sizeof(app));
|
||||||
|
ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app);
|
||||||
|
|
||||||
|
pcacf = cscf->applications.elts;
|
||||||
|
|
||||||
|
for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) {
|
||||||
|
if (app.len && ((*pcacf)->name.len != app.len ||
|
||||||
|
ngx_strncmp((*pcacf)->name.data, app.data, app.len)))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
s = ngx_rtmp_control_drop_app(r, drop, *pcacf);
|
||||||
|
if (s != NGX_CONF_OK) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
ngx_rtmp_control_drop_main(ngx_http_request_t *r,
|
||||||
|
ngx_rtmp_control_drop_t *drop,
|
||||||
|
ngx_rtmp_core_main_conf_t *cmcf)
|
||||||
|
{
|
||||||
|
ngx_rtmp_core_srv_conf_t **pcscf;
|
||||||
|
ngx_str_t srv;
|
||||||
|
ngx_uint_t sn;
|
||||||
|
|
||||||
|
sn = 0;
|
||||||
|
if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
|
||||||
|
sn = ngx_atoi(srv.data, srv.len);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sn >= cmcf->servers.nelts) {
|
||||||
|
return "Server index out of range";
|
||||||
|
}
|
||||||
|
|
||||||
|
pcscf = cmcf->servers.elts;
|
||||||
|
pcscf += sn;
|
||||||
|
|
||||||
|
return ngx_rtmp_control_drop_srv(r, drop, *pcscf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static ngx_int_t
|
static ngx_int_t
|
||||||
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
|
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
|
||||||
{
|
{
|
||||||
ngx_rtmp_control_core_t core;
|
ngx_rtmp_control_drop_t drop;
|
||||||
ngx_rtmp_control_live_t live;
|
|
||||||
ngx_rtmp_live_ctx_t *lctx;
|
|
||||||
ngx_str_t addr, *paddr;
|
|
||||||
const char *msg;
|
|
||||||
ngx_uint_t ndropped;
|
|
||||||
size_t len;
|
size_t len;
|
||||||
u_char *p;
|
u_char *p;
|
||||||
ngx_buf_t *b;
|
ngx_buf_t *b;
|
||||||
ngx_chain_t cl;
|
ngx_chain_t cl;
|
||||||
|
const char *msg;
|
||||||
|
|
||||||
msg = ngx_rtmp_control_parse_core(r, &core);
|
if (ngx_rtmp_core_main_conf == NULL) {
|
||||||
if (msg != NGX_CONF_OK) {
|
msg = "Empty main conf";
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = ngx_rtmp_control_parse_live(r, &core, &live);
|
ngx_memzero(&drop, sizeof(drop));
|
||||||
if (msg != NGX_CONF_OK) {
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
ndropped = 0;
|
|
||||||
|
|
||||||
if (method->len == sizeof("publisher") - 1 &&
|
if (method->len == sizeof("publisher") - 1 &&
|
||||||
ngx_strncmp(method->data, "publisher", method->len) == 0)
|
ngx_memcmp(method->data, "publisher", method->len) == 0)
|
||||||
{
|
{
|
||||||
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
|
drop.method = NGX_RTMP_CONTROL_DROP_PUBLISHER;
|
||||||
if (lctx->publishing) {
|
|
||||||
ngx_rtmp_finalize_session(lctx->session);
|
} else if (method->len == sizeof("subscriber") - 1 &&
|
||||||
++ndropped;
|
ngx_memcmp(method->data, "subscriber", method->len) == 0)
|
||||||
break;
|
{
|
||||||
}
|
drop.method = NGX_RTMP_CONTROL_DROP_SUBSCRIBER;
|
||||||
}
|
|
||||||
|
|
||||||
} else if (method->len == sizeof("client") - 1 &&
|
} else if (method->len == sizeof("client") - 1 &&
|
||||||
ngx_strncmp(method->data, "client", method->len) == 0)
|
ngx_memcmp(method->data, "client", method->len) == 0)
|
||||||
{
|
{
|
||||||
ngx_memzero(&addr, sizeof(addr));
|
drop.method = NGX_RTMP_CONTROL_DROP_CLIENT;
|
||||||
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
|
|
||||||
|
|
||||||
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
|
|
||||||
if (addr.len && lctx->session && lctx->session->connection) {
|
|
||||||
paddr = &lctx->session->connection->addr_text;
|
|
||||||
if (paddr->len != addr.len ||
|
|
||||||
ngx_strncmp(paddr->data, addr.data, addr.len))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ngx_rtmp_finalize_session(lctx->session);
|
|
||||||
++ndropped;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
msg = "Undefined method";
|
msg = "Undefined method";
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &drop.addr);
|
||||||
|
|
||||||
|
msg = ngx_rtmp_control_drop_main(r, &drop, ngx_rtmp_core_main_conf);
|
||||||
|
if (msg != NGX_CONF_OK) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
/* output ndropped */
|
/* output ndropped */
|
||||||
|
|
||||||
len = NGX_INT_T_LEN;
|
len = NGX_INT_T_LEN;
|
||||||
@ -400,7 +572,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
|
|||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p);
|
len = (size_t) (ngx_snprintf(p, len, "%ui", drop.ndropped) - p);
|
||||||
|
|
||||||
r->headers_out.status = NGX_HTTP_OK;
|
r->headers_out.status = NGX_HTTP_OK;
|
||||||
r->headers_out.content_length_n = len;
|
r->headers_out.content_length_n = len;
|
||||||
|
Reference in New Issue
Block a user