diff --git a/ngx_rtmp_access_module.c b/ngx_rtmp_access_module.c index 6147877..e057152 100644 --- a/ngx_rtmp_access_module.c +++ b/ngx_rtmp_access_module.c @@ -9,6 +9,10 @@ #include "ngx_rtmp_cmd_module.h" +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_play_pt next_play; + + #define NGX_RTMP_ACCESS_PUBLISH 0x01 #define NGX_RTMP_ACCESS_PLAY 0x02 @@ -389,47 +393,36 @@ ngx_rtmp_access_rule(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) static ngx_int_t -ngx_rtmp_access_connect(ngx_rtmp_session_t *s) +ngx_rtmp_access_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { - return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH) == NGX_OK - || ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY) == NGX_OK - ? NGX_OK - : NGX_ERROR; + if (ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH) != NGX_OK) { + return NGX_ERROR; + } + + return next_publish(s, v); } static ngx_int_t -ngx_rtmp_access_publish(ngx_rtmp_session_t *s, - ngx_str_t *name, ngx_int_t type) +ngx_rtmp_access_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { - return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH); -} + if (ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY) != NGX_OK) { + return NGX_ERROR; + } - -static ngx_int_t -ngx_rtmp_access_play(ngx_rtmp_session_t *s, - ngx_str_t *name, uint32_t start, uint32_t duration, ngx_int_t reset) -{ - return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY); + return next_play(s, v); } static ngx_int_t ngx_rtmp_access_postconfiguration(ngx_conf_t *cf) { - ngx_rtmp_cmd_main_conf_t *dmcf; - void *ch; + /* chain handlers */ + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_access_publish; - dmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_cmd_module); - - ch = ngx_array_push(&dmcf->connect); - *(ngx_rtmp_cmd_connect_pt*)ch = ngx_rtmp_access_connect; - - ch = ngx_array_push(&dmcf->publish); - *(ngx_rtmp_cmd_publish_pt*)ch = ngx_rtmp_access_publish; - - ch = ngx_array_push(&dmcf->play); - *(ngx_rtmp_cmd_play_pt*)ch = ngx_rtmp_access_play; + next_play = ngx_rtmp_play; + ngx_rtmp_play = ngx_rtmp_access_play; return NGX_OK; } diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index b42fc78..a527f1f 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -8,39 +8,31 @@ #define NGX_RTMP_FMS_VERSION "FMS/3,0,1,123" #define NGX_RTMP_CAPABILITIES 31 +#define NGX_RTMP_CMD_CSID_AMF0_INI 3 #define NGX_RTMP_CMD_CSID_AMF0 5 +#define NGX_RTMP_CMD_MSID 1 + + +ngx_rtmp_connect_pt ngx_rtmp_connect; +ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; +ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; + +ngx_rtmp_publish_pt ngx_rtmp_publish; +ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish; +ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish; + +ngx_rtmp_play_pt ngx_rtmp_play; +ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe; +ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe; -static void * ngx_rtmp_cmd_create_main_conf(ngx_conf_t *cf); static ngx_int_t ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf); -static ngx_int_t ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_close(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_disconnect(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); -static ngx_int_t ngx_rtmp_cmd_default(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); - - -static ngx_command_t ngx_rtmp_cmd_commands[] = { - - ngx_null_command -}; - - static ngx_rtmp_module_t ngx_rtmp_cmd_module_ctx = { NULL, /* preconfiguration */ ngx_rtmp_cmd_postconfiguration, /* postconfiguration */ - ngx_rtmp_cmd_create_main_conf, /* create main configuration */ + NULL, /* create main configuration */ NULL, /* init main configuration */ NULL, /* create server configuration */ NULL, /* merge server configuration */ @@ -52,7 +44,7 @@ static ngx_rtmp_module_t ngx_rtmp_cmd_module_ctx = { ngx_module_t ngx_rtmp_cmd_module = { NGX_MODULE_V1, &ngx_rtmp_cmd_module_ctx, /* module context */ - ngx_rtmp_cmd_commands, /* module directives */ + NULL, /* module directives */ NGX_RTMP_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ @@ -65,142 +57,121 @@ ngx_module_t ngx_rtmp_cmd_module = { }; -static void * -ngx_rtmp_cmd_create_main_conf(ngx_conf_t *cf) -{ - ngx_rtmp_cmd_main_conf_t *cmcf; - - cmcf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_cmd_main_conf_t)); - if (cmcf == NULL) { - return NULL; - } - -#define NGX_RTMP_CMD_INIT_ARRAY(name) \ - if (ngx_array_init(&cmcf->name, cf->pool, 1, sizeof(void *)) \ - != NGX_OK) \ - { \ - return NULL; \ - } - - NGX_RTMP_CMD_INIT_ARRAY(connect); - NGX_RTMP_CMD_INIT_ARRAY(publish); - NGX_RTMP_CMD_INIT_ARRAY(play); - NGX_RTMP_CMD_INIT_ARRAY(close); - -#undef NGX_RTMP_CMD_INIT_ARRAY - - return cmcf; -} - - -#define NGX_RTMP_CMD_I(name) \ - { \ - /* call handlers */ \ - ngx_rtmp_cmd_##name##_pt *h; \ - ngx_rtmp_cmd_main_conf_t *cmcf; \ - ngx_uint_t n; \ - \ - cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_cmd_module); \ - h = cmcf->name.elts; \ - for(n = 0; n < cmcf->name.nelts; ++n, ++h) { \ - if ((*h) - -#define NGX_RTMP_CMD_F \ - != NGX_OK) { \ - return NGX_ERROR; \ - } \ - } \ - } - - -static ngx_rtmp_amf0_handler_t ngx_rtmp_cmd_map[] = { - { ngx_string("connect"), ngx_rtmp_cmd_connect }, - { ngx_string("createStream"), ngx_rtmp_cmd_create_stream }, - { ngx_string("publish"), ngx_rtmp_cmd_publish }, - { ngx_string("play"), ngx_rtmp_cmd_play }, - { ngx_string("close"), ngx_rtmp_cmd_close }, - { ngx_string("releaseStream"), ngx_rtmp_cmd_close }, - { ngx_string("deleteStream"), ngx_rtmp_cmd_close }, - { ngx_string("closeStream"), ngx_rtmp_cmd_close }, - { ngx_string("FCPublish"), ngx_rtmp_cmd_default }, - { ngx_string("FCSubscribe"), ngx_rtmp_cmd_default }, -}; - - -ngx_int_t -ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, +static ngx_int_t +ngx_rtmp_cmd_connect_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_core_srv_conf_t *cscf; - ngx_uint_t n; - ngx_rtmp_core_app_conf_t **cacfp; - size_t len; - - static double trans; - static double capabilities = NGX_RTMP_CAPABILITIES; - - static struct { - u_char app[1024]; - u_char flashver[1024]; - u_char swf_url[1024]; - u_char tc_url[1024]; - double acodecs; - double vcodecs; - u_char page_url[1024]; - } v; - + static ngx_rtmp_connect_t v; static ngx_rtmp_amf0_elt_t in_cmd[] = { - { NGX_RTMP_AMF0_STRING, "app", - v.app, sizeof(v.app) }, - { NGX_RTMP_AMF0_STRING, "flashver", - v.flashver, sizeof(v.flashver) }, - { NGX_RTMP_AMF0_STRING, "swfUrl", - v.swf_url, sizeof(v.swf_url) }, - { NGX_RTMP_AMF0_STRING, "tcUrl", - v.tc_url, sizeof(v.tc_url) }, - { NGX_RTMP_AMF0_NUMBER, "audioCodecs", - &v.acodecs, sizeof(v.acodecs) }, - { NGX_RTMP_AMF0_NUMBER, "videoCodecs", - &v.vcodecs, sizeof(v.vcodecs) }, - { NGX_RTMP_AMF0_STRING, "pageUrl", - v.page_url, sizeof(v.page_url) }, + + { NGX_RTMP_AMF0_STRING, + "app", + v.app, sizeof(v.app) }, + + { NGX_RTMP_AMF0_STRING, + "flashver", + v.flashver, sizeof(v.flashver) }, + + { NGX_RTMP_AMF0_STRING, + "swfUrl", + v.swf_url, sizeof(v.swf_url) }, + + { NGX_RTMP_AMF0_STRING, + "tcUrl", + v.tc_url, sizeof(v.tc_url) }, + + { NGX_RTMP_AMF0_NUMBER, + "audioCodecs", + &v.acodecs, sizeof(v.acodecs) }, + + { NGX_RTMP_AMF0_NUMBER, + "videoCodecs", + &v.vcodecs, sizeof(v.vcodecs) }, + + { NGX_RTMP_AMF0_STRING, + "pageUrl", + v.page_url, sizeof(v.page_url) }, }; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, - &trans, 0 }, + + /* transaction in always 1 */ + { NGX_RTMP_AMF0_NUMBER, NULL, + NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, - in_cmd, sizeof(in_cmd) }, + in_cmd, sizeof(in_cmd) }, }; + ngx_memzero(&v, sizeof(v)); + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + return ngx_rtmp_connect + ? ngx_rtmp_connect(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_core_app_conf_t **cacfp; + ngx_uint_t n; + size_t len; + ngx_rtmp_header_t h; + + static double trans = 1; + static double capabilities = NGX_RTMP_CAPABILITIES; + static ngx_rtmp_amf0_elt_t out_obj[] = { - { NGX_RTMP_AMF0_STRING, "fmsVer", - NGX_RTMP_FMS_VERSION, 0 }, - { NGX_RTMP_AMF0_NUMBER, "capabilities", - &capabilities, 0 }, + + { NGX_RTMP_AMF0_STRING, + "fmsVer", + NGX_RTMP_FMS_VERSION, 0 }, + + { NGX_RTMP_AMF0_NUMBER, + "capabilities", + &capabilities, 0 }, }; static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "level", - "status", 0 }, - { NGX_RTMP_AMF0_STRING, "code", - "NetConnection.Connect.Success", - 0 }, - { NGX_RTMP_AMF0_STRING, "description", - "Connection succeeded.", - 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "code", + "NetConnection.Connect.Success", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "Connection succeeded.", 0 } }; static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "_result", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out_obj, sizeof(out_obj) }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out_inf, sizeof(out_inf) }, + + { NGX_RTMP_AMF0_STRING, + NULL, + "_result", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_obj, sizeof(out_obj) }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_inf, sizeof(out_inf) }, }; if (s->connected) { @@ -211,28 +182,24 @@ ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - /* parse input */ - ngx_memzero(&v, sizeof(v)); - if (ngx_rtmp_receive_amf0(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "connect: app='%s' flashver='%s' swf_url='%s' " "tc_url='%s' page_url='%s' acodecs=%uD vcodecs=%uD", - v.app, v.flashver, v.swf_url, v.tc_url, v.page_url, - (uint32_t)v.acodecs, (uint32_t)v.vcodecs); + v->app, v->flashver, v->swf_url, v->tc_url, v->page_url, + (uint32_t)v->acodecs, (uint32_t)v->vcodecs); /* fill session parameters */ s->connected = 1; + ngx_memzero(&h, sizeof(h)); + h.csid = NGX_RTMP_CMD_CSID_AMF0_INI; + h.type = NGX_RTMP_MSG_AMF0_CMD; + #define NGX_RTMP_SET_STRPAR(name) \ - s->name.len = ngx_strlen(v.name); \ + s->name.len = ngx_strlen(v->name); \ s->name.data = ngx_palloc(s->connection->pool, s->name.len); \ - ngx_memcpy(s->name.data, v.name, s->name.len) + ngx_memcpy(s->name.data, v->name, s->name.len) NGX_RTMP_SET_STRPAR(app); NGX_RTMP_SET_STRPAR(flashver); @@ -242,16 +209,16 @@ ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, #undef NGX_RTMP_SET_STRPAR - s->acodecs = v.acodecs; - s->vcodecs = v.vcodecs; + s->acodecs = v->acodecs; + s->vcodecs = v->vcodecs; /* find application & set app_conf */ - len = ngx_strlen(v.app); + len = ngx_strlen(v->app); cacfp = cscf->applications.elts; for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) { if ((*cacfp)->name.len == len - && !ngx_strncmp((*cacfp)->name.data, v.app, len)) + && !ngx_strncmp((*cacfp)->name.data, v->app, len)) { /* found app! */ s->app_conf = (*cacfp)->app_conf; @@ -261,127 +228,87 @@ ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (s->app_conf == NULL) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "connect: application not found: '%s'", v.app); + "connect: application not found: '%s'", v->app); return NGX_ERROR; } - /* call handlers */ - NGX_RTMP_CMD_I(connect) (s) NGX_RTMP_CMD_F; - /* send all replies */ - return ngx_rtmp_send_ack_size(s, cscf->ack_window) - || ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC) - || ngx_rtmp_send_user_stream_begin(s, 0) - || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) - || ngx_rtmp_send_amf0(s, h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) + return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK + || ngx_rtmp_send_bandwidth(s, cscf->ack_window, + NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK + || ngx_rtmp_send_user_stream_begin(s, 0) != NGX_OK + || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK + || ngx_rtmp_send_amf0(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK ? NGX_ERROR - : NGX_DONE; + : NGX_OK; } -ngx_int_t -ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, +static ngx_int_t +ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - /* support one message stream per connection */ - static double stream = 1; - static double trans; - + static ngx_rtmp_create_stream_t v; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, - &trans, sizeof(trans) }, + + { NGX_RTMP_AMF0_NUMBER, + 0, + &v.trans, sizeof(v.trans) }, }; + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + return ngx_rtmp_create_stream + ? ngx_rtmp_create_stream(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v) +{ + /* support one message stream per connection */ + static double stream; + static double trans; + ngx_rtmp_header_t h; + static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "_result", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL , NULL, - NULL, 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &stream, sizeof(stream) }, + + { NGX_RTMP_AMF0_STRING, + NULL, + "_result", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &stream, sizeof(stream) }, }; + trans = v->trans; + stream = NGX_RTMP_CMD_MSID; + + ngx_memzero(&h, sizeof(h)); + h.csid = NGX_RTMP_CMD_CSID_AMF0_INI; + h.type = NGX_RTMP_MSG_AMF0_CMD; + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "createStream"); - /* parse input */ - if (ngx_rtmp_receive_amf0(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - /* send result with standard stream */ - return ngx_rtmp_send_amf0(s, h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK - ? NGX_DONE - : NGX_ERROR; -} - - -ngx_int_t -ngx_rtmp_cmd_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - /* call handlers */ - NGX_RTMP_CMD_I(close) (s) NGX_RTMP_CMD_F; - - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_cmd_close(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - ngx_rtmp_cmd_disconnect(s, h, in); - - return ngx_rtmp_cmd_default(s, h, in); -} - - -ngx_int_t -ngx_rtmp_cmd_default(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - ngx_rtmp_header_t sh; - - static double trans; - - static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, - &trans, sizeof(trans) }, - }; - - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "_result", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL, NULL, - NULL, 0 }, - { NGX_RTMP_AMF0_NULL, NULL, - NULL, 0 }, - }; - - /* parse input */ - if (ngx_rtmp_receive_amf0(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - - memset(&sh, 0, sizeof(sh)); - sh.csid = h->csid; - sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.msid = 0; - - /* send simple _result */ - return ngx_rtmp_send_amf0(s, &sh, out_elts, + return ngx_rtmp_send_amf0(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ? NGX_DONE : NGX_ERROR; @@ -389,53 +316,71 @@ ngx_rtmp_cmd_default(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, static ngx_int_t -ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, +ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_header_t sh; - ngx_str_t stream; - ngx_int_t type; + static ngx_rtmp_delete_stream_t v; - static double trans; + static ngx_rtmp_amf0_elt_t in_elts[] = { - static struct { - u_char name[1024]; - u_char type[1024]; - } v; + { NGX_RTMP_AMF0_NUMBER, + 0, + NULL, 0 }, + { NGX_RTMP_AMF0_NULL, + 0, + NULL, 0 }, + + { NGX_RTMP_AMF0_NUMBER, + 0, + &v.stream, 0 }, + }; + + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + return ngx_rtmp_delete_stream + ? ngx_rtmp_delete_stream(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) +{ + /* TODO: send NetStream.Play.Stop? */ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + static ngx_rtmp_publish_t v; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL, 0, - NULL, 0 }, - { NGX_RTMP_AMF0_STRING, 0, - &v.name, sizeof(v.name) }, - { NGX_RTMP_AMF0_STRING, 0, - &v.type, sizeof(v.type) }, - }; - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", - "NetStream.Publish.Start", - 0 }, - { NGX_RTMP_AMF0_STRING, "level", - "status", 0 }, - { NGX_RTMP_AMF0_STRING, "description", - "Publish succeeded.", 0 }, - }; + /* transaction is always 0 */ + { NGX_RTMP_AMF0_NUMBER, + 0, + NULL, 0 }, - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "onStatus", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL , NULL, - NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out_inf, sizeof(out_inf) }, - }; + { NGX_RTMP_AMF0_NULL, + 0, + NULL, 0 }, + { NGX_RTMP_AMF0_STRING, + 0, + &v.name, sizeof(v.name) }, + + { NGX_RTMP_AMF0_STRING, + 0, + &v.type, sizeof(v.type) }, + }; ngx_memzero(&v, sizeof(v)); @@ -446,26 +391,56 @@ ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_ERROR; } + return ngx_rtmp_publish + ? ngx_rtmp_publish(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) +{ + ngx_rtmp_header_t h; + + static double trans; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Publish.Start", 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "Publish succeeded.", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onStatus", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_inf, sizeof(out_inf) }, + }; + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "publish: name='%s' type=%s", - v.name, v.type); - - stream.len = ngx_strlen(v.name); - stream.data = ngx_palloc(s->connection->pool, stream.len); - ngx_memcpy(stream.data, v.name, stream.len); - - if (ngx_strcmp(v.type, "record") == 0) { - type = NGX_RTMP_CMD_PUBLISH_RECORD; - } else if(ngx_strcmp(v.type, "append") == 0) { - type = NGX_RTMP_CMD_PUBLISH_APPEND; - } else if (ngx_strcmp(v.type, "live") == 0) { - type = NGX_RTMP_CMD_PUBLISH_LIVE; - } else { - type = 0; - } - - /* call handlers */ - NGX_RTMP_CMD_I(publish) (s, &stream, type) NGX_RTMP_CMD_F; + v->name, v->type); /* start stream */ if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) { @@ -473,119 +448,39 @@ ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } /* send onStatus reply */ - memset(&sh, 0, sizeof(sh)); - sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.csid = NGX_RTMP_CMD_CSID_AMF0; - sh.msid = h->msid; + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF0_CMD; + h.csid = NGX_RTMP_CMD_CSID_AMF0; + h.msid = NGX_RTMP_CMD_MSID; - if (ngx_rtmp_send_amf0(s, &sh, out_elts, + if (ngx_rtmp_send_amf0(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) { return NGX_ERROR; } - return NGX_DONE; + return NGX_OK; } static ngx_int_t -ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, +ngx_rtmp_cmd_fcpublish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_header_t sh; - ngx_str_t stream; - - static double trans; - static int bfalse; - - static struct { - u_char name[1024]; - double start; - double duration; - int reset; - } v; + static ngx_rtmp_fcpublish_t v; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL, 0, - NULL, 0 }, - { NGX_RTMP_AMF0_STRING, 0, - &v.name, sizeof(v.name) }, - { NGX_RTMP_AMF0_OPTIONAL - | NGX_RTMP_AMF0_NUMBER, 0, - &v.start, 0 }, - { NGX_RTMP_AMF0_OPTIONAL - | NGX_RTMP_AMF0_NUMBER, 0, - &v.duration, 0 }, - { NGX_RTMP_AMF0_OPTIONAL - | NGX_RTMP_AMF0_BOOLEAN,0, - &v.reset, 0 } - }; - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", - "NetStream.Play.Reset", 0 }, - { NGX_RTMP_AMF0_STRING, "level", - "status", 0 }, - { NGX_RTMP_AMF0_STRING, "description", - "Playing and resetting.", - 0 }, - }; + /* transaction is always 0 */ + { NGX_RTMP_AMF0_NUMBER, + 0, + NULL, 0 }, - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "onStatus", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL , NULL, - NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out_inf, sizeof(out_inf) }, + { NGX_RTMP_AMF0_STRING, + 0, + &v.name, sizeof(v.name) }, }; - static ngx_rtmp_amf0_elt_t out2_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", - "NetStream.Play.Start", 0 }, - { NGX_RTMP_AMF0_STRING, "level", - "status", 0 }, - { NGX_RTMP_AMF0_STRING, "description", - "Started playing.", 0 }, - }; - - static ngx_rtmp_amf0_elt_t out2_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "onStatus", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, - &trans, 0 }, - { NGX_RTMP_AMF0_NULL , NULL, - NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out2_inf, sizeof(out2_inf) }, - }; - - static ngx_rtmp_amf0_elt_t out3_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "|RtmpSampleAccess", 0 }, - { NGX_RTMP_AMF0_BOOLEAN,NULL, - &bfalse, 0 }, - { NGX_RTMP_AMF0_BOOLEAN,NULL, - &bfalse, 0 }, - }; - - static ngx_rtmp_amf0_elt_t out4_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", - "NetStream.Data.Start", 0 }, - }; - - static ngx_rtmp_amf0_elt_t out4_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "onStatus", 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, - out4_inf, sizeof(out4_inf) }, - }; - - ngx_memzero(&v, sizeof(v)); /* parse input */ @@ -595,17 +490,235 @@ ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_ERROR; } + return ngx_rtmp_fcpublish + ? ngx_rtmp_fcpublish(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_fcpublish(ngx_rtmp_session_t *s, ngx_rtmp_fcpublish_t *v) +{ + ngx_rtmp_header_t h; + + static double trans; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Publish.Start", 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "FCPublish succeeded.", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onFCPublish", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_inf, sizeof(out_inf) }, + }; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "fcpublish: name='%s'", v->name); + + /* send onFCPublish reply */ + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF0_CMD; + h.csid = NGX_RTMP_CMD_CSID_AMF0; + h.msid = NGX_RTMP_CMD_MSID; + + if (ngx_rtmp_send_amf0(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + static ngx_rtmp_play_t v; + + static ngx_rtmp_amf0_elt_t in_elts[] = { + + /* transaction is always 0 */ + { NGX_RTMP_AMF0_NUMBER, + 0, + NULL, 0 }, + + { NGX_RTMP_AMF0_NULL, + 0, + NULL, 0 }, + + { NGX_RTMP_AMF0_STRING, + 0, + &v.name, sizeof(v.name) }, + + { NGX_RTMP_AMF0_OPTIONAL | NGX_RTMP_AMF0_NUMBER, + 0, + &v.start, 0 }, + + { NGX_RTMP_AMF0_OPTIONAL | NGX_RTMP_AMF0_NUMBER, + 0, + &v.duration, 0 }, + + { NGX_RTMP_AMF0_OPTIONAL | NGX_RTMP_AMF0_BOOLEAN, + 0, + &v.reset, 0 } + }; + + ngx_memzero(&v, sizeof(v)); + + /* parse input */ + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + return ngx_rtmp_play + ? ngx_rtmp_play(s, &v) + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) +{ + ngx_rtmp_header_t h; + + static double trans; + static int bfalse; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Play.Reset", 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "Playing and resetting.", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onStatus", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_inf, sizeof(out_inf) }, + }; + + static ngx_rtmp_amf0_elt_t out2_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Play.Start", 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "Started playing.", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out2_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onStatus", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out2_inf, + sizeof(out2_inf) }, + }; + + static ngx_rtmp_amf0_elt_t out3_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "|RtmpSampleAccess", 0 }, + + { NGX_RTMP_AMF0_BOOLEAN, + NULL, + &bfalse, 0 }, + + { NGX_RTMP_AMF0_BOOLEAN, + NULL, + &bfalse, 0 }, + }; + + static ngx_rtmp_amf0_elt_t out4_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Data.Start", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out4_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onStatus", 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out4_inf, sizeof(out4_inf) }, + }; + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: name='%s' start=%uD duration=%uD reset=%d", - v.name, (uint32_t)v.start, (uint32_t)v.duration, v.reset); - - stream.len = ngx_strlen(v.name); - stream.data = ngx_palloc(s->connection->pool, stream.len); - ngx_memcpy(stream.data, v.name, stream.len); - - /* call handlers */ - NGX_RTMP_CMD_I(play) (s, &stream, v.start, - v.duration, v.reset) NGX_RTMP_CMD_F; + v->name, (uint32_t)v->start, + (uint32_t)v->duration, v->reset); /* start stream */ if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) { @@ -613,142 +726,62 @@ ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } /* send onStatus reply */ - memset(&sh, 0, sizeof(sh)); - sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.csid = NGX_RTMP_CMD_CSID_AMF0; - sh.msid = h->msid; + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF0_CMD; + h.csid = NGX_RTMP_CMD_CSID_AMF0; + h.msid = NGX_RTMP_CMD_MSID; - if (ngx_rtmp_send_amf0(s, &sh, out_elts, + if (ngx_rtmp_send_amf0(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) { return NGX_ERROR; } /* send sample access meta message FIXME */ - if (ngx_rtmp_send_amf0(s, &sh, out2_elts, + if (ngx_rtmp_send_amf0(s, &h, out2_elts, sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK) { return NGX_ERROR; } /* send data start meta message */ - sh.type = NGX_RTMP_MSG_AMF0_META; - if (ngx_rtmp_send_amf0(s, &sh, out3_elts, + h.type = NGX_RTMP_MSG_AMF0_META; + if (ngx_rtmp_send_amf0(s, &h, out3_elts, sizeof(out3_elts) / sizeof(out3_elts[0])) != NGX_OK) { return NGX_ERROR; } - if (ngx_rtmp_send_amf0(s, &sh, out4_elts, + if (ngx_rtmp_send_amf0(s, &h, out4_elts, sizeof(out4_elts) / sizeof(out4_elts[0])) != NGX_OK) { return NGX_ERROR; } - return NGX_DONE; -} - - -#if 0 -static ngx_int_t -ngx_rtmp_live_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - ngx_rtmp_live_app_conf_t *lacf; - ngx_connection_t *c; - ngx_rtmp_live_ctx_t *ctx; - ngx_rtmp_amf0_ctx_t act; - ngx_rtmp_header_t sh; - ngx_rtmp_core_srv_conf_t *cscf; - - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "@setDataFrame", 0 }, - }; - - lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); - - if (lacf == NULL || !lacf->live) { - return NGX_OK; - } - - c = s->connection; - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); - - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "live: data_frame"); - - /* TODO: allow sending more meta packages to change live content */ - - if (ctx->data_frame) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "duplicate data_frame"); - return NGX_OK; - } - - /* create full metadata chain for output */ - memset(&act, 0, sizeof(act)); - act.cscf = cscf; - act.alloc = ngx_rtmp_alloc_shared_buf; - act.log = c->log; - - if (ngx_rtmp_amf0_write(&act, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - if (act.first) { - ngx_rtmp_free_shared_bufs(cscf, act.first); - } - return NGX_ERROR; - } - - if (act.first == NULL) { - return NGX_OK; - } - - ctx->data_frame = act.first; - - if (ngx_rtmp_append_shared_bufs(cscf, ctx->data_frame, in) == NULL) { - if (ctx->data_frame) { - ngx_rtmp_free_shared_bufs(cscf, ctx->data_frame); - } - return NGX_ERROR; - } - - memset(&sh, 0, sizeof(sh)); - sh.csid = NGX_RTMP_LIVE_CSID_AMF0; - sh.msid = 1; - sh.type = NGX_RTMP_MSG_AMF0_META; - - ngx_rtmp_prepare_message(s, &sh, NULL, ctx->data_frame); - return NGX_OK; } -static ngx_int_t -ngx_rtmp_live_stream_length(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in) +static ngx_int_t +ngx_rtmp_cmd_fcsubscribe_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) { - ngx_rtmp_live_app_conf_t *lacf; - ngx_rtmp_header_t sh; - - static double trans; - static double length; + static ngx_rtmp_fcsubscribe_t v; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, + + /* transaction is always 0 */ + { NGX_RTMP_AMF0_NUMBER, + 0, + NULL, 0 }, + + { NGX_RTMP_AMF0_STRING, + 0, + &v.name, sizeof(v.name) }, + }; - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "_result", 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &length, 0 }, - }; - - lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); - - if (lacf == NULL || !lacf->live) { - return NGX_OK; - } + ngx_memzero(&v, sizeof(v)); /* parse input */ if (ngx_rtmp_receive_amf0(s, in, in_elts, @@ -757,18 +790,97 @@ ngx_rtmp_live_stream_length(ngx_rtmp_session_t *s, return NGX_ERROR; } - memset(&sh, 0, sizeof(sh)); - sh.csid = h->csid; - sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.msid = 0; - - /* send simple _result */ - return ngx_rtmp_send_amf0(s, &sh, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK - ? NGX_DONE - : NGX_ERROR; + return ngx_rtmp_fcsubscribe + ? ngx_rtmp_fcsubscribe(s, &v) + : NGX_OK; } -#endif + + +static ngx_int_t +ngx_rtmp_cmd_fcsubscribe(ngx_rtmp_session_t *s, ngx_rtmp_fcsubscribe_t *v) +{ + ngx_rtmp_header_t h; + + static double trans; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + + { NGX_RTMP_AMF0_STRING, + "code", + "NetStream.Play.Start", 0 }, + + { NGX_RTMP_AMF0_STRING, + "level", + "status", 0 }, + + { NGX_RTMP_AMF0_STRING, + "description", + "Started playing.", 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + + { NGX_RTMP_AMF0_STRING, + NULL, + "onFCSubscribe", 0 }, + + { NGX_RTMP_AMF0_NUMBER, + NULL, + &trans, 0 }, + + { NGX_RTMP_AMF0_NULL, + NULL, + NULL, 0 }, + + { NGX_RTMP_AMF0_OBJECT, + NULL, + out_inf, + sizeof(out_inf) }, + }; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "fcsubscribe: name='%s'", v->name); + + /* send onFCSubscribe reply */ + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF0_CMD; + h.csid = NGX_RTMP_CMD_CSID_AMF0; + h.msid = NGX_RTMP_CMD_MSID; + + if (ngx_rtmp_send_amf0(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + return ngx_rtmp_delete_stream + ? ngx_rtmp_delete_stream(s, NULL) + : NGX_OK; +} + + +static ngx_rtmp_amf0_handler_t ngx_rtmp_cmd_map[] = { + + { ngx_string("connect"), ngx_rtmp_cmd_connect_init }, + { ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init }, + { ngx_string("deleteStream"), ngx_rtmp_cmd_delete_stream_init }, + + { ngx_string("publish"), ngx_rtmp_cmd_publish_init }, + { ngx_string("fcpublish"), ngx_rtmp_cmd_fcpublish_init }, + /*{ ngx_string("fcunpublish"), ngx_rtmp_cmd_fcunpublish_init },*/ + + { ngx_string("play"), ngx_rtmp_cmd_play_init }, + { ngx_string("fcsubscribe"), ngx_rtmp_cmd_fcsubscribe_init }, + /*{ ngx_string("fcunsubscribe"), ngx_rtmp_cmd_fcunsubscribe_init },*/ +}; static ngx_int_t @@ -781,6 +893,9 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf) cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); + /* redirect disconnects to deleteStream + * to free client modules from registering + * disconnect callback */ h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]); *h = ngx_rtmp_cmd_disconnect; @@ -796,5 +911,18 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf) *ch = *bh; } + /* set initial handlers */ + ngx_rtmp_connect = ngx_rtmp_cmd_connect; + ngx_rtmp_create_stream = ngx_rtmp_cmd_create_stream; + ngx_rtmp_delete_stream = ngx_rtmp_cmd_delete_stream; + + ngx_rtmp_publish = ngx_rtmp_cmd_publish; + ngx_rtmp_fcpublish = ngx_rtmp_cmd_fcpublish; + /*ngx_rtmp_fcunpublish = ngx_rtmp_cmd_fcunpublish;*/ + + ngx_rtmp_play = ngx_rtmp_cmd_play; + ngx_rtmp_fcsubscribe = ngx_rtmp_cmd_fcsubscribe; + /*ngx_rtmp_fcunsubscribe = ngx_rtmp_cmd_fcunsubsrcibe;*/ + return NGX_OK; } diff --git a/ngx_rtmp_cmd_module.h b/ngx_rtmp_cmd_module.h index 9ea27e4..8bbd8e6 100644 --- a/ngx_rtmp_cmd_module.h +++ b/ngx_rtmp_cmd_module.h @@ -2,38 +2,100 @@ * Copyright (c) 2012 Roman Arutyunyan */ + #ifndef _NGX_RTMP_CMD_H_INCLUDED_ #define _NGX_RTMP_CMD_H_INCLUDED_ + #include #include #include #include "ngx_rtmp.h" -/* publish types */ -#define NGX_RTMP_CMD_PUBLISH_RECORD 1 -#define NGX_RTMP_CMD_PUBLISH_APPEND 2 -#define NGX_RTMP_CMD_PUBLISH_LIVE 3 +/* Basic RTMP call support */ +/* TODO: improve string sizes */ -typedef ngx_int_t (*ngx_rtmp_cmd_connect_pt)(ngx_rtmp_session_t *s); -typedef ngx_int_t (*ngx_rtmp_cmd_publish_pt)(ngx_rtmp_session_t *s, - ngx_str_t *name, ngx_int_t type); -typedef ngx_int_t (*ngx_rtmp_cmd_play_pt)(ngx_rtmp_session_t *s, - ngx_str_t *name, uint32_t start, uint32_t duration, ngx_int_t reset); -typedef ngx_int_t (*ngx_rtmp_cmd_close_pt)(ngx_rtmp_session_t *s); +typedef struct { + u_char app[1024]; + u_char flashver[1024]; + u_char swf_url[1024]; + u_char tc_url[1024]; + double acodecs; + double vcodecs; + u_char page_url[1024]; +} ngx_rtmp_connect_t; typedef struct { - ngx_array_t connect; - ngx_array_t publish; - ngx_array_t play; - ngx_array_t close; -} ngx_rtmp_cmd_main_conf_t; + double trans; + double stream; +} ngx_rtmp_create_stream_t; -extern ngx_module_t ngx_rtmp_cmd_module; +typedef struct { + double stream; +} ngx_rtmp_delete_stream_t; + + +typedef struct { + u_char name[1024]; + u_char type[1024]; +} ngx_rtmp_publish_t; + + +typedef struct { + u_char name[1024]; +} ngx_rtmp_fcpublish_t; + + +typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunpublish_t; +typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcsubscribe_t; +typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunsubscribe_t; + + +typedef struct { + u_char name[1024]; + double start; + double duration; + int reset; +} ngx_rtmp_play_t; + + +typedef ngx_int_t (*ngx_rtmp_connect_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_connect_t *v); +typedef ngx_int_t (*ngx_rtmp_create_stream_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_create_stream_t *v); +typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_delete_stream_t *v); + +typedef ngx_int_t (*ngx_rtmp_publish_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_publish_t *v); +typedef ngx_int_t (*ngx_rtmp_fcpublish_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_fcpublish_t *v); +typedef ngx_int_t (*ngx_rtmp_fcunpublish_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_fcunpublish_t *v); + +typedef ngx_int_t (*ngx_rtmp_play_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_play_t *v); +typedef ngx_int_t (*ngx_rtmp_fcsubscribe_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_fcsubscribe_t *v); +typedef ngx_int_t (*ngx_rtmp_fcunsubscribe_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_fcunsubscribe_t *v); + + +extern ngx_rtmp_connect_pt ngx_rtmp_connect; +extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; +extern ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; + +extern ngx_rtmp_publish_pt ngx_rtmp_publish; +extern ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish; +extern ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish; + +extern ngx_rtmp_play_pt ngx_rtmp_play; +extern ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe; +extern ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe; #endif /*_NGX_RTMP_CMD_H_INCLUDED_ */ diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 47471d6..9739007 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -9,6 +9,11 @@ #include "ngx_rtmp_cmd_module.h" +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_play_pt next_play; +static ngx_rtmp_delete_stream_pt next_delete_stream; + + /* Chunk stream ids for output */ #define NGX_RTMP_LIVE_CSID_AUDIO 6 #define NGX_RTMP_LIVE_CSID_VIDEO 7 @@ -159,7 +164,7 @@ ngx_rtmp_live_get_head(ngx_rtmp_session_t *s) static void -ngx_rtmp_live_join(ngx_rtmp_session_t *s, ngx_str_t *stream, +ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, ngx_uint_t flags) { ngx_connection_t *c; @@ -180,9 +185,13 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, ngx_str_t *stream, } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "live: join '%V'", stream); + "live: join '%s'", name); + + ctx->stream.len = ngx_strlen(name); + ctx->stream.data = ngx_palloc(s->connection->pool, + ctx->stream.len); + ngx_memcpy(ctx->stream.data, name, ctx->stream.len); - ctx->stream = *stream; hctx = ngx_rtmp_live_get_head(s); if (hctx == NULL) { return; @@ -194,7 +203,7 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, ngx_str_t *stream, static ngx_int_t -ngx_rtmp_live_close(ngx_rtmp_session_t *s) +ngx_rtmp_live_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) { ngx_connection_t *c; ngx_rtmp_live_ctx_t *ctx, **hctx; @@ -203,7 +212,7 @@ ngx_rtmp_live_close(ngx_rtmp_session_t *s) ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); if (ctx == NULL) { - return NGX_OK; + goto next; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, @@ -222,7 +231,8 @@ ngx_rtmp_live_close(ngx_rtmp_session_t *s) } } - return NGX_OK; +next: + return next_delete_stream(s, v); } @@ -353,48 +363,49 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, static ngx_int_t -ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_str_t *name, - ngx_int_t type) +ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { ngx_rtmp_live_app_conf_t *lacf; lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); if (lacf == NULL || !lacf->live) { - return NGX_OK; + goto next; } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: publish: name='%V' type=%d", - name, type); + "live: publish: name='%s' type='%s'", + v->name, v->type); /* join stream as publisher */ - ngx_rtmp_live_join(s, name, NGX_RTMP_LIVE_PUBLISHING); + ngx_rtmp_live_join(s, v->name, NGX_RTMP_LIVE_PUBLISHING); - return NGX_OK; +next: + return next_publish(s, v); } static ngx_int_t -ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_str_t *name, - uint32_t start, uint32_t duration, ngx_int_t reset) +ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { ngx_rtmp_live_app_conf_t *lacf; lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); if (lacf == NULL || !lacf->live) { - return NGX_OK; + goto next; } ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: play: name='%V' start=%uD duration=%uD reset=%d", - name, start, duration, reset); + "live: play: name='%s' start=%uD duration=%uD reset=%d", + v->name, (uint32_t)v->start, + (uint32_t)v->duration, (uint32_t)v->reset); /* join stream as player */ - ngx_rtmp_live_join(s, name, NGX_RTMP_LIVE_PLAYING); + ngx_rtmp_live_join(s, v->name, NGX_RTMP_LIVE_PLAYING); - return NGX_OK; +next: + return next_play(s, v); } @@ -402,9 +413,7 @@ static ngx_int_t ngx_rtmp_live_postconfiguration(ngx_conf_t *cf) { ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_cmd_main_conf_t *dmcf; ngx_rtmp_handler_pt *h; - void *ch; /* register raw event handlers */ cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); @@ -415,17 +424,15 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf) h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); *h = ngx_rtmp_live_av; - /* register command handlers */ - dmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_cmd_module); + /* chain handlers */ + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_live_publish; - ch = ngx_array_push(&dmcf->publish); - *(ngx_rtmp_cmd_publish_pt*)ch = ngx_rtmp_live_publish; + next_play = ngx_rtmp_play; + ngx_rtmp_play = ngx_rtmp_live_play; - ch = ngx_array_push(&dmcf->play); - *(ngx_rtmp_cmd_play_pt*)ch = ngx_rtmp_live_play; - - ch = ngx_array_push(&dmcf->close); - *(ngx_rtmp_cmd_close_pt*)ch = ngx_rtmp_live_close; + next_delete_stream = ngx_rtmp_delete_stream; + ngx_rtmp_delete_stream = ngx_rtmp_live_delete_stream; return NGX_OK; } diff --git a/ngx_rtmp_record_module.c b/ngx_rtmp_record_module.c index 196962f..084f87e 100644 --- a/ngx_rtmp_record_module.c +++ b/ngx_rtmp_record_module.c @@ -9,6 +9,10 @@ #include "ngx_rtmp_cmd_module.h" +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_delete_stream_pt next_delete_stream; + + static ngx_int_t ngx_rtmp_record_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_record_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_record_merge_app_conf(ngx_conf_t *cf, @@ -134,8 +138,7 @@ ngx_rtmp_record_write_header(ngx_file_t *file) static ngx_int_t -ngx_rtmp_record_publish(ngx_rtmp_session_t *s, - ngx_str_t *name, ngx_int_t type) +ngx_rtmp_record_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { ngx_rtmp_record_app_conf_t *racf; ngx_rtmp_record_ctx_t *ctx; @@ -145,7 +148,7 @@ ngx_rtmp_record_publish(ngx_rtmp_session_t *s, racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_record_module); if (racf == NULL || racf->root.len == 0) { - return NGX_OK; + goto next; } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_record_module); @@ -208,7 +211,12 @@ ngx_rtmp_record_publish(ngx_rtmp_session_t *s, ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "record: opened '%V'", &ctx->path); - return ngx_rtmp_record_write_header(&ctx->file); + if (ngx_rtmp_record_write_header(&ctx->file) != NGX_OK) { + return NGX_ERROR; + } + +next: + return next_publish(s, v); } @@ -239,6 +247,18 @@ ngx_rtmp_record_close(ngx_rtmp_session_t *s) } +static ngx_int_t +ngx_rtmp_record_delete_stream(ngx_rtmp_session_t *s, + ngx_rtmp_delete_stream_t *v) +{ + if (ngx_rtmp_record_close(s) != NGX_OK) { + return NGX_ERROR; + } + + return next_delete_stream(s, v); +} + + static ngx_int_t ngx_rtmp_record_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) @@ -342,9 +362,7 @@ static ngx_int_t ngx_rtmp_record_postconfiguration(ngx_conf_t *cf) { ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_cmd_main_conf_t *dmcf; ngx_rtmp_handler_pt *h; - void *ch; cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); @@ -355,14 +373,12 @@ ngx_rtmp_record_postconfiguration(ngx_conf_t *cf) h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); *h = ngx_rtmp_record_av; - /* register command handlers */ - dmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_cmd_module); + /* chain handlers */ + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_record_publish; - ch = ngx_array_push(&dmcf->publish); - *(ngx_rtmp_cmd_publish_pt*)ch = ngx_rtmp_record_publish; - - ch = ngx_array_push(&dmcf->close); - *(ngx_rtmp_cmd_close_pt*)ch = ngx_rtmp_record_close; + next_delete_stream = ngx_rtmp_delete_stream; + ngx_rtmp_delete_stream = ngx_rtmp_record_delete_stream; return NGX_OK; }