implemented getting worker id & spreading static relays over workers

This commit is contained in:
Roman Arutyunyan
2013-01-17 17:32:23 +04:00
parent 3bd60857bd
commit 882b4f15b2
4 changed files with 102 additions and 9 deletions

4
config
View File

@ -3,6 +3,7 @@ ngx_addon_name="ngx_rtmp_module"
CORE_MODULES="$CORE_MODULES CORE_MODULES="$CORE_MODULES
ngx_rtmp_module \ ngx_rtmp_module \
ngx_rtmp_core_module \ ngx_rtmp_core_module \
ngx_rtmp_auto_push_module \
ngx_rtmp_cmd_module \ ngx_rtmp_cmd_module \
ngx_rtmp_codec_module \ ngx_rtmp_codec_module \
ngx_rtmp_access_module \ ngx_rtmp_access_module \
@ -14,7 +15,6 @@ CORE_MODULES="$CORE_MODULES
ngx_rtmp_netcall_module \ ngx_rtmp_netcall_module \
ngx_rtmp_relay_module \ ngx_rtmp_relay_module \
ngx_rtmp_exec_module \ ngx_rtmp_exec_module \
ngx_rtmp_auto_push_module \
ngx_rtmp_enotify_module \ ngx_rtmp_enotify_module \
ngx_rtmp_notify_module \ ngx_rtmp_notify_module \
" "
@ -53,6 +53,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp_eval.c \ $ngx_addon_dir/ngx_rtmp_eval.c \
$ngx_addon_dir/ngx_rtmp_receive.c \ $ngx_addon_dir/ngx_rtmp_receive.c \
$ngx_addon_dir/ngx_rtmp_core_module.c \ $ngx_addon_dir/ngx_rtmp_core_module.c \
$ngx_addon_dir/ngx_rtmp_auto_push_module.c \
$ngx_addon_dir/ngx_rtmp_cmd_module.c \ $ngx_addon_dir/ngx_rtmp_cmd_module.c \
$ngx_addon_dir/ngx_rtmp_codec_module.c \ $ngx_addon_dir/ngx_rtmp_codec_module.c \
$ngx_addon_dir/ngx_rtmp_access_module.c \ $ngx_addon_dir/ngx_rtmp_access_module.c \
@ -67,7 +68,6 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp_relay_module.c \ $ngx_addon_dir/ngx_rtmp_relay_module.c \
$ngx_addon_dir/ngx_rtmp_bandwidth.c \ $ngx_addon_dir/ngx_rtmp_bandwidth.c \
$ngx_addon_dir/ngx_rtmp_exec_module.c \ $ngx_addon_dir/ngx_rtmp_exec_module.c \
$ngx_addon_dir/ngx_rtmp_auto_push_module.c \
$ngx_addon_dir/ngx_rtmp_enotify_module.c \ $ngx_addon_dir/ngx_rtmp_enotify_module.c \
$ngx_addon_dir/ngx_rtmp_notify_module.c \ $ngx_addon_dir/ngx_rtmp_notify_module.c \
" "

View File

@ -573,6 +573,7 @@ extern ngx_rtmp_bandwidth_t ngx_rtmp_bw_out;
extern ngx_rtmp_bandwidth_t ngx_rtmp_bw_in; extern ngx_rtmp_bandwidth_t ngx_rtmp_bw_in;
extern ngx_int_t ngx_rtmp_worker_id;
extern ngx_uint_t ngx_rtmp_max_module; extern ngx_uint_t ngx_rtmp_max_module;
extern ngx_module_t ngx_rtmp_core_module; extern ngx_module_t ngx_rtmp_core_module;

View File

