From 32dfa94e9a4d96d3eb24b3ff3a748976e11990bf Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Wed, 21 Nov 2012 07:31:54 +0400 Subject: [PATCH] more updates in relay module --- ngx_rtmp_relay_module.c | 279 +++++++++++++++++----------------------- ngx_rtmp_relay_module.h | 36 +++--- 2 files changed, 135 insertions(+), 180 deletions(-) diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 9bd8d40..fc84ff5 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -17,11 +17,11 @@ static ngx_rtmp_leave_room_pt next_leave_room; static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, - void *parent, void *child); + void *parent, void *child); static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, - void *conf); + void *conf); static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, - ngx_rtmp_publish_t *v); + ngx_rtmp_publish_t *v); /* _____ @@ -39,12 +39,9 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, typedef struct { - ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */ - ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */ - ngx_log_t *log; - ngx_uint_t nbuckets; + ngx_array_t relays; /* ngx_rtmp_relay_target_t * */ ngx_msec_t buflen; - ngx_msec_t push_reconnect; + ngx_msec_t sync_timeout; ngx_rtmp_relay_ctx_t **ctx; } ngx_rtmp_relay_app_conf_t; @@ -89,7 +86,7 @@ static ngx_command_t ngx_rtmp_relay_commands[] = { NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, ngx_conf_set_msec_slot, NGX_RTMP_APP_CONF_OFFSET, - offsetof(ngx_rtmp_relay_app_conf_t, push_reconnect), + offsetof(ngx_rtmp_relay_app_conf_t, sync_timeout), NULL }, @@ -135,14 +132,9 @@ ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf) return NULL; } - ngx_array_init(&racf->pushes, cf->pool, 1, - sizeof(ngx_rtmp_relay_target_t *)); - ngx_array_init(&racf->pulls, cf->pool, 1, - sizeof(ngx_rtmp_relay_target_t *)); - racf->nbuckets = 1024; - racf->log = &cf->cycle->new_log; + ngx_array_init(&racf->relays, cf->pool, 1, sizeof(void *)); racf->buflen = NGX_CONF_UNSET; - racf->push_reconnect = NGX_CONF_UNSET; + racf->sync_timeout = NGX_CONF_UNSET; return racf; } @@ -154,33 +146,36 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_rtmp_relay_app_conf_t *prev = parent; ngx_rtmp_relay_app_conf_t *conf = child; - conf->ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_ctx_t *) - * conf->nbuckets); - ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 5000); - ngx_conf_merge_msec_value(conf->push_reconnect, prev->push_reconnect, - 3000); + ngx_conf_merge_msec_value(conf->sync_timeout, prev->sync_timeout, 3000); return NGX_CONF_OK; } static void -ngx_rtmp_relay_reconnect(ngx_event_t *ev) +ngx_rtmp_relay_sync(ngx_event_t *ev) { - ngx_rtmp_session_t *s = ev->data; + ngx_rtmp_room_t *r = ev->data; ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_ctx_t *ctx, *pctx; ngx_uint_t n; ngx_rtmp_relay_target_t *target, **t; - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "relay: push reconnect"); - - racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); + if (ev->timer_set) { + ngx_del_timer(ev); + } - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (ev->prev) { + ngx_delete_posted_event(ev); + } + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, r->log, 0, "relay: sync"); + + racf = ngx_rtmp_get_module_app_conf(r, ngx_rtmp_relay_module); + + ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module); if (ctx == NULL) { return; } @@ -221,6 +216,11 @@ ngx_rtmp_relay_reconnect(ngx_event_t *ev) ngx_add_timer(&ctx->push_evt, racf->push_reconnect); } } + + return; + +again: + ngx_add_timer(ev, racf->sync_timeout); } @@ -261,10 +261,9 @@ ngx_rtmp_relay_copy_str(ngx_pool_t *pool, ngx_str_t *dst, ngx_str_t *src) static ngx_int_t ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay) { - ngx_rtmp_relay_target_t *relay; + ngx_rtmp_relay_target_t *target; ngx_rtmp_relay_room_ctx_t *rctx; - - ngx_rtmp_relay_ctx_t *rctx; + ngx_rtmp_relay_ctx_t *ctx; ngx_rtmp_addr_conf_t *addr_conf; ngx_rtmp_conf_ctx_t *addr_ctx; ngx_rtmp_session_t *rs; @@ -279,8 +278,12 @@ ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay) ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "relay: create"); - rctx + rctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module); + if (rctx == NULL || rctx->relays.nelts == 0) { + return NGX_OK; + } + target = rctx->relays[nrelay]; racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); @@ -290,13 +293,13 @@ ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay) return NULL; } - rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t)); - if (rctx == NULL) { + ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t)); + if (ctx == NULL) { goto clear; } - if (ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK || - ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) + if (ngx_rtmp_relay_copy_str(pool, &ctx->name, &rctx->name) != NGX_OK || + ngx_rtmp_relay_copy_str(pool, &ctx->url, &target->url.url) != NGX_OK) { goto clear; } @@ -428,125 +431,6 @@ clear: } -static ngx_rtmp_relay_ctx_t * -ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name, - ngx_rtmp_relay_target_t *target) -{ - ngx_rtmp_relay_ctx_t *ctx; - - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "relay: create local context"); - - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL) { - ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t)); - if (ctx == NULL) { - return NULL; - } - ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module); - } - ctx->session = s; - - ctx->push_evt.data = s; - ctx->push_evt.log = s->connection->log; - ctx->push_evt.handler = ngx_rtmp_relay_reconnect; - - if (ctx->publish) { - return NULL; - } - - if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name) - != NGX_OK) - { - return NULL; - } - - return ctx; -} - - -static ngx_int_t -ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name, - ngx_rtmp_relay_target_t *target, - ngx_rtmp_relay_create_ctx_pt create_publish_ctx, - ngx_rtmp_relay_create_ctx_pt create_play_ctx) -{ - ngx_rtmp_relay_app_conf_t *racf; - ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx; - ngx_uint_t hash; - - - racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); - if (racf == NULL) { - return NGX_ERROR; - } - - play_ctx = create_play_ctx(s, name, target); - if (play_ctx == NULL) { - return NGX_ERROR; - } - - hash = ngx_hash_key(name->data, name->len); - cctx = &racf->ctx[hash % racf->nbuckets]; - for (; *cctx; cctx = &(*cctx)->next) { - if ((*cctx)->name.len == name->len - && !ngx_memcmp(name->data, (*cctx)->name.data, - name->len)) - { - break; - } - } - - if (*cctx) { - play_ctx->publish = (*cctx)->publish; - play_ctx->next = (*cctx)->play; - (*cctx)->play = play_ctx; - return NGX_OK; - } - - publish_ctx = create_publish_ctx(s, name, target); - if (publish_ctx == NULL) { - ngx_rtmp_finalize_session(play_ctx->session); - return NGX_ERROR; - } - - publish_ctx->publish = publish_ctx; - publish_ctx->play = play_ctx; - play_ctx->publish = publish_ctx; - *cctx = publish_ctx; - - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name, - ngx_rtmp_relay_target_t *target) -{ - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "relay: create pull name='%V' app='%V' playpath='%V' url='%V'", - name, &target->app, &target->play_path, &target->url.url); - - return ngx_rtmp_relay_create(s, name, target, - ngx_rtmp_relay_create_remote_ctx, - ngx_rtmp_relay_create_local_ctx); -} - - -ngx_int_t -ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name, - ngx_rtmp_relay_target_t *target) -{ - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "relay: create push name='%V' app='%V' playpath='%V' url='%V'", - name, &target->app, &target->play_path, &target->url.url); - - return ngx_rtmp_relay_create(s, name, target, - ngx_rtmp_relay_create_local_ctx, - ngx_rtmp_relay_create_remote_ctx); -} - - static ngx_int_t ngx_rtmp_relay_create_room(ngx_rtmp_room_t *r) { @@ -605,6 +489,85 @@ next: } +static ngx_int_t +ngx_rtmp_relay_delete_room(ngx_rtmp_room_t *r) +{ + ngx_rtmp_relay_room_ctx_t *ctx; + ngx_event_t *e; + + ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module); + if (ctx == NULL) { + goto next; + } + + /* unschedule sync event */ + + e = &r->sync_evt; + + if (e->timer_set) { + ngx_del_timer(e); + } + + if (e->prev) { + ngx_delete_posted_event(e); + } + +next: + return next_delete_room(r); +} + + +static ngx_int_t +ngx_rtmp_join_room(ngx_rtmp_room_t *r, ngx_rtmp_session_t *s) +{ + ngx_rtmp_room_ctx_t *room_ctx; + ngx_rtmp_relay_ctx_t *relay_ctx; + + room_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_room_module); + if (room_ctx == NULL) { + goto next; + } + + relay_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (relay_ctx == NULL) { + goto next; + } + + /* all relays are weak */ + + room_ctx->weak = 1; + +next: + return next_join_room(r, s); +} + + +static ngx_int_t +ngx_rtmp_leave_room(ngx_rtmp_room_t *r, ngx_rtmp_session_t *s) +{ + ngx_rtmp_relay_ctx_t *relay_ctx; + ngx_rtmp_relay_room_ctx_t *ctx; + + relay_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (relay_ctx == NULL) { + goto next; + } + + /* schedule relay reconnect */ + + ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module); + + ctx->states[]; + + ngx_rtmp_relay_sync(r); + + ngx_rtmp_finalize_session(s); + +next: + return next_leave_room(r, s); +} + + static ngx_int_t ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { @@ -1314,10 +1277,7 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module); - t = ngx_array_push(value[0].data[3] == 'h' - ? &racf->pushes /* push */ - : &racf->pulls /* pull */ - ); + t = ngx_array_push(&racf->relays); if (t == NULL) { return NGX_CONF_ERROR; @@ -1330,6 +1290,7 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) *t = target; + target->push = (value[0].data[3] == 'h'); target->tag = &ngx_rtmp_relay_module; target->data = target; diff --git a/ngx_rtmp_relay_module.h b/ngx_rtmp_relay_module.h index ed33e0e..eb71bc6 100644 --- a/ngx_rtmp_relay_module.h +++ b/ngx_rtmp_relay_module.h @@ -23,42 +23,36 @@ typedef struct { ngx_int_t start; ngx_int_t stop; unsigned push:1; - +#if 0 void *tag; /* usually module reference */ void *data; /* module-specific data */ +#endif + ngx_rtmp_relay_get_peer_pt *get_peer; + void *get_peer_arg; } ngx_rtmp_relay_target_t; typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t; struct ngx_rtmp_relay_ctx_s { - ngx_str_t name; - ngx_str_t url; - ngx_log_t log; - ngx_rtmp_session_t *session; - ngx_rtmp_relay_ctx_t *publish; - ngx_rtmp_relay_ctx_t *play; - ngx_rtmp_relay_ctx_t *next; - unsigned relay:1; + ngx_rtmp_relay_target_t *relay; - ngx_str_t app; - ngx_str_t tc_url; - ngx_str_t page_url; - ngx_str_t swf_url; - ngx_str_t flash_ver; - ngx_str_t play_path; - ngx_int_t live; - ngx_int_t start; - ngx_int_t stop; - - ngx_event_t push_evt; void *tag; void *data; +} ngx_rtmp_relay_ctx_t; + + +enum { + NGX_RTMP_RELAY_PENDING, + NGX_RTMP_RELAY_RUNNING, + NGX_RTMP_RELAY_STOPPED }; typedef struct { - ngx_array_t relays; /* ngx_rtmp_relay_target_t */ + ngx_array_t relays; /* ngx_rtmp_relay_target_t * */ + ngx_array_t states; /* ngx_uint_t */ + ngx_event_t sync_evt; } ngx_rtmp_relay_room_ctx_t;