improved publishing & subscribing: now subscriber receives some data which looks like real video

This commit is contained in:
Roman Arutyunyan
2012-03-15 20:43:03 +04:00
parent 17a3d368d8
commit 65cc92e7ca
5 changed files with 146 additions and 71 deletions

View File

@ -24,10 +24,6 @@ static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static char handshake_text[] =
"This RTMP handshake is generated by nginx-rtmp-module. ";
#ifdef NGX_DEBUG
static char*
ngx_rtmp_packet_type(uint8_t type) {
@ -222,13 +218,18 @@ ngx_rtmp_init_session(ngx_connection_t *c)
}
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(NGX_RTMP_HANDSHAKE_SIZE + 1
s->in_pool = ngx_create_pool(2 * (NGX_RTMP_HANDSHAKE_SIZE + 1)
+ sizeof(ngx_pool_t), c->log);
/* start handshake */
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
b = &s->hs_buf;
b = &s->hs_in_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
b->end = b->start + size;
b->temporary = 1;
b = &s->hs_out_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
b->end = b->start + size;
b->temporary = 1;
@ -244,15 +245,11 @@ uint32_t
ngx_rtmp_get_timestamp()
{
ngx_time_t *tod;
uint32_t t;
tod = ngx_timeofday();
t = tod->sec;
t *= 1e6;
t += tod->msec;
return t;
return (uint32_t)tod->sec * 1e3
+ (uint32_t)tod->msec / 1e3;
}
@ -285,7 +282,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
ngx_del_timer(rev);
}
b = &s->hs_buf;
b = (s->hs_stage == NGX_RTMP_HS_READ_DATA)
? &s->hs_in_buf
: &s->hs_out_buf;
while (b->last != b->end) {
@ -304,16 +303,6 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
return;
}
if (b->last == b->start
&& s->hs_stage == NGX_RTMP_HS_READ_DATA
&& *b->last != NGX_RTMP_VERSION)
{
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
ngx_rtmp_close_connection(c);
return;
}
b->last += n;
}
@ -322,18 +311,49 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
++s->hs_stage;
if (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) {
if (*b->pos != NGX_RTMP_VERSION) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
ngx_rtmp_close_connection(c);
return;
}
/* version is never needed anymore */
++b->pos;
/* store current time as our epoch */
s->epoch = ngx_rtmp_get_timestamp();
/* read client epoch */
p = (u_char*)&s->peer_epoch;
*p++ = b->pos[3];
*p++ = b->pos[2];
*p++ = b->pos[1];
*p++ = b->pos[0];
/* prepare output signature:
* set version, set epoch, fill zeroes */
p = (u_char*)&s->epoch;
b->pos[7] = *p++;
b->pos[6] = *p++;
b->pos[5] = *p++;
b = &s->hs_out_buf;
b->pos[0] = NGX_RTMP_VERSION;
b->pos[4] = *p++;
b->pos[3] = *p++;
b->pos[2] = *p++;
b->pos[1] = *p++;
b->pos[5] = b->pos[6] = b->pos[7] = b->pos[8] = 0;
for(b->last = b->pos + 9, n = 1;
b->last < b->end;
++b->last, ++n)
{
*b->last = (u_char)(n & 0xff);
}
/* reply timestamp is the same as out epoch */
ngx_memcpy(s->hs_in_buf.pos + 4, b->pos + 1, 4);
ngx_rtmp_handshake_send(c->write);
return;
}
@ -344,7 +364,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
c->write->handler = ngx_rtmp_send;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP handshake done; epoch=%D peer_epoch=%d",
"RTMP handshake done; epoch=%uD peer_epoch=%uD",
s->epoch, s->peer_epoch);
ngx_rtmp_recv(rev);
@ -359,8 +379,6 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
size_t offs;
static u_char zeroes[4];
c = wev->data;
s = c->data;
@ -382,55 +400,41 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
ngx_del_timer(wev);
}
b = &s->hs_buf;
restart:
b = (s->hs_stage == NGX_RTMP_HS_WRITE_DATA)
? &s->hs_out_buf
: &s->hs_in_buf;
while(b->pos != b->last) {
if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) {
n = c->send(c, b->pos, b->last - b->pos);
} else if (b->pos - b->start < 4) {
/* use the timestamp from echo packet */
n = c->send(c, b->pos + 4, 4 - (b->pos - b->start));
} else if (b->pos - b->start < 8) {
n = c->send(c, zeroes, 8 - (b->pos - b->start));
} else {
offs = (b->pos - b->start - 4) % sizeof(handshake_text);
n = c->send(c, (u_char*)handshake_text + offs,
ngx_min(b->last - b->pos,
(ngx_int_t)(sizeof(handshake_text) - offs)));
}
if (n > 0) {
b->pos += n;
}
n = c->send(c, b->pos, b->last - b->pos);
if (n == NGX_ERROR) {
ngx_rtmp_close_connection(c);
return;
}
if (n == NGX_AGAIN) {
if (n == NGX_AGAIN || n == 0) {
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_close_connection(c);
return;
}
}
b->pos += n;
}
++s->hs_stage;
if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) {
b->pos = b->start + 1;
goto restart;
}
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
b = &s->hs_out_buf;
b->pos = b->last = b->start + 1;
ngx_rtmp_handshake_recv(c->read);
}
@ -526,6 +530,20 @@ ngx_rtmp_recv(ngx_event_t *rev)
}
b->last += n;
s->in_bytes += n;
if (s->in_bytes - s->in_last_ack >= cscf->ack_window) {
s->in_last_ack = s->in_bytes;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"sending RTMP ACK(%D)", s->in_bytes);
if (ngx_rtmp_send_ack(s, s->in_bytes)) {
ngx_rtmp_close_connection(c);
return;
}
}
}
old_pos = NULL;
@ -737,7 +755,7 @@ ngx_rtmp_send(ngx_event_t *wev)
}
if (out == s->out
&& s->out->buf->pos == s->out->buf->last)
&& out->buf->pos == out->buf->last)
{
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_add_timer(c->write, cscf->timeout);
@ -964,12 +982,14 @@ ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
{
ngx_chain_t *l, **ll;
size_t nbytes, nbufs;
size_t nbytes, nbufs, noutbytes, noutbufs;
ngx_connection_t *c;
c = s->connection;
nbytes = 0;
nbufs = 0;
noutbytes = 0;
noutbufs = 0;
for(l = out; l; l = l->next) {
ngx_rtmp_buf_addref(l->buf);
@ -977,16 +997,20 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
++nbufs;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP send nbytes=%d, nbufs=%d",
nbytes, nbufs);
/* TODO: optimize lookup */
/* TODO: implement dropper */
for(ll = &s->out; *ll; ll = &(*ll)->next);
for(ll = &s->out; *ll; ll = &(*ll)->next) {
noutbytes += (*ll)->buf->last - (*ll)->buf->pos;
++noutbufs;
}
*ll = out;
ngx_rtmp_send(s->connection->write);
ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send nbytes=%d (%d), nbufs=%d (%d) ready=%d; active=%d",
nbytes, noutbytes, nbufs, noutbufs, c->write->ready, c->write->active);
ngx_rtmp_send(c->write);
return c->destroyed ? NGX_ERROR : NGX_OK;
}