From 882b4f15b287b3e313088a67b2a4a543fbb6511d Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 17 Jan 2013 17:32:23 +0400 Subject: [PATCH] implemented getting worker id & spreading static relays over workers --- config | 4 +- ngx_rtmp.h | 1 + ngx_rtmp_auto_push_module.c | 89 ++++++++++++++++++++++++++++++++++++- ngx_rtmp_relay_module.c | 17 ++++--- 4 files changed, 102 insertions(+), 9 deletions(-) diff --git a/config b/config index df398df..bac25ec 100644 --- a/config +++ b/config @@ -3,6 +3,7 @@ ngx_addon_name="ngx_rtmp_module" CORE_MODULES="$CORE_MODULES ngx_rtmp_module \ ngx_rtmp_core_module \ + ngx_rtmp_auto_push_module \ ngx_rtmp_cmd_module \ ngx_rtmp_codec_module \ ngx_rtmp_access_module \ @@ -14,7 +15,6 @@ CORE_MODULES="$CORE_MODULES ngx_rtmp_netcall_module \ ngx_rtmp_relay_module \ ngx_rtmp_exec_module \ - ngx_rtmp_auto_push_module \ ngx_rtmp_enotify_module \ ngx_rtmp_notify_module \ " @@ -53,6 +53,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_eval.c \ $ngx_addon_dir/ngx_rtmp_receive.c \ $ngx_addon_dir/ngx_rtmp_core_module.c \ + $ngx_addon_dir/ngx_rtmp_auto_push_module.c \ $ngx_addon_dir/ngx_rtmp_cmd_module.c \ $ngx_addon_dir/ngx_rtmp_codec_module.c \ $ngx_addon_dir/ngx_rtmp_access_module.c \ @@ -67,7 +68,6 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_relay_module.c \ $ngx_addon_dir/ngx_rtmp_bandwidth.c \ $ngx_addon_dir/ngx_rtmp_exec_module.c \ - $ngx_addon_dir/ngx_rtmp_auto_push_module.c \ $ngx_addon_dir/ngx_rtmp_enotify_module.c \ $ngx_addon_dir/ngx_rtmp_notify_module.c \ " diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 3067b9d..00dc227 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -573,6 +573,7 @@ extern ngx_rtmp_bandwidth_t ngx_rtmp_bw_out; extern ngx_rtmp_bandwidth_t ngx_rtmp_bw_in; +extern ngx_int_t ngx_rtmp_worker_id; extern ngx_uint_t ngx_rtmp_max_module; extern ngx_module_t ngx_rtmp_core_module; diff --git a/ngx_rtmp_auto_push_module.c b/ngx_rtmp_auto_push_module.c index 19e197c..7543b94 100644 --- a/ngx_rtmp_auto_push_module.c +++ b/ngx_rtmp_auto_push_module.c @@ -11,6 +11,7 @@ static ngx_rtmp_publish_pt next_publish; static ngx_rtmp_delete_stream_pt next_delete_stream; +static ngx_int_t ngx_rtmp_auto_push_init_module(ngx_cycle_t *cycle); static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle); static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle); static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf); @@ -78,7 +79,7 @@ ngx_module_t ngx_rtmp_auto_push_module = { ngx_rtmp_auto_push_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ - NULL, /* init module */ + ngx_rtmp_auto_push_init_module, /* init module */ ngx_rtmp_auto_push_init_process, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ @@ -88,9 +89,52 @@ ngx_module_t ngx_rtmp_auto_push_module = { }; +ngx_int_t ngx_rtmp_worker_id = -1; + + #define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp" +static ngx_int_t +ngx_rtmp_auto_push_init_module(ngx_cycle_t *cycle) +{ + ngx_rtmp_auto_push_conf_t *apcf; + int fd; + u_char path[NGX_MAX_PATH]; + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_rtmp_auto_push_module); + if (apcf->auto_push == 0) { + return NGX_OK; + } + + + if (apcf->socket_dir.len + sizeof("NGX_RTMP_AUTO_PUSH_SOCKNAME") > + sizeof(path)) + { + ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "too long path"); + return NGX_ERROR; + } + + *ngx_snprintf(path, sizeof(path), "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME + ".worker", + &apcf->socket_dir) = 0; + + fd = ngx_open_file(path, NGX_FILE_WRONLY, NGX_FILE_TRUNCATE, + NGX_FILE_DEFAULT_ACCESS); + + if (fd == NGX_INVALID_FILE) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + "failed to create '%s'", path); + return NGX_ERROR; + } + + close(fd); + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle) { @@ -98,10 +142,13 @@ ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle) ngx_rtmp_auto_push_conf_t *apcf; ngx_listening_t *ls, *lss; struct sockaddr_un *sun; - int reuseaddr; + int reuseaddr, fd; ngx_socket_t s; size_t n; ngx_file_info_t fi; + u_char path[NGX_MAX_PATH]; + struct flock fl; + ngx_int_t i; if (ngx_process != NGX_PROCESS_WORKER) { return NGX_OK; @@ -113,6 +160,44 @@ ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle) return NGX_OK; } + *ngx_snprintf(path, sizeof(path), "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME + ".worker", + &apcf->socket_dir) = 0; + + fd = ngx_open_file(path, NGX_FILE_WRONLY, NGX_FILE_APPEND, + NGX_FILE_DEFAULT_ACCESS); + + if (fd == NGX_INVALID_FILE) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + "auto_push: failed to open '%s'", path); + return NGX_ERROR; + } + + for (i = 0; i < NGX_MAX_PROCESSES; ++i) { + ngx_memzero(&fl, sizeof(struct flock)); + + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_len = 1; + fl.l_start = (off_t) i; + + if (fcntl(fd, F_SETLK, &fl) == 0) { + break; + } + } + + if (i == NGX_MAX_PROCESSES) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + "auto_push: failed to find worker id"); + return NGX_ERROR; + } + + ngx_rtmp_worker_id = i; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "auto_push: worker_id %i", ngx_rtmp_worker_id); + + next_publish = ngx_rtmp_publish; ngx_rtmp_publish = ngx_rtmp_auto_push_publish; diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 5cb144c..0d915c9 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -1552,20 +1552,19 @@ ngx_rtmp_relay_init_process(ngx_cycle_t *cycle) ngx_rtmp_core_srv_conf_t **pcscf, *cscf; ngx_rtmp_core_app_conf_t **pcacf, *cacf; ngx_rtmp_relay_app_conf_t *racf; - ngx_uint_t n, m, k; + ngx_uint_t n, m, k, cnt; ngx_rtmp_relay_static_t *rs; ngx_rtmp_listen_t *lst; ngx_event_t **pevent, *event; + ngx_core_conf_t *ccf; if (cmcf->listen.nelts == 0) { return NGX_OK; } - /* only first worker does static pulling */ + ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); - if (ngx_process_slot) { - return NGX_OK; - } + cnt = 0; lst = cmcf->listen.elts; @@ -1582,6 +1581,14 @@ ngx_rtmp_relay_init_process(ngx_cycle_t *cycle) pevent = racf->static_events.elts; for (k = 0; k < racf->static_events.nelts; ++k, ++pevent) { + + /* distribute among all workers */ + if ((cnt++ % ccf->worker_processes) != + (ngx_uint_t) ngx_rtmp_worker_id) + { + continue; + } + event = *pevent; rs = event->data;