From 4bb48483be9d91a74a2efd5cb80fa136918ae466 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Fri, 8 Jun 2012 22:57:54 +0400 Subject: [PATCH] implemented exec feature --- README | 27 +++ config | 2 + ngx_rtmp_cmd_module.h | 21 +- ngx_rtmp_exec_module.c | 492 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 533 insertions(+), 9 deletions(-) create mode 100644 ngx_rtmp_exec_module.c diff --git a/README b/README index 904a07f..6e3a9f0 100644 --- a/README +++ b/README @@ -11,6 +11,9 @@ NGINX-based RTMP server * H264 support +* Online transcoding with FFmpeg + (experimental) + * HTTP callbacks on publish/play/record * Advanced buffering techniques @@ -77,6 +80,30 @@ rtmp { #allow play all; } + # Transcoding (ffmpeg needed) + application big { + live on; + + # On every pusblished stream run this command (ffmpeg) + # with substitutions: $app/${app}, $name/${name} for application & stream name. + # + # This ffmpeg call receives stream from this application & + # reduces the resolution down to 32x32. The stream is the published to + # 'small' application (see below) under the same name. + # + # ffmpeg can do anything with the stream like video/audio + # transcoding, resizing, altering container/codec params etc + # + # Multiple exec lines can be specified. + + exec ffmpeg -re -i rtmp://localhost:1935/$app/$name -vcodec flv -acodec copy -s 32x32 -f flv rtmp://localhost:1935/small/${name}; + } + + application small { + live on; + # Video with rediced resolution comes here from ffmpeg + } + application mypush { live on; diff --git a/config b/config index e5bcbbb..ed1cd30 100644 --- a/config +++ b/config @@ -10,6 +10,7 @@ CORE_MODULES="$CORE_MODULES ngx_rtmp_netcall_module \ ngx_rtmp_notify_module \ ngx_rtmp_relay_module \ + ngx_rtmp_exec_module \ " @@ -38,6 +39,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_relay_module.c \ $ngx_addon_dir/ngx_rtmp_bandwidth.c \ $ngx_addon_dir/ngx_rtmp_codecs.c \ + $ngx_addon_dir/ngx_rtmp_exec_module.c \ " CFLAGS="$CFLAGS -I$ngx_addon_dir" diff --git a/ngx_rtmp_cmd_module.h b/ngx_rtmp_cmd_module.h index 2f4f4e3..4f9da37 100644 --- a/ngx_rtmp_cmd_module.h +++ b/ngx_rtmp_cmd_module.h @@ -13,19 +13,22 @@ #include "ngx_rtmp.h" -/* Basic RTMP call support */ +#define NGX_RTMP_MAX_APP 128 +#define NGX_RTMP_MAX_NAME 256 +#define NGX_RTMP_MAX_URL 256 -/* TODO: improve string sizes */ + +/* Basic RTMP call support */ typedef struct { double trans; - u_char app[128]; + u_char app[NGX_RTMP_MAX_APP]; u_char flashver[32]; - u_char swf_url[256]; - u_char tc_url[256]; + u_char swf_url[NGX_RTMP_MAX_URL]; + u_char tc_url[NGX_RTMP_MAX_URL]; double acodecs; double vcodecs; - u_char page_url[256]; + u_char page_url[NGX_RTMP_MAX_URL]; } ngx_rtmp_connect_t; @@ -41,14 +44,14 @@ typedef struct { typedef struct { - u_char name[256]; + u_char name[NGX_RTMP_MAX_NAME]; u_char type[16]; int silent; } ngx_rtmp_publish_t; typedef struct { - u_char name[256]; + u_char name[NGX_RTMP_MAX_NAME]; } ngx_rtmp_fcpublish_t; @@ -58,7 +61,7 @@ typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunsubscribe_t; typedef struct { - u_char name[256]; + u_char name[NGX_RTMP_MAX_NAME]; double start; double duration; int reset; diff --git a/ngx_rtmp_exec_module.c b/ngx_rtmp_exec_module.c new file mode 100644 index 0000000..08f6d39 --- /dev/null +++ b/ngx_rtmp_exec_module.c @@ -0,0 +1,492 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include "ngx_rtmp_cmd_module.h" +#include + +#ifdef NGX_LINUX +#include +#endif + + +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_delete_stream_pt next_delete_stream; + + +static ngx_int_t ngx_rtmp_exec_postconfiguration(ngx_conf_t *cf); +static void * ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf); +static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, + void *parent, void *child); +static char * ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +#define NGX_RTMP_EXEC_RESPAWN 0x01 +#define NGX_RTMP_EXEC_KILL 0x02 + + +typedef struct { + ngx_str_t cmd; + ngx_array_t args; /* ngx_str_t */ +} ngx_rtmp_exec_conf_t; + + +typedef struct { + ngx_array_t execs; /* ngx_rtmp_exec_conf_t */ +} ngx_rtmp_exec_app_conf_t; + + +typedef struct { + ngx_rtmp_session_t *session; + size_t index; + unsigned active:1; + unsigned respawn:1; + int pid; + int pipefd; + ngx_connection_t dummy_conn; /*needed by ngx_xxx_event*/ + ngx_event_t read_evt, write_evt; +} ngx_rtmp_exec_t; + + +typedef struct { + u_char name[NGX_RTMP_MAX_NAME]; + ngx_rtmp_exec_t *execs; +} ngx_rtmp_exec_ctx_t; + + +static ngx_int_t ngx_rtmp_exec_kill(ngx_rtmp_session_t *s, ngx_rtmp_exec_t *e, + ngx_int_t term); +static ngx_int_t ngx_rtmp_exec_run(ngx_rtmp_session_t *s, size_t n); + + +static ngx_command_t ngx_rtmp_exec_commands[] = { + + { ngx_string("exec"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_exec, + NGX_RTMP_APP_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_rtmp_module_t ngx_rtmp_exec_module_ctx = { + NULL, /* preconfiguration */ + ngx_rtmp_exec_postconfiguration, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + ngx_rtmp_exec_create_app_conf, /* create app configuration */ + ngx_rtmp_exec_merge_app_conf /* merge app configuration */ +}; + + +ngx_module_t ngx_rtmp_exec_module = { + NGX_MODULE_V1, + &ngx_rtmp_exec_module_ctx, /* module context */ + ngx_rtmp_exec_commands, /* module directives */ + NGX_RTMP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void * +ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf) +{ + ngx_rtmp_exec_app_conf_t *eacf; + + eacf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_exec_app_conf_t)); + if (eacf == NULL) { + return NULL; + } + + if (ngx_array_init(&eacf->execs, cf->pool, 1, + sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) + { + return NULL; + } + + return eacf; +} + + +static char * +ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_rtmp_exec_app_conf_t *prev = parent; + ngx_rtmp_exec_app_conf_t *conf = child; + size_t n; + ngx_rtmp_exec_conf_t *ec, *pec; + + if (prev->execs.nelts) { + ec = ngx_array_push_n(&conf->execs, prev->execs.nelts); + if (ec == NULL) { + return NGX_CONF_ERROR; + } + pec = prev->execs.elts; + for (n = 0; n < prev->execs.nelts; ++n, ++ec, ++pec) { + *ec = *pec; + } + } + + return NGX_CONF_OK; +} + + +static void +ngx_rtmp_exec_child_dead(ngx_event_t *ev) +{ + ngx_connection_t *dummy_conn; + ngx_rtmp_exec_t *e; + ngx_rtmp_session_t *s; + + dummy_conn = ev->data; + e = dummy_conn->data; + s = e->session; + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: child %ui exited; %s", + (ngx_int_t)e->pid, + e->respawn ? "respawning" : "ignoring"); + + ngx_rtmp_exec_kill(s, e, 0); + + if (e->respawn) { + ngx_rtmp_exec_run(s, e->index); + } +} + + +static ngx_int_t +ngx_rtmp_exec_kill(ngx_rtmp_session_t *s, ngx_rtmp_exec_t *e, ngx_int_t term) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: terminating child %ui", + (ngx_int_t)e->pid); + + ngx_del_event(&e->read_evt, NGX_READ_EVENT, 0); + e->active = 0; + close(e->pipefd); + + if (!term) { + return NGX_OK; + } + + if (kill(e->pid, SIGKILL) == -1) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, ngx_errno, + "exec: kill failed pid=%i", (ngx_int_t)e->pid); + } else { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: killed pid=%i", (ngx_int_t)e->pid); + } + + return NGX_OK; +} + + +static void +ngx_rtmp_exec_append(ngx_str_t *result, u_char *data, size_t len) +{ + if (len == 0) { + len = ngx_strlen(data); + } + + /* use malloc in child */ + if (result->len == 0) { + result->data = malloc(len + 1); + result->len = len; + ngx_memcpy(result->data, data, len); + result->data[len] = 0; + return; + } + + result->data = realloc(result->data, result->len + len + 1); + ngx_memcpy(result->data + result->len, data, len); + result->len += len; + result->data[result->len] = 0; +} + + +static char * +ngx_rtmp_exec_prepare_arg(ngx_rtmp_session_t *s, ngx_str_t *arg) +{ + ngx_rtmp_core_app_conf_t *cacf; + ngx_rtmp_exec_ctx_t *ctx; + u_char *p, *pp; + ngx_str_t result; + + cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + + /* substitute $app/${app} & $name/${name} */ + ngx_str_set(&result, ""); + pp = arg->data; + for ( ;; ) { + p = (u_char *)ngx_strchr(pp, '$'); + ngx_rtmp_exec_append(&result, pp, p ? p - pp : 0); + if (p == NULL) { + return (char *)result.data; + } + pp = p + 1; + if (p != arg->data && p[-1] == '\\') { + goto dollar; + } + if (!ngx_strncmp(p + 1, "app", sizeof("app") - 1) + || !ngx_strncmp(p + 1, "{app}", sizeof("{app}") - 1)) + { + ngx_rtmp_exec_append(&result, cacf->name.data, cacf->name.len); + pp += (p[1] == '{' ? sizeof("{app}") - 1 : sizeof("app") - 1); + continue; + } + if (!ngx_strncmp(p + 1, "name", sizeof("name") - 1) + || !ngx_strncmp(p + 1, "{name}", sizeof("{name}") - 1)) + { + ngx_rtmp_exec_append(&result, ctx->name, 0); + pp += (p[1] == '{' ? sizeof("{name}") - 1 : sizeof("name") - 1); + continue; + } +dollar: + ngx_rtmp_exec_append(&result, (u_char *)"$", 1); + } +} + + +static ngx_int_t +ngx_rtmp_exec_run(ngx_rtmp_session_t *s, size_t n) +{ +#ifdef NGX_LINUX + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_ctx_t *ctx; + int pid; + int pipefd[2]; + int ret; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_t *e; + ngx_str_t *arg; + char **args; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + ec = (ngx_rtmp_exec_conf_t *)eacf->execs.elts + n; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + e = ctx->execs + n; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: starting child '%V'", + &ec->cmd); + + if (pipe(pipefd) == -1) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, ngx_errno, + "exec: pipe failed"); + return NGX_ERROR; + } + + /* make pipe write end survive through exec */ + ret = fcntl(pipefd[1], F_GETFD); + if (ret != -1) { + ret &= ~FD_CLOEXEC; + ret = fcntl(pipefd[1], F_SETFD, ret); + } + if (ret == -1) { + close(pipefd[0]); + close(pipefd[1]); + ngx_log_error(NGX_LOG_INFO, s->connection->log, ngx_errno, + "exec: fcntl failed"); + return NGX_ERROR; + } + + pid = fork(); + switch (pid) { + case -1: + close(pipefd[0]); + close(pipefd[1]); + ngx_log_error(NGX_LOG_INFO, s->connection->log, ngx_errno, + "exec: fork failed"); + return NGX_ERROR; + + case 0: + /* child */ + args = malloc((ec->args.nelts + 2) * sizeof(char *)); + if (args == NULL) { + exit(1); + } + /*TODO: implement variables $app $name */ + arg = ec->args.elts; + args[0] = (char *)ec->cmd.data; + for (n = 0; n < ec->args.nelts; ++n, ++arg) { + args[n + 1] = ngx_rtmp_exec_prepare_arg(s, arg); + } + args[n + 1] = NULL; + if (execve((char *)ec->cmd.data, args, environ) == -1) { + exit(1); + } + break; + + default: + /* parent */ + close(pipefd[1]); + e->session = s; + e->index = n; + e->active = 1; + e->respawn = 1; + e->pid = pid; + e->pipefd = pipefd[0]; + + e->dummy_conn.fd = e->pipefd; + e->dummy_conn.data = e; + e->dummy_conn.read = &e->read_evt; + e->dummy_conn.write = &e->write_evt; + e->read_evt.data = &e->dummy_conn; + e->write_evt.data = &e->dummy_conn; + + e->read_evt.log = s->connection->log; + e->read_evt.handler = ngx_rtmp_exec_child_dead; + + if (ngx_add_event(&e->read_evt, NGX_READ_EVENT, 0) != NGX_OK) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, ngx_errno, + "exec: failed to add child control event"); + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: child '%V' started pid=%ui", + &ec->cmd, (ngx_uint_t)pid); + break; + } +#endif /* NGX_LINUX */ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_exec_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) +{ + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_t *e; + size_t n; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + if (eacf == NULL) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx == NULL || ctx->execs == NULL) { + goto next; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: delete %uz command(s)", eacf->execs.nelts); + + e = ctx->execs; + for (n = 0; n < eacf->execs.nelts; ++n, ++e) { + if (e->active) { + ngx_rtmp_exec_kill(s, e, 1); + } + } + +next: + return next_delete_stream(s, v); +} + + +static ngx_int_t +ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) +{ + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_ctx_t *ctx; + size_t n; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + if (eacf == NULL || eacf->execs.nelts == 0) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx == NULL) { + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t)); + if (ctx == NULL) { + return NGX_ERROR; + } + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module); + ctx->execs = ngx_pcalloc(s->connection->pool, eacf->execs.nelts + * sizeof(ngx_rtmp_exec_t)); + } + ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: run %uz command(s)", eacf->execs.nelts); + + ec = eacf->execs.elts; + for (n = 0; n < eacf->execs.nelts; ++n, ++ec) { + ngx_rtmp_exec_run(s, n); + } + +next: + return next_publish(s, v); +} + + +static char * +ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value; + ngx_rtmp_exec_app_conf_t *eacf; + size_t n, nargs; + ngx_str_t *s; + ngx_rtmp_exec_conf_t *ec; + + eacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_exec_module); + value = cf->args->elts; + + ec = ngx_array_push(&eacf->execs); + if (ec == NULL) { + return NGX_CONF_ERROR; + } + + ec->cmd = value[1]; + + if (cf->args->nelts == 2) { + return NGX_CONF_OK; + } + + nargs = cf->args->nelts - 2; + if (ngx_array_init(&ec->args, cf->pool, nargs, + sizeof(ngx_str_t)) != NGX_OK) + { + return NGX_CONF_ERROR; + } + + s = ngx_array_push_n(&ec->args, nargs); + for (n = 2; n < cf->args->nelts; ++n, ++s) { + *s = value[n]; + } + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_rtmp_exec_postconfiguration(ngx_conf_t *cf) +{ + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_exec_publish; + + next_delete_stream = ngx_rtmp_delete_stream; + ngx_rtmp_delete_stream = ngx_rtmp_exec_delete_stream; + + return NGX_OK; +}