diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index 82d2b5e..efed2e0 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -16,10 +16,23 @@ static ngx_int_t ngx_rtmp_stat_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_stat_create_loc_conf(ngx_conf_t *cf); static char * ngx_rtmp_stat_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_int_t ngx_rtmp_stat_init_process(ngx_cycle_t *cycle); +static void ngx_rtmp_stat_main(ngx_pool_t *pool, ngx_chain_t ***lll); static time_t start_time; +static ngx_str_t shm_name = ngx_string("rtmp_stat"); +static ngx_shm_zone_t *shm_zone; +static ngx_slab_pool_t *shm_pool; +static ngx_buf_t *shm_map; +static ngx_event_t shm_evt; +static ngx_connection_t shm_dummy_conn; +static size_t shm_size = 1024 * 1024; /*TODO:tune*/ + +#define shm_map_size (sizeof(ngx_buf_t) * NGX_MAX_PROCESSES) +#define shm_update_timeout 1000 + #define NGX_RTMP_STAT_ALL 0xff #define NGX_RTMP_STAT_GLOBAL 0x01 @@ -89,7 +102,7 @@ ngx_module_t ngx_rtmp_stat_module = { NGX_HTTP_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ - NULL, /* init process */ + ngx_rtmp_stat_init_process, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ @@ -98,6 +111,123 @@ ngx_module_t ngx_rtmp_stat_module = { }; +static void +ngx_rtmp_stat_update(ngx_event_t *ev) +{ + ngx_buf_t *b; + ngx_pool_t *pool; + ngx_chain_t *l, *cl, **ll, ***lll; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ev->log, 0, + "stat: updating slot %d", + (ngx_int_t) ngx_process_slot); + + b = &shm_map[ngx_process_slot]; + b->pos = b->last = b->start; + + pool = ngx_create_pool(4096, ev->log); + if (pool == NULL) { + return; + } + + cl = NULL; + ll = &cl; + lll = ≪ + + ngx_rtmp_stat_main(pool, lll); + + for (l = cl; l; l = l->next) { + if (b->end - b->last < l->buf->last - l->buf->end) { + b->last = b->pos; + ngx_log_error(NGX_LOG_ERR, ev->log, 0, + "stat: not enough space for worker stat"); + goto next; + } + + b->last = ngx_cpymem(b->last, l->buf->pos, + (size_t) (l->buf->last - l->buf->pos)); + } + +next: + ngx_destroy_pool(pool); + + ngx_add_timer(ev, shm_update_timeout); +} + + +static ngx_int_t +ngx_rtmp_stat_init_process(ngx_cycle_t *cycle) +{ + ngx_buf_t *b; + size_t size; + ngx_core_conf_t *ccf; + + start_time = ngx_cached_time->sec; + + ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); + if (ccf->worker_processes == 1) { + return NGX_OK; + } + + if (shm_map == NULL) { + ngx_log_error(NGX_LOG_ERR, cycle->log, 0, + "stat: NULL shm_map"); + return NGX_ERROR; + } + + b = &shm_map[ngx_process_slot]; + + if (b->start) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "stat: slot %d already allocated", + (ngx_int_t) ngx_process_slot); + goto schedule; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "stat: allocating slot %d", + (ngx_int_t) ngx_process_slot); + + size = ((shm_size - shm_map_size) / ccf->worker_processes) & + ~(ngx_pagesize - 1); + + b->start= ngx_slab_alloc(shm_pool, size); + if (b->start == NULL) { + ngx_log_error(NGX_LOG_ERR, cycle->log, 0, + "stat: error allocating %d slot", + ngx_process_slot); + return NGX_ERROR; + } + + b->end = b->start + size; + +schedule: + b->pos = b->last = b->start; + + shm_evt.log = cycle->log; + shm_evt.data = &shm_dummy_conn; + shm_evt.handler = ngx_rtmp_stat_update; + ngx_add_timer(&shm_evt, shm_update_timeout); + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_stat_shm_init(ngx_shm_zone_t *shm_zone, void *data) +{ + shm_pool = (ngx_slab_pool_t *) shm_zone->shm.addr; + + shm_map = ngx_slab_alloc(shm_pool, shm_map_size); + if (shm_map == NULL) { + return NGX_ERROR; + } + + ngx_memzero(shm_map, shm_map_size); + + return NGX_OK; +} + #define NGX_RTMP_STAT_BUFSIZE 256 @@ -106,7 +236,7 @@ ngx_module_t ngx_rtmp_stat_module = { * which are bad for xslt */ static void * -ngx_rtmp_stat_escape(ngx_http_request_t *r, void *data, size_t len) +ngx_rtmp_stat_escape(ngx_pool_t *pool, void *data, size_t len) { u_char *p, *np; void *new_data; @@ -124,7 +254,7 @@ ngx_rtmp_stat_escape(ngx_http_request_t *r, void *data, size_t len) return data; } - new_data = ngx_palloc(r->pool, len); + new_data = ngx_palloc(pool, len); if (new_data == NULL) { return NULL; } @@ -141,7 +271,7 @@ ngx_rtmp_stat_escape(ngx_http_request_t *r, void *data, size_t len) static void -ngx_rtmp_stat_output(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_output(ngx_pool_t *pool, ngx_chain_t ***lll, void *data, size_t len, ngx_uint_t escape) { ngx_chain_t *cl; @@ -153,7 +283,7 @@ ngx_rtmp_stat_output(ngx_http_request_t *r, ngx_chain_t ***lll, } if (escape) { - data = ngx_rtmp_stat_escape(r, data, len); + data = ngx_rtmp_stat_escape(pool, data, len); if (data == NULL) { return; } @@ -169,11 +299,11 @@ ngx_rtmp_stat_output(ngx_http_request_t *r, ngx_chain_t ***lll, } if (**lll == NULL) { - cl = ngx_alloc_chain_link(r->pool); + cl = ngx_alloc_chain_link(pool); if (cl == NULL) { return; } - b = ngx_create_temp_buf(r->pool, + b = ngx_create_temp_buf(pool, ngx_max(NGX_RTMP_STAT_BUFSIZE, real_len)); if (b == NULL || b->pos == NULL) { return; @@ -194,14 +324,16 @@ ngx_rtmp_stat_output(ngx_http_request_t *r, ngx_chain_t ***lll, /* These shortcuts assume 2 variables exist in current context: - * ngx_http_request_t *r + * ngx_pool_t *pool * ngx_chain_t ***lll */ /* plain data */ -#define NGX_RTMP_STAT(data, len) ngx_rtmp_stat_output(r, lll, data, len, 0) +#define NGX_RTMP_STAT(data, len) ngx_rtmp_stat_output(pool, lll, data, len,\ + 0) /* escaped data */ -#define NGX_RTMP_STAT_E(data, len) ngx_rtmp_stat_output(r, lll, data, len, 1) +#define NGX_RTMP_STAT_E(data, len) ngx_rtmp_stat_output(pool, lll, data, len,\ + 1) /* literal */ #define NGX_RTMP_STAT_L(s) NGX_RTMP_STAT((s), sizeof(s) - 1) @@ -220,7 +352,7 @@ ngx_rtmp_stat_output(ngx_http_request_t *r, ngx_chain_t ***lll, static void -ngx_rtmp_stat_bw(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_bw(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_bandwidth_t *bw_in, ngx_rtmp_bandwidth_t *bw_out) { u_char buf[NGX_OFF_T_LEN + 1]; @@ -274,8 +406,8 @@ ngx_rtmp_stat_get_pool_size(ngx_pool_t *pool, ngx_uint_t *nlarge, static void -ngx_rtmp_stat_dump_pool(ngx_http_request_t *r, ngx_chain_t ***lll, - ngx_pool_t *pool) +ngx_rtmp_stat_dump_pool(ngx_pool_t *pool, ngx_chain_t ***lll, + ngx_pool_t *p) { ngx_uint_t nlarge; size_t size; @@ -283,7 +415,7 @@ ngx_rtmp_stat_dump_pool(ngx_http_request_t *r, ngx_chain_t ***lll, size = 0; nlarge = 0; - ngx_rtmp_stat_get_pool_size(pool, &nlarge, &size); + ngx_rtmp_stat_get_pool_size(p, &nlarge, &size); NGX_RTMP_STAT_L(""); NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), "%ui", nlarge) - buf); @@ -296,13 +428,13 @@ ngx_rtmp_stat_dump_pool(ngx_http_request_t *r, ngx_chain_t ***lll, static void -ngx_rtmp_stat_client(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_client(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_session_t *s) { u_char buf[NGX_OFF_T_LEN + 1]; #ifdef NGX_RTMP_POOL_DEBUG - ngx_rtmp_stat_dump_pool(r, lll, s->connection->pool); + ngx_rtmp_stat_dump_pool(pool, lll, s->connection->pool); #endif NGX_RTMP_STAT_L(""); NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), "%ui", @@ -340,7 +472,7 @@ ngx_rtmp_stat_client(ngx_http_request_t *r, ngx_chain_t ***lll, static void -ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_live(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_live_app_conf_t *lacf) { ngx_rtmp_live_stream_t *stream; @@ -350,15 +482,12 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, ngx_int_t n; size_t nclients, total_nclients; u_char buf[NGX_OFF_T_LEN + 1]; - ngx_rtmp_stat_loc_conf_t *slcf; u_char *cname; if (!lacf->live) { return; } - slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); - NGX_RTMP_STAT_L("\r\n"); total_nclients = 0; @@ -375,42 +504,42 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, "%M", ngx_current_msec - stream->epoch) - buf); NGX_RTMP_STAT_L(""); - ngx_rtmp_stat_bw(r, lll, &stream->bw_in, &stream->bw_out); + ngx_rtmp_stat_bw(pool, lll, &stream->bw_in, &stream->bw_out); nclients = 0; codec = NULL; for (ctx = stream->ctx; ctx; ctx = ctx->next, ++nclients) { s = ctx->session; - if (slcf->stat & NGX_RTMP_STAT_CLIENTS) { - NGX_RTMP_STAT_L(""); - ngx_rtmp_stat_client(r, lll, s); + NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT_L(""); + ngx_rtmp_stat_client(pool, lll, s); + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), + "%uD/%uD", ctx->cs[1].dropped, + ctx->cs[0].dropped) - buf); + NGX_RTMP_STAT_L(""); + + NGX_RTMP_STAT_L(""); + if (!lacf->interleave) { NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), - "%uD/%uD", ctx->cs[1].dropped, - ctx->cs[0].dropped) - buf); - NGX_RTMP_STAT_L(""); - - NGX_RTMP_STAT_L(""); - if (!lacf->interleave) { - NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), - "%L", (int64_t) ctx->cs[1].timestamp - - (int64_t) ctx->cs[0].timestamp) - - buf); - } - NGX_RTMP_STAT_L(""); - - if (ctx->publishing) { - NGX_RTMP_STAT_L(""); - } - - if (ctx->active) { - NGX_RTMP_STAT_L(""); - } - - NGX_RTMP_STAT_L("\r\n"); + "%L", (int64_t) ctx->cs[1].timestamp - + (int64_t) ctx->cs[0].timestamp) + - buf); } + NGX_RTMP_STAT_L(""); + + if (ctx->publishing) { + NGX_RTMP_STAT_L(""); + } + + if (ctx->active) { + NGX_RTMP_STAT_L(""); + } + + NGX_RTMP_STAT_L("\r\n"); + if (ctx->publishing) { codec = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module); } @@ -480,7 +609,7 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, static void -ngx_rtmp_stat_play(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_play(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_play_app_conf_t *pacf) { ngx_rtmp_play_ctx_t *ctx, *sctx; @@ -488,14 +617,11 @@ ngx_rtmp_stat_play(ngx_http_request_t *r, ngx_chain_t ***lll, ngx_uint_t n; size_t nclients, total_nclients; u_char buf[NGX_OFF_T_LEN + 1]; - ngx_rtmp_stat_loc_conf_t *slcf; if (pacf->root.len == 0 && pacf->url == NULL) { return; } - slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); - NGX_RTMP_STAT_L("\r\n"); total_nclients = 0; @@ -517,13 +643,11 @@ ngx_rtmp_stat_play(ngx_http_request_t *r, ngx_chain_t ***lll, ++nclients; s = ctx->session; - if (slcf->stat & NGX_RTMP_STAT_CLIENTS) { - NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_L(""); - ngx_rtmp_stat_client(r, lll, s); + ngx_rtmp_stat_client(pool, lll, s); - NGX_RTMP_STAT_L("\r\n"); - } + NGX_RTMP_STAT_L("\r\n"); } total_nclients += nclients; @@ -547,34 +671,25 @@ ngx_rtmp_stat_play(ngx_http_request_t *r, ngx_chain_t ***lll, static void -ngx_rtmp_stat_application(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_application(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_core_app_conf_t *cacf) { - ngx_rtmp_stat_loc_conf_t *slcf; - NGX_RTMP_STAT_L("\r\n"); NGX_RTMP_STAT_L(""); NGX_RTMP_STAT_ES(&cacf->name); NGX_RTMP_STAT_L("\r\n"); - slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); - - if (slcf->stat & NGX_RTMP_STAT_LIVE) { - ngx_rtmp_stat_live(r, lll, - cacf->app_conf[ngx_rtmp_live_module.ctx_index]); - } - - if (slcf->stat & NGX_RTMP_STAT_PLAY) { - ngx_rtmp_stat_play(r, lll, - cacf->app_conf[ngx_rtmp_play_module.ctx_index]); - } + ngx_rtmp_stat_live(pool, lll, + cacf->app_conf[ngx_rtmp_live_module.ctx_index]); + ngx_rtmp_stat_play(pool, lll, + cacf->app_conf[ngx_rtmp_play_module.ctx_index]); NGX_RTMP_STAT_L("\r\n"); } static void -ngx_rtmp_stat_server(ngx_http_request_t *r, ngx_chain_t ***lll, +ngx_rtmp_stat_server(ngx_pool_t *pool, ngx_chain_t ***lll, ngx_rtmp_core_srv_conf_t *cscf) { ngx_rtmp_core_app_conf_t **cacf; @@ -583,29 +698,93 @@ ngx_rtmp_stat_server(ngx_http_request_t *r, ngx_chain_t ***lll, NGX_RTMP_STAT_L("\r\n"); #ifdef NGX_RTMP_POOL_DEBUG - ngx_rtmp_stat_dump_pool(r, lll, cscf->pool); + ngx_rtmp_stat_dump_pool(pool, lll, cscf->pool); #endif cacf = cscf->applications.elts; for (n = 0; n < cscf->applications.nelts; ++n, ++cacf) { - ngx_rtmp_stat_application(r, lll, *cacf); + ngx_rtmp_stat_application(pool, lll, *cacf); } NGX_RTMP_STAT_L("\r\n"); } +static void +ngx_rtmp_stat_main(ngx_pool_t *pool, ngx_chain_t ***lll) +{ + ngx_rtmp_core_main_conf_t *cmcf = ngx_rtmp_core_main_conf; + ngx_rtmp_core_srv_conf_t **cscf; + size_t n; + static u_char nbuf[NGX_OFF_T_LEN + 1]; + static u_char tbuf[NGX_TIME_T_LEN + 1]; + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(tbuf, ngx_snprintf(tbuf, sizeof(tbuf), + "%T", ngx_cached_time->sec - start_time) - tbuf); + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(nbuf, ngx_snprintf(nbuf, sizeof(nbuf), + "%ui", (ngx_uint_t) ngx_getpid()) - nbuf); + NGX_RTMP_STAT_L("\r\n"); + + + ngx_rtmp_stat_bw(pool, lll, &ngx_rtmp_bw_in, &ngx_rtmp_bw_out); + + cscf = cmcf->servers.elts; + for (n = 0; n < cmcf->servers.nelts; ++n, ++cscf) { + ngx_rtmp_stat_server(pool, lll, *cscf); + } +} + + +static void +ngx_rtmp_stat_get(ngx_pool_t *pool, ngx_chain_t ***lll) +{ + ngx_buf_t *b; + ngx_uint_t n; + static u_char nbuf[NGX_OFF_T_LEN + 1]; + + b = &shm_map[ngx_process_slot]; + + if (b->start == NULL) { + /* single-worker */ + NGX_RTMP_STAT_L("\r\n"); + ngx_rtmp_stat_main(pool, lll); + NGX_RTMP_STAT_L("\r\n"); + return; + } + + /* multi-worker */ + b = &shm_map[0]; + for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++b) { + if (b->start == NULL) { + continue; + } + + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(nbuf, ngx_snprintf(nbuf, sizeof(nbuf), "%ui", n + 1) + - nbuf); + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT(b->pos, b->last - b->pos); + NGX_RTMP_STAT_L("\r\n"); + } +} + + static ngx_int_t ngx_rtmp_stat_handler(ngx_http_request_t *r) { ngx_rtmp_stat_loc_conf_t *slcf; - ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_core_srv_conf_t **cscf; ngx_chain_t *cl, *l, **ll, ***lll; - size_t n; + ngx_pool_t *pool; off_t len; - static u_char tbuf[NGX_TIME_T_LEN + 1]; - static u_char nbuf[NGX_OFF_T_LEN + 1]; + + pool = r->pool; r->keepalive = 0; slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); @@ -613,8 +792,7 @@ ngx_rtmp_stat_handler(ngx_http_request_t *r) return NGX_DECLINED; } - cmcf = ngx_rtmp_core_main_conf; - if (cmcf == NULL) { + if (ngx_rtmp_core_main_conf == NULL) { goto error; } @@ -640,22 +818,7 @@ ngx_rtmp_stat_handler(ngx_http_request_t *r) #endif NGX_RTMP_STAT_L("" __DATE__ " " __TIME__ "\r\n"); - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT(nbuf, ngx_snprintf(nbuf, sizeof(nbuf), - "%ui", (ngx_uint_t) ngx_getpid()) - nbuf); - NGX_RTMP_STAT_L("\r\n"); - - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT(tbuf, ngx_snprintf(tbuf, sizeof(tbuf), - "%T", ngx_cached_time->sec - start_time) - tbuf); - NGX_RTMP_STAT_L("\r\n"); - - ngx_rtmp_stat_bw(r, lll, &ngx_rtmp_bw_in, &ngx_rtmp_bw_out); - - cscf = cmcf->servers.elts; - for (n = 0; n < cmcf->servers.nelts; ++n, ++cscf) { - ngx_rtmp_stat_server(r, lll, *cscf); - } + ngx_rtmp_stat_get(r->pool, lll); NGX_RTMP_STAT_L("\r\n"); @@ -720,8 +883,15 @@ ngx_rtmp_stat_postconfiguration(ngx_conf_t *cf) } *h = ngx_rtmp_stat_handler; - start_time = ngx_cached_time->sec; + if (shm_zone == NULL) { + shm_zone = ngx_shared_memory_add(cf, &shm_name, shm_size + + ngx_pagesize, &ngx_rtmp_stat_module); + if (shm_zone == NULL) { + return NGX_ERROR; + } + + shm_zone->init = ngx_rtmp_stat_shm_init; + } return NGX_OK; } - diff --git a/stat.xsl b/stat.xsl index bc5e586..4cf8405 100644 --- a/stat.xsl +++ b/stat.xsl @@ -13,7 +13,6 @@
Generated by NGINX RTMP module, NGINX , - pid , built   @@ -35,23 +34,28 @@ State Time - - - - - - - - - - - - - - + + + + + Worker #, pid + + + + + + + + + + + + + + @@ -101,7 +105,7 @@ - var d=document.getElementById('-'); + var d=document.getElementById('--'); d.style.display=d.style.display=='none'?'':'none'; return false @@ -133,7 +137,7 @@ - - + --