@ -11,6 +11,7 @@ static ngx_rtmp_publish_pt next_publish;
static ngx_rtmp_delete_stream_pt next_delete_stream; static ngx_rtmp_delete_stream_pt next_delete_stream;
static ngx_int_t ngx_rtmp_auto_push_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle); static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle);
static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle); static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle);
static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf); static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf);
@ -78,7 +79,7 @@ ngx_module_t ngx_rtmp_auto_push_module = {
ngx_rtmp_auto_push_commands, /* module directives */ ngx_rtmp_auto_push_commands, /* module directives */
NGX_CORE_MODULE, /* module type */ NGX_CORE_MODULE, /* module type */
NULL, /* init master */ NULL, /* init master */
NULL, /* init module */ ngx_rtmp_auto_push_init_module, /* init module */
ngx_rtmp_auto_push_init_process, /* init process */ ngx_rtmp_auto_push_init_process, /* init process */
NULL, /* init thread */ NULL, /* init thread */
NULL, /* exit thread */ NULL, /* exit thread */
@ -88,9 +89,52 @@ ngx_module_t ngx_rtmp_auto_push_module = {
}; };
ngx_int_t ngx_rtmp_worker_id = -1;
#define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp" #define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp"
static ngx_int_t
ngx_rtmp_auto_push_init_module(ngx_cycle_t *cycle)
{
ngx_rtmp_auto_push_conf_t *apcf;
int fd;
u_char path[NGX_MAX_PATH];
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_rtmp_auto_push_module);
if (apcf->auto_push == 0) {
return NGX_OK;
}
if (apcf->socket_dir.len + sizeof("NGX_RTMP_AUTO_PUSH_SOCKNAME") >
sizeof(path))
{
ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "too long path");
return NGX_ERROR;
}
*ngx_snprintf(path, sizeof(path), "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME
".worker",
&apcf->socket_dir) = 0;
fd = ngx_open_file(path, NGX_FILE_WRONLY, NGX_FILE_TRUNCATE,
NGX_FILE_DEFAULT_ACCESS);
if (fd == NGX_INVALID_FILE) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"failed to create '%s'", path);
return NGX_ERROR;
}
close(fd);
return NGX_OK;
}
static ngx_int_t static ngx_int_t
ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle) ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
{ {
@ -98,10 +142,13 @@ ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
ngx_rtmp_auto_push_conf_t *apcf; ngx_rtmp_auto_push_conf_t *apcf;
ngx_listening_t *ls, *lss; ngx_listening_t *ls, *lss;
struct sockaddr_un *sun; struct sockaddr_un *sun;
int reuseaddr; int reuseaddr, fd;
ngx_socket_t s; ngx_socket_t s;
size_t n; size_t n;
ngx_file_info_t fi; ngx_file_info_t fi;
u_char path[NGX_MAX_PATH];
struct flock fl;
ngx_int_t i;
if (ngx_process != NGX_PROCESS_WORKER) { if (ngx_process != NGX_PROCESS_WORKER) {
return NGX_OK; return NGX_OK;
@ -113,6 +160,44 @@ ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
return NGX_OK; return NGX_OK;
} }
*ngx_snprintf(path, sizeof(path), "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME
".worker",
&apcf->socket_dir) = 0;
fd = ngx_open_file(path, NGX_FILE_WRONLY, NGX_FILE_APPEND,
NGX_FILE_DEFAULT_ACCESS);
if (fd == NGX_INVALID_FILE) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"auto_push: failed to open '%s'", path);
return NGX_ERROR;
}
for (i = 0; i < NGX_MAX_PROCESSES; ++i) {
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
fl.l_len = 1;
fl.l_start = (off_t) i;
if (fcntl(fd, F_SETLK, &fl) == 0) {
break;
}
}
if (i == NGX_MAX_PROCESSES) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"auto_push: failed to find worker id");
return NGX_ERROR;
}
ngx_rtmp_worker_id = i;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
"auto_push: worker_id %i", ngx_rtmp_worker_id);
next_publish = ngx_rtmp_publish; next_publish = ngx_rtmp_publish;
ngx_rtmp_publish = ngx_rtmp_auto_push_publish; ngx_rtmp_publish = ngx_rtmp_auto_push_publish;

View File

@ -1552,20 +1552,19 @@ ngx_rtmp_relay_init_process(ngx_cycle_t *cycle)
ngx_rtmp_core_srv_conf_t **pcscf, *cscf; ngx_rtmp_core_srv_conf_t **pcscf, *cscf;
ngx_rtmp_core_app_conf_t **pcacf, *cacf; ngx_rtmp_core_app_conf_t **pcacf, *cacf;
ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_app_conf_t *racf;
ngx_uint_t n, m, k; ngx_uint_t n, m, k, cnt;
ngx_rtmp_relay_static_t *rs; ngx_rtmp_relay_static_t *rs;
ngx_rtmp_listen_t *lst; ngx_rtmp_listen_t *lst;
ngx_event_t **pevent, *event; ngx_event_t **pevent, *event;
ngx_core_conf_t *ccf;
if (cmcf->listen.nelts == 0) { if (cmcf->listen.nelts == 0) {
return NGX_OK; return NGX_OK;
} }
/* only first worker does static pulling */ ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
if (ngx_process_slot) { cnt = 0;
return NGX_OK;
}
lst = cmcf->listen.elts; lst = cmcf->listen.elts;
@ -1582,6 +1581,14 @@ ngx_rtmp_relay_init_process(ngx_cycle_t *cycle)
pevent = racf->static_events.elts; pevent = racf->static_events.elts;
for (k = 0; k < racf->static_events.nelts; ++k, ++pevent) { for (k = 0; k < racf->static_events.nelts; ++k, ++pevent) {
/* distribute among all workers */
if ((cnt++ % ccf->worker_processes) !=
(ngx_uint_t) ngx_rtmp_worker_id)
{
continue;
}
event = *pevent; event = *pevent;
rs = event->data; rs = event->data;