implemented exec_pull feature; the old exec is now exec_push

This commit is contained in:
Roman Arutyunyan
2013-10-08 02:04:49 +04:00
parent 737ea5ce34
commit 97d144ae28
4 changed files with 449 additions and 137 deletions

View File

@ -136,10 +136,11 @@ ngx_module_t ngx_rtmp_enotify_module = {
static void static void
ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, ngx_rtmp_enotify_eval_astr(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
ngx_str_t *ret)
{ {
ngx_rtmp_enotify_ctx_t *ctx; ngx_rtmp_session_t *s = sctx;
ngx_rtmp_enotify_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module);
if (ctx == NULL) { if (ctx == NULL) {
@ -153,10 +154,11 @@ ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e,
static void static void
ngx_rtmp_enotify_eval_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, ngx_rtmp_enotify_eval_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
ngx_str_t *ret)
{ {
ngx_rtmp_enotify_ctx_t *ctx; ngx_rtmp_session_t *s = sctx;
ngx_rtmp_enotify_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module);
if (ctx == NULL) { if (ctx == NULL) {
@ -283,7 +285,8 @@ ngx_rtmp_enotify_exec(ngx_rtmp_session_t *s, ngx_rtmp_enotify_conf_t *ec)
for (n = 0; n < ec->args.nelts; ++n, ++arg_in) { for (n = 0; n < ec->args.nelts; ++n, ++arg_in) {
ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a); ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a,
s->connection->log);
if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { if (ngx_rtmp_eval_streams(&a) != NGX_DONE) {
continue; continue;

View File

@ -12,17 +12,17 @@
static void static void
ngx_rtmp_eval_session_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, ngx_rtmp_eval_session_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
ngx_str_t *ret)
{ {
*ret = *(ngx_str_t *) ((u_char *) s + e->offset); *ret = *(ngx_str_t *) ((u_char *) ctx + e->offset);
} }
static void static void
ngx_rtmp_eval_connection_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, ngx_rtmp_eval_connection_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
ngx_str_t *ret)
{ {
ngx_rtmp_session_t *s = ctx;
*ret = *(ngx_str_t *) ((u_char *) s->connection + e->offset); *ret = *(ngx_str_t *) ((u_char *) s->connection + e->offset);
} }
@ -58,15 +58,14 @@ ngx_rtmp_eval_t ngx_rtmp_eval_session[] = {
static void static void
ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b, ngx_rtmp_eval_append(ngx_buf_t *b, void *data, size_t len, ngx_log_t *log)
void *data, size_t len)
{ {
size_t buf_len; size_t buf_len;
if (b->last + len > b->end) { if (b->last + len > b->end) {
buf_len = 2 * (b->last - b->pos) + len; buf_len = 2 * (b->last - b->pos) + len;
b->start = ngx_palloc(s->connection->pool, buf_len); b->start = ngx_alloc(buf_len, log);
if (b->start == NULL) { if (b->start == NULL) {
return; return;
} }
@ -81,8 +80,8 @@ ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b,
static void static void
ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, ngx_rtmp_eval_append_var(void *ctx, ngx_buf_t *b, ngx_rtmp_eval_t **e,
ngx_rtmp_eval_t **e, ngx_str_t *name) ngx_str_t *name, ngx_log_t *log)
{ {
ngx_uint_t k; ngx_uint_t k;
ngx_str_t v; ngx_str_t v;
@ -91,10 +90,10 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b,
for (; *e; ++e) { for (; *e; ++e) {
for (k = 0, ee = *e; ee->handler; ++k, ++ee) { for (k = 0, ee = *e; ee->handler; ++k, ++ee) {
if (ee->name.len == name->len && if (ee->name.len == name->len &&
ngx_memcmp(ee->name.data, name->data, name->len) == 0) ngx_memcmp(ee->name.data, name->data, name->len) == 0)
{ {
ee->handler(s, ee, &v); ee->handler(ctx, ee, &v);
ngx_rtmp_eval_append(s, b, v.data, v.len); ngx_rtmp_eval_append(b, v.data, v.len, log);
} }
} }
} }
@ -102,8 +101,8 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b,
ngx_int_t ngx_int_t
ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e, ngx_str_t *out,
ngx_str_t *out) ngx_log_t *log)
{ {
u_char c, *p; u_char c, *p;
ngx_str_t name; ngx_str_t name;
@ -117,8 +116,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e,
SNAME SNAME
} state = NORMAL; } state = NORMAL;
b.pos = b.last = b.start = ngx_palloc(s->connection->pool, b.pos = b.last = b.start = ngx_alloc(NGX_RTMP_EVAL_BUFLEN, log);
NGX_RTMP_EVAL_BUFLEN);
if (b.pos == NULL) { if (b.pos == NULL) {
return NGX_ERROR; return NGX_ERROR;
} }
@ -137,7 +135,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e,
} }
name.len = p - name.data; name.len = p - name.data;
ngx_rtmp_eval_append_var(s, &b, e, &name); ngx_rtmp_eval_append_var(ctx, &b, e, &name, log);
state = NORMAL; state = NORMAL;
@ -154,7 +152,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e,
} }
name.len = p - name.data; name.len = p - name.data;
ngx_rtmp_eval_append_var(s, &b, e, &name); ngx_rtmp_eval_append_var(ctx, &b, e, &name, log);
case NORMAL: case NORMAL:
switch (c) { switch (c) {
@ -168,7 +166,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e,
} }
case ESCAPE: case ESCAPE:
ngx_rtmp_eval_append(s, &b, &c, 1); ngx_rtmp_eval_append(&b, &c, 1, log);
state = NORMAL; state = NORMAL;
break; break;
@ -178,11 +176,11 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e,
if (state == NAME) { if (state == NAME) {
p = &in->data[n]; p = &in->data[n];
name.len = p - name.data; name.len = p - name.data;
ngx_rtmp_eval_append_var(s, &b, e, &name); ngx_rtmp_eval_append_var(ctx, &b, e, &name, log);
} }
c = 0; c = 0;
ngx_rtmp_eval_append(s, &b, &c, 1); ngx_rtmp_eval_append(&b, &c, 1, log);
out->data = b.pos; out->data = b.pos;
out->len = b.last - b.pos - 1; out->len = b.last - b.pos - 1;

View File

@ -15,7 +15,7 @@
typedef struct ngx_rtmp_eval_s ngx_rtmp_eval_t; typedef struct ngx_rtmp_eval_s ngx_rtmp_eval_t;
typedef void (* ngx_rtmp_eval_pt)(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, typedef void (* ngx_rtmp_eval_pt)(void *ctx, ngx_rtmp_eval_t *e,
ngx_str_t *ret); ngx_str_t *ret);
@ -33,8 +33,8 @@ struct ngx_rtmp_eval_s {
extern ngx_rtmp_eval_t ngx_rtmp_eval_session[]; extern ngx_rtmp_eval_t ngx_rtmp_eval_session[];
ngx_int_t ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_int_t ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e,
ngx_rtmp_eval_t **e, ngx_str_t *out); ngx_str_t *out, ngx_log_t *log);
ngx_int_t ngx_rtmp_eval_streams(ngx_str_t *in); ngx_int_t ngx_rtmp_eval_streams(ngx_str_t *in);

View File

@ -16,6 +16,7 @@
#if !(NGX_WIN32) #if !(NGX_WIN32)
static ngx_rtmp_publish_pt next_publish; static ngx_rtmp_publish_pt next_publish;
static ngx_rtmp_play_pt next_play;
static ngx_rtmp_close_stream_pt next_close_stream; static ngx_rtmp_close_stream_pt next_close_stream;
#endif #endif
@ -27,7 +28,7 @@ static char * ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf);
static void * ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf); static void * ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf);
static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf,
void *parent, void *child); void *parent, void *child);
static char * ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, static char * ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf); void *conf);
static char * ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, static char * ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf); void *conf);
@ -48,7 +49,8 @@ typedef struct {
typedef struct { typedef struct {
ngx_rtmp_exec_conf_t *conf; ngx_rtmp_exec_conf_t *conf;
ngx_log_t *log; ngx_log_t *log;
ngx_rtmp_session_t *session; /* NULL for init execs */ ngx_rtmp_eval_t **eval;
void *eval_ctx;
unsigned active:1; unsigned active:1;
ngx_pid_t pid; ngx_pid_t pid;
ngx_pid_t *save_pid; ngx_pid_t *save_pid;
@ -62,23 +64,40 @@ typedef struct {
typedef struct { typedef struct {
ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ ngx_array_t static_confs; /* ngx_rtmp_exec_conf_t */
ngx_array_t execs; /* ngx_rtmp_exec_t */ ngx_array_t static_execs; /* ngx_rtmp_exec_t */
ngx_msec_t respawn_timeout; ngx_msec_t respawn_timeout;
ngx_int_t kill_signal; ngx_int_t kill_signal;
ngx_log_t *log;
} ngx_rtmp_exec_main_conf_t; } ngx_rtmp_exec_main_conf_t;
typedef struct ngx_rtmp_exec_pull_ctx_s ngx_rtmp_exec_pull_ctx_t;
struct ngx_rtmp_exec_pull_ctx_s {
ngx_pool_t *pool;
ngx_uint_t counter;
ngx_str_t name;
ngx_str_t app;
ngx_array_t pull_execs; /* ngx_rtmp_exec_t */
ngx_rtmp_exec_pull_ctx_t *next;
};
typedef struct { typedef struct {
ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ ngx_array_t push_confs; /* ngx_rtmp_exec_conf_t */
ngx_array_t pull_confs; /* ngx_rtmp_exec_conf_t */
ngx_flag_t respawn; ngx_flag_t respawn;
ngx_uint_t nbuckets;
ngx_rtmp_exec_pull_ctx_t **pull;
} ngx_rtmp_exec_app_conf_t; } ngx_rtmp_exec_app_conf_t;
typedef struct { typedef struct {
u_char name[NGX_RTMP_MAX_NAME]; u_char name[NGX_RTMP_MAX_NAME];
u_char args[NGX_RTMP_MAX_ARGS]; u_char args[NGX_RTMP_MAX_ARGS];
ngx_array_t execs; ngx_array_t push_execs; /* ngx_rtmp_exec_t */
ngx_rtmp_exec_pull_ctx_t *pull;
} ngx_rtmp_exec_ctx_t; } ngx_rtmp_exec_ctx_t;
@ -93,9 +112,23 @@ static ngx_command_t ngx_rtmp_exec_commands[] = {
{ ngx_string("exec"), { ngx_string("exec"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE,
ngx_rtmp_exec_exec, ngx_rtmp_exec_conf,
NGX_RTMP_APP_CONF_OFFSET, NGX_RTMP_APP_CONF_OFFSET,
0, offsetof(ngx_rtmp_exec_app_conf_t, push_confs),
NULL },
{ ngx_string("exec_push"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE,
ngx_rtmp_exec_conf,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_exec_app_conf_t, push_confs),
NULL },
{ ngx_string("exec_pull"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE,
ngx_rtmp_exec_conf,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_exec_app_conf_t, pull_confs),
NULL }, NULL },
{ ngx_string("exec_static"), { ngx_string("exec_static"),
@ -159,10 +192,11 @@ ngx_module_t ngx_rtmp_exec_module = {
static void static void
ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, ngx_rtmp_exec_eval_ctx_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
ngx_str_t *ret)
{ {
ngx_rtmp_exec_ctx_t *ctx; ngx_rtmp_session_t *s = sctx;
ngx_rtmp_exec_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
if (ctx == NULL) { if (ctx == NULL) {
@ -175,23 +209,50 @@ ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e,
} }
static ngx_rtmp_eval_t ngx_rtmp_exec_eval[] = { static void
ngx_rtmp_exec_eval_pctx_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
{
*ret = *(ngx_str_t *) ((u_char *) ctx + e->offset);
}
static ngx_rtmp_eval_t ngx_rtmp_exec_push_specific_eval[] = {
{ ngx_string("name"), { ngx_string("name"),
ngx_rtmp_exec_eval_astr, ngx_rtmp_exec_eval_ctx_str,
offsetof(ngx_rtmp_exec_ctx_t, name) }, offsetof(ngx_rtmp_exec_ctx_t, name) },
{ ngx_string("args"), { ngx_string("args"),
ngx_rtmp_exec_eval_astr, ngx_rtmp_exec_eval_ctx_str,
offsetof(ngx_rtmp_exec_ctx_t, args) }, offsetof(ngx_rtmp_exec_ctx_t, args) },
ngx_rtmp_null_eval ngx_rtmp_null_eval
}; };
static ngx_rtmp_eval_t * ngx_rtmp_exec_eval_p[] = { static ngx_rtmp_eval_t * ngx_rtmp_exec_push_eval[] = {
ngx_rtmp_eval_session, ngx_rtmp_eval_session,
ngx_rtmp_exec_eval, ngx_rtmp_exec_push_specific_eval,
NULL
};
static ngx_rtmp_eval_t ngx_rtmp_exec_pull_specific_eval[] = {
{ ngx_string("name"),
ngx_rtmp_exec_eval_pctx_str,
offsetof(ngx_rtmp_exec_pull_ctx_t, name) },
{ ngx_string("app"),
ngx_rtmp_exec_eval_pctx_str,
offsetof(ngx_rtmp_exec_pull_ctx_t, app) },
ngx_rtmp_null_eval
};
static ngx_rtmp_eval_t * ngx_rtmp_exec_pull_eval[] = {
ngx_rtmp_exec_pull_specific_eval,
NULL NULL
}; };
@ -209,7 +270,7 @@ ngx_rtmp_exec_create_main_conf(ngx_conf_t *cf)
emcf->respawn_timeout = NGX_CONF_UNSET_MSEC; emcf->respawn_timeout = NGX_CONF_UNSET_MSEC;
emcf->kill_signal = NGX_CONF_UNSET; emcf->kill_signal = NGX_CONF_UNSET;
if (ngx_array_init(&emcf->confs, cf->pool, 1, if (ngx_array_init(&emcf->static_confs, cf->pool, 1,
sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK)
{ {
return NULL; return NULL;
@ -237,23 +298,26 @@ ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf)
} }
#endif #endif
if (ngx_array_init(&emcf->execs, cf->pool, emcf->confs.nelts, if (ngx_array_init(&emcf->static_execs, cf->pool,
emcf->static_confs.nelts,
sizeof(ngx_rtmp_exec_t)) != NGX_OK) sizeof(ngx_rtmp_exec_t)) != NGX_OK)
{ {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
e = ngx_array_push_n(&emcf->execs, emcf->confs.nelts); e = ngx_array_push_n(&emcf->static_execs, emcf->static_confs.nelts);
if (e == NULL) { if (e == NULL) {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
ec = emcf->confs.elts; emcf->log = &cf->cycle->new_log;
for (n = 0; n < emcf->confs.nelts; ++n, ++e, ++ec) { ec = emcf->static_confs.elts;
for (n = 0; n < emcf->static_confs.nelts; ++n, ++e, ++ec) {
ngx_memzero(e, sizeof(*e)); ngx_memzero(e, sizeof(*e));
e->conf = ec; e->conf = ec;
e->log = &cf->cycle->new_log; e->log = emcf->log;
e->respawn_timeout = emcf->respawn_timeout; e->respawn_timeout = emcf->respawn_timeout;
e->kill_signal = emcf->kill_signal; e->kill_signal = emcf->kill_signal;
} }
@ -273,8 +337,15 @@ ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf)
} }
eacf->respawn = NGX_CONF_UNSET; eacf->respawn = NGX_CONF_UNSET;
eacf->nbuckets = NGX_CONF_UNSET_UINT;
if (ngx_array_init(&eacf->confs, cf->pool, 1, if (ngx_array_init(&eacf->push_confs, cf->pool, 1,
sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK)
{
return NULL;
}
if (ngx_array_init(&eacf->pull_confs, cf->pool, 1,
sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK)
{ {
return NULL; return NULL;
@ -284,27 +355,56 @@ ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf)
} }
static ngx_int_t
ngx_rtmp_exec_merge_confs(ngx_array_t *conf, ngx_array_t *prev)
{
size_t n;
ngx_rtmp_exec_conf_t *ec, *pec;
if (prev->nelts == 0) {
return NGX_OK;
}
ec = ngx_array_push_n(conf, prev->nelts);
if (ec == NULL) {
return NGX_ERROR;
}
pec = prev->elts;
for (n = 0; n < prev->nelts; n++, ec++, pec++) {
*ec = *pec;
}
return NGX_OK;
}
static char * static char *
ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
{ {
ngx_rtmp_exec_app_conf_t *prev = parent; ngx_rtmp_exec_app_conf_t *prev = parent;
ngx_rtmp_exec_app_conf_t *conf = child; ngx_rtmp_exec_app_conf_t *conf = child;
size_t n;
ngx_rtmp_exec_conf_t *ec, *pec;
ngx_conf_merge_value(conf->respawn, prev->respawn, 1); ngx_conf_merge_value(conf->respawn, prev->respawn, 1);
ngx_conf_merge_uint_value(conf->nbuckets, prev->nbuckets, 1024);
if (prev->confs.nelts) { if (ngx_rtmp_exec_merge_confs(&conf->push_confs, &prev->push_confs)
ec = ngx_array_push_n(&conf->confs, prev->confs.nelts); != NGX_OK)
if (ec == NULL) { {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
}
pec = prev->confs.elts;
for (n = 0; n < prev->confs.nelts; ++n, ++ec, ++pec) {
*ec = *pec;
}
} }
if (ngx_rtmp_exec_merge_confs(&conf->pull_confs, &prev->pull_confs)
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
conf->pull = ngx_pcalloc(cf->pool, sizeof(void *) * conf->nbuckets);
if (conf->pull == NULL) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK; return NGX_CONF_OK;
} }
@ -344,8 +444,8 @@ ngx_rtmp_exec_init_process(ngx_cycle_t *cycle)
* when nginx worker is terminated. * when nginx worker is terminated.
*/ */
e = emcf->execs.elts; e = emcf->static_execs.elts;
for (n = 0; n < emcf->execs.nelts; ++n, ++e) { for (n = 0; n < emcf->static_execs.nelts; ++n, ++e) {
e->respawn_evt.data = e; e->respawn_evt.data = e;
e->respawn_evt.log = e->log; e->respawn_evt.log = e->log;
e->respawn_evt.handler = ngx_rtmp_exec_respawn; e->respawn_evt.handler = ngx_rtmp_exec_respawn;
@ -527,10 +627,10 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e)
for (n = 0; n < ec->args.nelts; n++, ++arg_in) { for (n = 0; n < ec->args.nelts; n++, ++arg_in) {
if (e->session == NULL) { if (e->eval == NULL) {
a = *arg_in; a = *arg_in;
} else { } else {
ngx_rtmp_eval(e->session, arg_in, ngx_rtmp_exec_eval_p, &a); ngx_rtmp_eval(e->eval_ctx, arg_in, e->eval, &a, e->log);
} }
if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { if (ngx_rtmp_eval_streams(&a) != NGX_DONE) {
@ -585,10 +685,11 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e)
static ngx_int_t static ngx_int_t
ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
{ {
ngx_rtmp_exec_app_conf_t *eacf; size_t n;
ngx_rtmp_exec_ctx_t *ctx; ngx_rtmp_exec_t *e;
ngx_rtmp_exec_t *e; ngx_rtmp_exec_ctx_t *ctx;
size_t n; ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx;
ngx_rtmp_exec_app_conf_t *eacf;
eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module);
if (eacf == NULL) { if (eacf == NULL) {
@ -596,36 +697,229 @@ ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
} }
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
if (ctx == NULL || ctx->execs.nelts == 0) { if (ctx == NULL) {
goto next; goto next;
} }
if (ctx->push_execs.nelts > 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"exec: delete %uz push command(s)",
ctx->push_execs.nelts);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, e = ctx->push_execs.elts;
"exec: delete %uz command(s)", ctx->execs.nelts); for (n = 0; n < ctx->push_execs.nelts; n++, e++) {
ngx_rtmp_exec_kill(e, e->kill_signal);
e = ctx->execs.elts; }
for (n = 0; n < ctx->execs.nelts; ++n, ++e) {
ngx_rtmp_exec_kill(e, e->kill_signal);
} }
pctx = ctx->pull;
if (pctx && --pctx->counter == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"exec: delete %uz pull command(s)",
pctx->pull_execs.nelts);
e = pctx->pull_execs.elts;
for (n = 0; n < pctx->pull_execs.nelts; n++, e++) {
ngx_rtmp_exec_kill(e, e->kill_signal);
}
ppctx = &eacf->pull[ngx_hash_key(pctx->name.data, pctx->name.len) &
eacf->nbuckets];
for (; *ppctx; ppctx = &(*ppctx)->next) {
if (pctx == *ppctx) {
*ppctx = pctx->next;
break;
}
}
ngx_destroy_pool(pctx->pool);
}
ctx->pull = NULL;
next: next:
return next_close_stream(s, v); return next_close_stream(s, v);
} }
static ngx_int_t
ngx_rtmp_exec_init_ctx(ngx_rtmp_session_t *s, u_char name[NGX_RTMP_MAX_NAME],
u_char args[NGX_RTMP_MAX_ARGS])
{
ngx_uint_t n;
ngx_rtmp_exec_t *e;
ngx_rtmp_exec_conf_t *ec;
ngx_rtmp_exec_ctx_t *ctx;
ngx_rtmp_exec_app_conf_t *eacf;
ngx_rtmp_exec_main_conf_t *emcf;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
if (ctx != NULL) {
goto done;
}
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module);
eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module);
emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module);
if (ngx_array_init(&ctx->push_execs, s->connection->pool,
eacf->push_confs.nelts,
sizeof(ngx_rtmp_exec_t)) != NGX_OK)
{
return NGX_ERROR;
}
e = ngx_array_push_n(&ctx->push_execs, eacf->push_confs.nelts);
if (e == NULL) {
return NGX_ERROR;
}
ec = eacf->push_confs.elts;
for (n = 0; n < eacf->push_confs.nelts; n++, e++, ec++) {
ngx_memzero(e, sizeof(*e));
e->conf = ec;
e->log = s->connection->log;
e->eval = ngx_rtmp_exec_push_eval;
e->eval_ctx = s;
e->kill_signal = emcf->kill_signal;
e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout :
NGX_CONF_UNSET_MSEC);
}
done:
ngx_memcpy(ctx->name, name, NGX_RTMP_MAX_NAME);
ngx_memcpy(ctx->args, args, NGX_RTMP_MAX_ARGS);
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_exec_init_pull_ctx(ngx_rtmp_session_t *s,
u_char name[NGX_RTMP_MAX_NAME])
{
size_t len;
ngx_uint_t n;
ngx_pool_t *pool;
ngx_rtmp_exec_t *e;
ngx_rtmp_exec_ctx_t *ctx;
ngx_rtmp_exec_conf_t *ec;
ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx;
ngx_rtmp_exec_app_conf_t *eacf;
ngx_rtmp_exec_main_conf_t *emcf;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
if (ctx->pull != NULL) {
return NGX_OK;
}
eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module);
emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module);
len = ngx_strlen(name);
ppctx = &eacf->pull[ngx_hash_key(name, len) & eacf->nbuckets];
for (; *ppctx; ppctx = &(*ppctx)->next) {
pctx = *ppctx;
if (pctx->name.len == len &&
ngx_strncmp(name, pctx->name.data, len) == 0)
{
goto done;
}
}
pool = ngx_create_pool(4096, emcf->log);
if (pool == NULL) {
return NGX_ERROR;
}
pctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_exec_pull_ctx_t));
if (pctx == NULL) {
goto error;
}
pctx->pool = pool;
pctx->name.len = len;
pctx->name.data = ngx_palloc(pool, len);
if (pctx->name.data == NULL) {
goto error;
}
ngx_memcpy(pctx->name.data, name, len);
pctx->app.len = s->app.len;
pctx->app.data = ngx_palloc(pool, s->app.len);
if (pctx->app.data == NULL) {
goto error;
}
ngx_memcpy(pctx->app.data, s->app.data, s->app.len);
if (ngx_array_init(&pctx->pull_execs, pool, eacf->pull_confs.nelts,
sizeof(ngx_rtmp_exec_t)) != NGX_OK)
{
goto error;
}
e = ngx_array_push_n(&pctx->pull_execs, eacf->pull_confs.nelts);
if (e == NULL) {
goto error;
}
ec = eacf->pull_confs.elts;
for (n = 0; n < eacf->pull_confs.nelts; n++, e++, ec++) {
ngx_memzero(e, sizeof(*e));
e->conf = ec;
e->log = emcf->log;
e->eval = ngx_rtmp_exec_pull_eval;
e->eval_ctx = pctx;
e->kill_signal = emcf->kill_signal;
e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout :
NGX_CONF_UNSET_MSEC);
}
*ppctx = pctx;
done:
ctx->pull = pctx;
ctx->pull->counter++;
return NGX_OK;
error:
ngx_destroy_pool(pool);
return NGX_ERROR;
}
static ngx_int_t static ngx_int_t
ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{ {
ngx_rtmp_exec_main_conf_t *emcf; size_t n;
ngx_rtmp_exec_app_conf_t *eacf; ngx_rtmp_exec_t *e;
ngx_rtmp_exec_t *e; ngx_rtmp_exec_ctx_t *ctx;
ngx_rtmp_exec_conf_t *ec; ngx_rtmp_exec_app_conf_t *eacf;
ngx_rtmp_exec_ctx_t *ctx;
size_t n;
emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module);
eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module);
if (eacf == NULL || eacf->confs.nelts == 0) {
if (eacf == NULL || eacf->push_confs.nelts == 0) {
goto next; goto next;
} }
@ -633,69 +927,84 @@ ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
goto next; goto next;
} }
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); if (ngx_rtmp_exec_init_ctx(s, v->name, v->args) != NGX_OK) {
goto next;
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module);
if (ngx_array_init(&ctx->execs, s->connection->pool, eacf->confs.nelts,
sizeof(ngx_rtmp_exec_t)) != NGX_OK)
{
return NGX_ERROR;
}
e = ngx_array_push_n(&ctx->execs, eacf->confs.nelts);
if (e == NULL) {
return NGX_ERROR;
}
ec = eacf->confs.elts;
for (n = 0; n < eacf->confs.nelts; ++n, ++e, ++ec) {
ngx_memzero(e, sizeof(*e));
e->conf = ec;
e->log = s->connection->log;
e->session = s;
e->kill_signal = emcf->kill_signal;
e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout :
NGX_CONF_UNSET_MSEC);
}
} }
ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
ngx_memcpy(ctx->args, v->args, NGX_RTMP_MAX_ARGS);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"exec: run %uz command(s)", ctx->execs.nelts); "exec: push %uz command(s)", ctx->push_execs.nelts);
e = ctx->execs.elts; e = ctx->push_execs.elts;
for (n = 0; n < ctx->execs.nelts; ++n, ++e) { for (n = 0; n < ctx->push_execs.nelts; ++n, ++e) {
ngx_rtmp_exec_run(e); ngx_rtmp_exec_run(e);
} }
next: next:
return next_publish(s, v); return next_publish(s, v);
} }
static ngx_int_t
ngx_rtmp_exec_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
size_t n;
ngx_rtmp_exec_t *e;
ngx_rtmp_exec_ctx_t *ctx;
ngx_rtmp_exec_pull_ctx_t *pctx;
ngx_rtmp_exec_app_conf_t *eacf;
eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module);
if (eacf == NULL || eacf->pull_confs.nelts == 0) {
goto next;
}
if (ngx_rtmp_exec_init_ctx(s, v->name, v->args) != NGX_OK) {
goto next;
}
if (ngx_rtmp_exec_init_pull_ctx(s, v->name) != NGX_OK) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module);
pctx = ctx->pull;
if (pctx->counter != 1) {
goto next;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"exec: pull %uz command(s)", pctx->pull_execs.nelts);
e = pctx->pull_execs.elts;
for (n = 0; n < pctx->pull_execs.nelts; n++, e++) {
ngx_rtmp_exec_run(e);
}
next:
return next_play(s, v);
}
#endif /* NGX_WIN32 */ #endif /* NGX_WIN32 */
static char * static char *
ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{ {
ngx_str_t *value; char *p = conf;
ngx_rtmp_exec_app_conf_t *eacf;
size_t n, nargs; size_t n, nargs;
ngx_str_t *s; ngx_str_t *s, *value;
ngx_rtmp_exec_conf_t *ec; ngx_array_t *confs;
ngx_rtmp_exec_conf_t *ec;
confs = (ngx_array_t *) (p + cmd->offset);
eacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_exec_module);
value = cf->args->elts; value = cf->args->elts;
ec = ngx_array_push(&eacf->confs); ec = ngx_array_push(confs);
if (ec == NULL) { if (ec == NULL) {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
@ -707,8 +1016,7 @@ ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
} }
nargs = cf->args->nelts - 2; nargs = cf->args->nelts - 2;
if (ngx_array_init(&ec->args, cf->pool, nargs, if (ngx_array_init(&ec->args, cf->pool, nargs, sizeof(ngx_str_t)) != NGX_OK)
sizeof(ngx_str_t)) != NGX_OK)
{ {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
@ -734,7 +1042,7 @@ ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
value = cf->args->elts; value = cf->args->elts;
ec = ngx_array_push(&emcf->confs); ec = ngx_array_push(&emcf->static_confs);
if (ec == NULL) { if (ec == NULL) {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
@ -821,6 +1129,9 @@ ngx_rtmp_exec_postconfiguration(ngx_conf_t *cf)
next_publish = ngx_rtmp_publish; next_publish = ngx_rtmp_publish;
ngx_rtmp_publish = ngx_rtmp_exec_publish; ngx_rtmp_publish = ngx_rtmp_exec_publish;
next_play = ngx_rtmp_play;
ngx_rtmp_play = ngx_rtmp_exec_play;
next_close_stream = ngx_rtmp_close_stream; next_close_stream = ngx_rtmp_close_stream;
ngx_rtmp_close_stream = ngx_rtmp_exec_close_stream; ngx_rtmp_close_stream = ngx_rtmp_exec_close_stream;