From 53fd032ae1ec93a289c19f15d22e6dbeb2900bed Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 25 Jun 2012 16:22:42 +0400 Subject: [PATCH 1/4] close_stream implementation --- ngx_rtmp_cmd_module.c | 101 +++++++++++++++++++++++++++++++++++++++-- ngx_rtmp_cmd_module.h | 8 ++++ ngx_rtmp_live_module.c | 10 ++-- 3 files changed, 110 insertions(+), 9 deletions(-) diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index 7976b80..62b71c1 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -15,6 +15,7 @@ ngx_rtmp_connect_pt ngx_rtmp_connect; ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; +ngx_rtmp_close_stream_pt ngx_rtmp_close_stream; ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; ngx_rtmp_publish_pt ngx_rtmp_publish; @@ -337,6 +338,91 @@ ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v) } +static ngx_int_t +ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + static ngx_rtmp_close_stream_t v; + + static ngx_rtmp_amf_elt_t in_elts[] = { + + { NGX_RTMP_AMF_NUMBER, + ngx_null_string, + &v.stream, 0 }, + }; + + if (ngx_rtmp_receive_amf(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "closeStream"); + + return ngx_rtmp_close_stream + ? ngx_rtmp_close_stream(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) +{ + ngx_rtmp_header_t h; + + static double trans; + + static ngx_rtmp_amf_elt_t out_inf[] = { + + { NGX_RTMP_AMF_STRING, + ngx_string("code"), + "NetStream.Play.Stop", 0 }, + + { NGX_RTMP_AMF_STRING, + ngx_string("level"), + "status", 0 }, + + { NGX_RTMP_AMF_STRING, + ngx_string("description"), + "Stopping.", 0 }, + }; + + static ngx_rtmp_amf_elt_t out_elts[] = { + + { NGX_RTMP_AMF_STRING, + ngx_null_string, + "onStatus", 0 }, + + { NGX_RTMP_AMF_NUMBER, + ngx_null_string, + &trans, 0 }, + + { NGX_RTMP_AMF_NULL, + ngx_null_string, + NULL, 0 }, + + { NGX_RTMP_AMF_OBJECT, + ngx_null_string, + out_inf, sizeof(out_inf) }, + }; + + /* send onStatus reply */ + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF_CMD; + h.csid = NGX_RTMP_CMD_CSID_AMF; + h.msid = NGX_RTMP_CMD_MSID; + + if (ngx_rtmp_send_amf(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) @@ -373,8 +459,13 @@ ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, static ngx_int_t ngx_rtmp_cmd_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) { - /* TODO: send NetStream.Play.Stop? */ - return NGX_OK; + ngx_rtmp_close_stream_t cv; + + /* chain close_stream */ + cv.stream = 0; + return ngx_rtmp_close_stream + ? ngx_rtmp_close_stream(s, &cv) + : NGX_OK; } @@ -658,7 +749,7 @@ ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { NGX_RTMP_AMF_STRING, ngx_string("code"), - "NetStream.Play.Reset", 0 }, + "NetStream.Play.Start", 0 }, { NGX_RTMP_AMF_STRING, ngx_string("level"), @@ -666,7 +757,7 @@ ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { NGX_RTMP_AMF_STRING, ngx_string("description"), - "Playing and resetting.", 0 }, + "Playback started.", 0 }, }; static ngx_rtmp_amf_elt_t out_elts[] = { @@ -912,6 +1003,7 @@ static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = { { ngx_string("connect"), ngx_rtmp_cmd_connect_init }, { ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init }, + { ngx_string("closeStream"), ngx_rtmp_cmd_close_stream_init }, { ngx_string("deleteStream"), ngx_rtmp_cmd_delete_stream_init }, { ngx_string("publish"), ngx_rtmp_cmd_publish_init }, @@ -955,6 +1047,7 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf) /* set initial handlers */ ngx_rtmp_connect = ngx_rtmp_cmd_connect; ngx_rtmp_create_stream = ngx_rtmp_cmd_create_stream; + ngx_rtmp_close_stream = ngx_rtmp_cmd_close_stream; ngx_rtmp_delete_stream = ngx_rtmp_cmd_delete_stream; ngx_rtmp_publish = ngx_rtmp_cmd_publish; diff --git a/ngx_rtmp_cmd_module.h b/ngx_rtmp_cmd_module.h index 5332eac..2b8c61e 100644 --- a/ngx_rtmp_cmd_module.h +++ b/ngx_rtmp_cmd_module.h @@ -45,6 +45,11 @@ typedef struct { } ngx_rtmp_delete_stream_t; +typedef struct { + double stream; +} ngx_rtmp_close_stream_t; + + typedef struct { u_char name[NGX_RTMP_MAX_NAME]; u_char args[NGX_RTMP_MAX_ARGS]; @@ -77,6 +82,8 @@ typedef ngx_int_t (*ngx_rtmp_connect_pt)(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v); typedef ngx_int_t (*ngx_rtmp_create_stream_pt)(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v); +typedef ngx_int_t (*ngx_rtmp_close_stream_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_close_stream_t *v); typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v); @@ -97,6 +104,7 @@ typedef ngx_int_t (*ngx_rtmp_fcunsubscribe_pt)(ngx_rtmp_session_t *s, extern ngx_rtmp_connect_pt ngx_rtmp_connect; extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; +extern ngx_rtmp_close_stream_pt ngx_rtmp_close_stream; extern ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; extern ngx_rtmp_publish_pt ngx_rtmp_publish; diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 0d4c880..dee15e0 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -10,7 +10,7 @@ static ngx_rtmp_publish_pt next_publish; static ngx_rtmp_play_pt next_play; -static ngx_rtmp_delete_stream_pt next_delete_stream; +static ngx_rtmp_close_stream_pt next_close_stream; static ngx_int_t ngx_rtmp_live_postconfiguration(ngx_conf_t *cf); @@ -214,7 +214,7 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, static ngx_int_t -ngx_rtmp_live_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) +ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { ngx_rtmp_live_ctx_t *ctx, **cctx; ngx_rtmp_live_stream_t **stream; @@ -271,7 +271,7 @@ ngx_rtmp_live_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) ctx->stream = NULL; next: - return next_delete_stream(s, v); + return next_close_stream(s, v); } @@ -499,8 +499,8 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf) next_play = ngx_rtmp_play; ngx_rtmp_play = ngx_rtmp_live_play; - next_delete_stream = ngx_rtmp_delete_stream; - ngx_rtmp_delete_stream = ngx_rtmp_live_delete_stream; + next_close_stream = ngx_rtmp_close_stream; + ngx_rtmp_close_stream = ngx_rtmp_live_close_stream; return NGX_OK; } From 32ab32f1680f051bbd3911afd60e00a9e18f7bd3 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 25 Jun 2012 21:40:23 +0400 Subject: [PATCH 2/4] added stream begin/eof to play/closeStream --- ngx_rtmp_cmd_module.c | 56 ++++--------------------------------------- ngx_rtmp_receive.c | 2 +- 2 files changed, 6 insertions(+), 52 deletions(-) diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index 62b71c1..d9e7dcd 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -369,57 +369,7 @@ ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, static ngx_int_t ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { - ngx_rtmp_header_t h; - - static double trans; - - static ngx_rtmp_amf_elt_t out_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Play.Stop", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "Stopping.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onStatus", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out_inf, sizeof(out_inf) }, - }; - - /* send onStatus reply */ - memset(&h, 0, sizeof(h)); - h.type = NGX_RTMP_MSG_AMF_CMD; - h.csid = NGX_RTMP_CMD_CSID_AMF; - h.msid = NGX_RTMP_CMD_MSID; - - if (ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - return NGX_OK; + return ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_CMD_MSID); } @@ -863,6 +813,10 @@ ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) h.csid = NGX_RTMP_CMD_CSID_AMF; h.msid = NGX_RTMP_CMD_MSID; + if (ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_CMD_MSID) != NGX_OK) { + return NGX_ERROR; + } + if (ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) { diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index a3f1c36..49cd803 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -135,7 +135,7 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "msid=%uD buflen: %uD (msec)", val, arg); - ngx_rtmp_send_user_stream_begin(s, val); + /*ngx_rtmp_send_user_stream_begin(s, val);*/ } break; From 62c750ab8b74d77f2ec207b927e48053749ed266 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 25 Jun 2012 21:46:57 +0400 Subject: [PATCH 3/4] added resetting msg mask in close_stream --- ngx_rtmp_live_module.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index dee15e0..f40afd4 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -239,6 +239,8 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "live: leave '%s'", ctx->stream->name); + ctx->msg_mask = 0; + if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING && ctx->flags & NGX_RTMP_LIVE_PUBLISHING) { From d9579d627d727d8ad5edd2a0054f66324bebd459 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 28 Jun 2012 22:57:15 +0400 Subject: [PATCH 4/4] implemented returning OK from close stream handler sending stream EOF. Any other value can break destruction chain which leads to undestructed data --- ngx_rtmp_cmd_module.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index d9e7dcd..4d6508a 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -369,7 +369,11 @@ ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, static ngx_int_t ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { - return ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_CMD_MSID); + ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_CMD_MSID); + + /* Whatever happens return OK + * since we should be careful with destruction */ + return NGX_OK; }