more updates in relay module

This commit is contained in:
Roman Arutyunyan
2012-11-21 07:31:54 +04:00
parent f5580e1652
commit 32dfa94e9a
2 changed files with 135 additions and 180 deletions

View File

@ -17,11 +17,11 @@ static ngx_rtmp_leave_room_pt next_leave_room;
static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf);
static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf,
void *parent, void *child);
void *parent, void *child);
static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
void *conf);
static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
ngx_rtmp_publish_t *v);
/* _____
@ -39,12 +39,9 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
typedef struct {
ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */
ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */
ngx_log_t *log;
ngx_uint_t nbuckets;
ngx_array_t relays; /* ngx_rtmp_relay_target_t * */
ngx_msec_t buflen;
ngx_msec_t push_reconnect;
ngx_msec_t sync_timeout;
ngx_rtmp_relay_ctx_t **ctx;
} ngx_rtmp_relay_app_conf_t;
@ -89,7 +86,7 @@ static ngx_command_t ngx_rtmp_relay_commands[] = {
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, push_reconnect),
offsetof(ngx_rtmp_relay_app_conf_t, sync_timeout),
NULL },
@ -135,14 +132,9 @@ ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf)
return NULL;
}
ngx_array_init(&racf->pushes, cf->pool, 1,
sizeof(ngx_rtmp_relay_target_t *));
ngx_array_init(&racf->pulls, cf->pool, 1,
sizeof(ngx_rtmp_relay_target_t *));
racf->nbuckets = 1024;
racf->log = &cf->cycle->new_log;
ngx_array_init(&racf->relays, cf->pool, 1, sizeof(void *));
racf->buflen = NGX_CONF_UNSET;
racf->push_reconnect = NGX_CONF_UNSET;
racf->sync_timeout = NGX_CONF_UNSET;
return racf;
}
@ -154,33 +146,36 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_rtmp_relay_app_conf_t *prev = parent;
ngx_rtmp_relay_app_conf_t *conf = child;
conf->ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_ctx_t *)
* conf->nbuckets);
ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 5000);
ngx_conf_merge_msec_value(conf->push_reconnect, prev->push_reconnect,
3000);
ngx_conf_merge_msec_value(conf->sync_timeout, prev->sync_timeout, 3000);
return NGX_CONF_OK;
}
static void
ngx_rtmp_relay_reconnect(ngx_event_t *ev)
ngx_rtmp_relay_sync(ngx_event_t *ev)
{
ngx_rtmp_session_t *s = ev->data;
ngx_rtmp_room_t *r = ev->data;
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *ctx, *pctx;
ngx_uint_t n;
ngx_rtmp_relay_target_t *target, **t;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: push reconnect");
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (ev->timer_set) {
ngx_del_timer(ev);
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ev->prev) {
ngx_delete_posted_event(ev);
}
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, r->log, 0, "relay: sync");
racf = ngx_rtmp_get_module_app_conf(r, ngx_rtmp_relay_module);
ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module);
if (ctx == NULL) {
return;
}
@ -221,6 +216,11 @@ ngx_rtmp_relay_reconnect(ngx_event_t *ev)
ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
}
}
return;
again:
ngx_add_timer(ev, racf->sync_timeout);
}
@ -261,10 +261,9 @@ ngx_rtmp_relay_copy_str(ngx_pool_t *pool, ngx_str_t *dst, ngx_str_t *src)
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay)
{
ngx_rtmp_relay_target_t *relay;
ngx_rtmp_relay_target_t *target;
ngx_rtmp_relay_room_ctx_t *rctx;
ngx_rtmp_relay_ctx_t *rctx;
ngx_rtmp_relay_ctx_t *ctx;
ngx_rtmp_addr_conf_t *addr_conf;
ngx_rtmp_conf_ctx_t *addr_ctx;
ngx_rtmp_session_t *rs;
@ -279,8 +278,12 @@ ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay)
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create");
rctx
rctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module);
if (rctx == NULL || rctx->relays.nelts == 0) {
return NGX_OK;
}
target = rctx->relays[nrelay];
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
@ -290,13 +293,13 @@ ngx_rtmp_relay_create(ngx_rtmp_room_t *r, ngx_uint_t nrelay)
return NULL;
}
rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
if (rctx == NULL) {
ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
goto clear;
}
if (ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK ||
ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK)
if (ngx_rtmp_relay_copy_str(pool, &ctx->name, &rctx->name) != NGX_OK ||
ngx_rtmp_relay_copy_str(pool, &ctx->url, &target->url.url) != NGX_OK)
{
goto clear;
}
@ -428,125 +431,6 @@ clear:
}
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_ctx_t *ctx;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create local context");
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
return NULL;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
}
ctx->session = s;
ctx->push_evt.data = s;
ctx->push_evt.log = s->connection->log;
ctx->push_evt.handler = ngx_rtmp_relay_reconnect;
if (ctx->publish) {
return NULL;
}
if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
!= NGX_OK)
{
return NULL;
}
return ctx;
}
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target,
ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx;
ngx_uint_t hash;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL) {
return NGX_ERROR;
}
play_ctx = create_play_ctx(s, name, target);
if (play_ctx == NULL) {
return NGX_ERROR;
}
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data,
name->len))
{
break;
}
}
if (*cctx) {
play_ctx->publish = (*cctx)->publish;
play_ctx->next = (*cctx)->play;
(*cctx)->play = play_ctx;
return NGX_OK;
}
publish_ctx = create_publish_ctx(s, name, target);
if (publish_ctx == NULL) {
ngx_rtmp_finalize_session(play_ctx->session);
return NGX_ERROR;
}
publish_ctx->publish = publish_ctx;
publish_ctx->play = play_ctx;
play_ctx->publish = publish_ctx;
*cctx = publish_ctx;
return NGX_OK;
}
ngx_int_t
ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"relay: create pull name='%V' app='%V' playpath='%V' url='%V'",
name, &target->app, &target->play_path, &target->url.url);
return ngx_rtmp_relay_create(s, name, target,
ngx_rtmp_relay_create_remote_ctx,
ngx_rtmp_relay_create_local_ctx);
}
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"relay: create push name='%V' app='%V' playpath='%V' url='%V'",
name, &target->app, &target->play_path, &target->url.url);
return ngx_rtmp_relay_create(s, name, target,
ngx_rtmp_relay_create_local_ctx,
ngx_rtmp_relay_create_remote_ctx);
}
static ngx_int_t
ngx_rtmp_relay_create_room(ngx_rtmp_room_t *r)
{
@ -605,6 +489,85 @@ next:
}
static ngx_int_t
ngx_rtmp_relay_delete_room(ngx_rtmp_room_t *r)
{
ngx_rtmp_relay_room_ctx_t *ctx;
ngx_event_t *e;
ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module);
if (ctx == NULL) {
goto next;
}
/* unschedule sync event */
e = &r->sync_evt;
if (e->timer_set) {
ngx_del_timer(e);
}
if (e->prev) {
ngx_delete_posted_event(e);
}
next:
return next_delete_room(r);
}
static ngx_int_t
ngx_rtmp_join_room(ngx_rtmp_room_t *r, ngx_rtmp_session_t *s)
{
ngx_rtmp_room_ctx_t *room_ctx;
ngx_rtmp_relay_ctx_t *relay_ctx;
room_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_room_module);
if (room_ctx == NULL) {
goto next;
}
relay_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (relay_ctx == NULL) {
goto next;
}
/* all relays are weak */
room_ctx->weak = 1;
next:
return next_join_room(r, s);
}
static ngx_int_t
ngx_rtmp_leave_room(ngx_rtmp_room_t *r, ngx_rtmp_session_t *s)
{
ngx_rtmp_relay_ctx_t *relay_ctx;
ngx_rtmp_relay_room_ctx_t *ctx;
relay_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (relay_ctx == NULL) {
goto next;
}
/* schedule relay reconnect */
ctx = ngx_rtmp_get_module_ctx(r, ngx_rtmp_relay_module);
ctx->states[];
ngx_rtmp_relay_sync(r);
ngx_rtmp_finalize_session(s);
next:
return next_leave_room(r, s);
}
static ngx_int_t
ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
@ -1314,10 +1277,7 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module);
t = ngx_array_push(value[0].data[3] == 'h'
? &racf->pushes /* push */
: &racf->pulls /* pull */
);
t = ngx_array_push(&racf->relays);
if (t == NULL) {
return NGX_CONF_ERROR;
@ -1330,6 +1290,7 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
*t = target;
target->push = (value[0].data[3] == 'h');
target->tag = &ngx_rtmp_relay_module;
target->data = target;

View File

@ -23,42 +23,36 @@ typedef struct {
ngx_int_t start;
ngx_int_t stop;
unsigned push:1;
#if 0
void *tag; /* usually module reference */
void *data; /* module-specific data */
#endif
ngx_rtmp_relay_get_peer_pt *get_peer;
void *get_peer_arg;
} ngx_rtmp_relay_target_t;
typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
struct ngx_rtmp_relay_ctx_s {
ngx_str_t name;
ngx_str_t url;
ngx_log_t log;
ngx_rtmp_session_t *session;
ngx_rtmp_relay_ctx_t *publish;
ngx_rtmp_relay_ctx_t *play;
ngx_rtmp_relay_ctx_t *next;
unsigned relay:1;
ngx_rtmp_relay_target_t *relay;
ngx_str_t app;
ngx_str_t tc_url;
ngx_str_t page_url;
ngx_str_t swf_url;
ngx_str_t flash_ver;
ngx_str_t play_path;
ngx_int_t live;
ngx_int_t start;
ngx_int_t stop;
ngx_event_t push_evt;
void *tag;
void *data;
} ngx_rtmp_relay_ctx_t;
enum {
NGX_RTMP_RELAY_PENDING,
NGX_RTMP_RELAY_RUNNING,
NGX_RTMP_RELAY_STOPPED
};
typedef struct {
ngx_array_t relays; /* ngx_rtmp_relay_target_t */
ngx_array_t relays; /* ngx_rtmp_relay_target_t * */
ngx_array_t states; /* ngx_uint_t */
ngx_event_t sync_evt;
} ngx_rtmp_relay_room_ctx_t;