From bd40fe63f9666ddf83a79af2f6b3abc2352f5af1 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 3 May 2012 02:28:21 +0400 Subject: [PATCH] implemented encrypted handshake --- config | 4 +- doc/rtmp.py | 858 +++++++++++++++++++++++++++++++++++++++++++ ngx_rtmp.c | 1 - ngx_rtmp.h | 24 +- ngx_rtmp_handler.c | 518 +------------------------- ngx_rtmp_handshake.c | 383 +++++++++++++------ ngx_rtmp_init.c | 311 ++++++++++++++++ 7 files changed, 1468 insertions(+), 631 deletions(-) create mode 100644 doc/rtmp.py create mode 100644 ngx_rtmp_init.c diff --git a/config b/config index 4c38cf3..8e4aca6 100644 --- a/config +++ b/config @@ -13,10 +13,12 @@ CORE_MODULES="$CORE_MODULES NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp.c \ + $ngx_addon_dir/ngx_rtmp_init.c \ + $ngx_addon_dir/ngx_rtmp_handshake.c \ + $ngx_addon_dir/ngx_rtmp_handler.c \ $ngx_addon_dir/ngx_rtmp_amf.c \ $ngx_addon_dir/ngx_rtmp_send.c \ $ngx_addon_dir/ngx_rtmp_shared.c \ - $ngx_addon_dir/ngx_rtmp_handler.c \ $ngx_addon_dir/ngx_rtmp_receive.c \ $ngx_addon_dir/ngx_rtmp_core_module.c \ $ngx_addon_dir/ngx_rtmp_cmd_module.c \ diff --git a/doc/rtmp.py b/doc/rtmp.py new file mode 100644 index 0000000..32aebed --- /dev/null +++ b/doc/rtmp.py @@ -0,0 +1,858 @@ + + + + + + + + + + + + + + + rtmp.py - + rtmplite - + + + Flash RTMP server in Python - Google Project Hosting + + + + + + + + + + + + + + + + +
+ +
+ + + + + + arutyunyan.roman@gmail.com + + + | My favorites + | Profile + | Sign out + + + +
+ +
+
+ + +
+ + + + + + + + + + + +
+ +
+ rtmplite +
+ + + + +
+ +
+ + + +
+ +
+ +
+ + +
+ Project Home + + + + + Downloads + + + + + + Wiki + + + + + + Issues + + + + + + Source + + + + + +
+
+ + + + + + + + + + + + + + +
+
+
+ + + + + Checkout   + Browse   + Changes   + +   + +
+ +   + + + + + + +
+
+
+ +
+ + + +
+ + + + + + + +
+
+ + + + + + + + + + + + + + +
+ + + + + + + +
+
+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
+
+
+
+
# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
# Copyright (c) 2010-2011, Kundan Singh.

'''
This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
   It also uses the App abstract class to implement the applications.
2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
   derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
   various streams from the client, represented using the Stream class.
3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
   and write of FLV file format.


Typically an application can launch this server as follows:
$ python rtmp.py

To know the command line options use the -h option:
$ python rtmp.py -h

To start the server with a different directory for recording and playing FLV files from, use the following command.
$ python rtmp.py -r some-other-directory/
Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.

A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
your played video will start appearing after some initial delay.


If an application wants to use this module as a library, it can launch the server as follows:
>>> agent = FlashServer()   # a new RTMP server instance
>>> agent.root = 'flvs/'    # set the document root to be 'flvs' directory. Default is current './' directory.
>>> agent.start()           # start the server
>>> multitask.run()         # this is needed somewhere in the application to actually start the co-operative multitasking.


If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
the client connects to the server.

class MyApp(App):         # a new MyApp extends the default App in rtmp module.
    def __init__(self):   # constructor just invokes base class constructor
        App.__init__(self)
    def onConnect(self, client, *args):
        result = App.onConnect(self, client, *args)   # invoke base class method first
        def invokeAdded(self, client):                # define a method to invoke 'connected("some-arg")' on Flash client
            yield client.call('connected', 'some-arg')
        multitask.add(invokeAdded(self, client))      # need to invoke later so that connection is established before callback
        return result     # return True to accept, or None to postpone calling accept()
...
agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})

Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
throw an exception and display the error message.

'''

import os, sys, time, struct, socket, traceback, multitask, amf, hashlib, hmac, random

_debug = False

class ConnectionClosed:
    'raised when the client closed the connection'

def truncate(data, max=100):
    return data and len(data)>max and data[:max] + '...(%d)'%(len(data),) or data
   
class SockStream(object):
    '''A class that represents a socket as a stream'''
    def __init__(self, sock):
        self.sock, self.buffer = sock, ''
        self.bytesWritten = self.bytesRead = 0
   
    def close(self):
        self.sock.close()
       
    def read(self, count):
        try:
            while True:
                if len(self.buffer) >= count: # do have enough data in buffer
                    data, self.buffer = self.buffer[:count], self.buffer[count:]
                    raise StopIteration(data)
                if _debug: print 'socket.read[%d] calling recv()'%(count,)
                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
                if not data: raise ConnectionClosed
                if _debug: print 'socket.read[%d] %r'%(len(data), truncate(data))
                self.bytesRead += len(data)
                self.buffer += data
        except StopIteration: raise
        except: raise ConnectionClosed # anything else is treated as connection closed.
       
    def unread(self, data):
        self.buffer = data + self.buffer
           
    def write(self, data):
        while len(data) > 0: # write in 4K chunks each time
            chunk, data = data[:4096], data[4096:]
            self.bytesWritten += len(chunk)
            if _debug: print 'socket.write[%d] %r'%(len(chunk), truncate(chunk))
            try: yield multitask.send(self.sock, chunk)
            except: raise ConnectionClosed
                               

class Header(object):
    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
   
    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
        if channel<64: self.hdrdata = chr(channel)
        elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
        else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
   
    def toBytes(self, control):
        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
        if control != Header.SEPARATOR:
            data += struct.pack('>I', self.time if self.time < 0xFFFFFF else 0xFFFFFF)[1:]  # add time in 3 bytes
            if control != Header.TIME:
                data += struct.pack('>I', self.size & 0xFFFFFFFF)[1:]  # size
                data += chr(self.type)                    # type
                if control != Header.MESSAGE:
                    data += struct.pack('<I', self.streamId & 0xFFFFFFFF)  # add streamId
        if self.time >= 0xFFFFFF:
            data += struct.pack('>I', self.time & 0xFFFFFFFF)
        return data

    def __repr__(self):
        return ("<Header channel=%r time=%r size=%r type=%s (%r) streamId=%r>"
            % (self.channel, self.time, self.size, Message.type_name.get(self.type, 'unknown'), self.type, self.streamId))
   
    def dup(self):
        return Header(channel=self.channel, time=self.time, size=self.size, type=self.type, streamId=self.streamId)


class Message(object):
    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
    CHUNK_SIZE,   ABORT,   ACK,   USER_CONTROL, WIN_ACK_SIZE, SET_PEER_BW, AUDIO, VIDEO, DATA3, SHAREDOBJ3, RPC3, DATA, SHAREDOBJ, RPC = \
    0x01,         0x02,    0x03,  0x04,         0x05,         0x06,        0x08,  0x09,  0x0F,  0x10,       0x11, 0x12, 0x13,      0x14
    type_name = dict(enumerate('unknown chunk-size abort ack user-control win-ack-size set-peer-bw unknown audio video unknown unknown unknown unknown unknown data3 sharedobj3 rpc3 data sharedobj rpc'.split()))
   
    def __init__(self, hdr=None, data=''):
        self.header, self.data = hdr or Header(), data
   
    # define properties type, streamId and time to access self.header.(property)
    for p in ['type', 'streamId', 'time']:
        exec 'def _g%s(self): return self.header.%s'%(p, p)
        exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
        exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
    @property
    def size(self): return len(self.data)
           
    def __repr__(self):
        return ("<Message header=%r data=%r>"% (self.header, truncate(self.data)))
   
    def dup(self):
        return Message(self.header.dup(), self.data[:])
               
class Protocol(object):
    PING_SIZE, DEFAULT_CHUNK_SIZE, PROTOCOL_CHANNEL_ID = 1536, 128, 2 # constants
    READ_WIN_SIZE, WRITE_WIN_SIZE = 1000000L, 1073741824L
   
    def __init__(self, sock):
        self.stream = SockStream(sock)
        self.lastReadHeaders, self.incompletePackets, self.lastWriteHeaders = dict(), dict(), dict()
        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
        self.readWinSize0, self.readWinSize, self.writeWinSize0, self.writeWinSize = 0L, self.READ_WIN_SIZE, 0L, self.WRITE_WIN_SIZE
        self.nextChannelId = Protocol.PROTOCOL_CHANNEL_ID + 1
        self._time0 = time.time()
        self.writeQueue = multitask.Queue()
           
    @property
    def relativeTime(self):
        return int(1000*(time.time() - self._time0))
   
    def messageReceived(self, msg): # override in subclass
        yield
           
    def protocolMessage(self, msg):
        if msg.type == Message.ACK: # respond to ACK requests
            self.writeWinSize0 = struct.unpack('>L', msg.data)[0]
#            response = Message()
#            response.type, response.data = msg.type, msg.data
#            yield self.writeMessage(response)
        elif msg.type == Message.CHUNK_SIZE:
            self.readChunkSize = struct.unpack('>L', msg.data)[0]
        elif msg.type == Message.WIN_ACK_SIZE:
            self.readWinSize, self.readWinSize0 = struct.unpack('>L', msg.data)[0], self.stream.bytesRead
        elif msg.type == Message.USER_CONTROL:
            type, data = struct.unpack('>H', msg.data[:2])[0], msg.data[2:]
            if type == 3: # client expects a response when it sends set buffer length
                streamId, bufferTime = struct.unpack('>II', data)
                response = Message()
                response.time, response.type, response.data = self.relativeTime, Message.USER_CONTROL, struct.pack('>HI', 0, streamId)
                yield self.writeMessage(response)
        yield
       
    def connectionClosed(self):
        yield
                           
    def parse(self):
        try:
            yield self.parseCrossDomainPolicyRequest() # check for cross domain policy
            yield self.parseHandshake()  # parse rtmp handshake
            yield self.parseMessages()   # parse messages
        except ConnectionClosed:
            yield self.connectionClosed()
            if _debug: print 'parse connection closed'
        except:
            if _debug: print 'exception, closing connection'
            if _debug: traceback.print_exc()
            yield self.connectionClosed()
                   
    def writeMessage(self, message):
        yield self.writeQueue.put(message)
           
    def parseCrossDomainPolicyRequest(self):
        # read the request
        REQUEST = '<policy-file-request/>\x00'
        data = (yield self.stream.read(len(REQUEST)))
        if data == REQUEST:
            if _debug: print data
            data = '''<!DOCTYPE cross-domain-policy SYSTEM "http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd">
                    <cross-domain-policy>
                      <allow-access-from domain="*" to-ports="1935" secure='false'/>
                    </cross-domain-policy>'''
            yield self.stream.write(data)
            raise ConnectionClosed
        else:
            yield self.stream.unread(data)
           
    SERVER_KEY = '\x47\x65\x6e\x75\x69\x6e\x65\x20\x41\x64\x6f\x62\x65\x20\x46\x6c\x61\x73\x68\x20\x4d\x65\x64\x69\x61\x20\x53\x65\x72\x76\x65\x72\x20\x30\x30\x31\xf0\xee\xc2\x4a\x80\x68\xbe\xe8\x2e\x00\xd0\xd1\x02\x9e\x7e\x57\x6e\xec\x5d\x2d\x29\x80\x6f\xab\x93\xb8\xe6\x36\xcf\xeb\x31\xae'
    FLASHPLAYER_KEY = '\x47\x65\x6E\x75\x69\x6E\x65\x20\x41\x64\x6F\x62\x65\x20\x46\x6C\x61\x73\x68\x20\x50\x6C\x61\x79\x65\x72\x20\x30\x30\x31\xF0\xEE\xC2\x4A\x80\x68\xBE\xE8\x2E\x00\xD0\xD1\x02\x9E\x7E\x57\x6E\xEC\x5D\x2D\x29\x80\x6F\xAB\x93\xB8\xE6\x36\xCF\xEB\x31\xAE'
   
    def parseHandshake(self):
        '''Parses the rtmp handshake'''
        data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
        data = Protocol.handshakeResponse(data)
        yield self.stream.write(data)
        data = (yield self.stream.read(Protocol.PING_SIZE))
   
    @staticmethod
    def handshakeResponse(data):
        # send both data parts before reading next ping-size, to work with ffmpeg
        if struct.unpack('>I', data[5:9])[0] == 0:
            data = '\x03' + '\x00'*Protocol.PING_SIZE
            return data + data[1:]
        else:
            type, data = ord(data[0]), data[1:] # first byte is ignored
            scheme = None
            for s in range(0, 2):
                digest_offset = (sum([ord(data[i]) for i in range(772, 776)]) % 728 + 776) if s == 1 else (sum([ord(data[i]) for i in range(8, 12)]) % 728 + 12)
                temp = data[0:digest_offset] + data[digest_offset+32:Protocol.PING_SIZE]
                hash = Protocol._calculateHash(temp, Protocol.FLASHPLAYER_KEY[:30])
                if hash == data[digest_offset:digest_offset+32]:
                    scheme = s
                    break
            if scheme is None:
                if _debug: print 'invalid RTMP connection data, assuming scheme 0'
                scheme = 0
            client_dh_offset = (sum([ord(data[i]) for i in range(768, 772)]) % 632 + 8) if scheme == 1 else (sum([ord(data[i]) for i in range(1532, 1536)]) % 632 + 772)
            outgoingKp = data[client_dh_offset:client_dh_offset+128]
            handshake = struct.pack('>IBBBB', 0, 1, 2, 3, 4) + ''.join([chr(random.randint(0, 255)) for i in xrange(Protocol.PING_SIZE-8)])
            server_dh_offset = (sum([ord(handshake[i]) for i in range(768, 772)]) % 632 + 8) if scheme == 1 else (sum([ord(handshake[i]) for i in range(1532, 1536)]) % 632 + 772)
            keys = Protocol._generateKeyPair() # (public, private)
            handshake = handshake[:server_dh_offset] + keys[0][0:128] + handshake[server_dh_offset+128:]
            if type > 0x03: raise Exception('encryption is not supported')
            server_digest_offset = (sum([ord(handshake[i]) for i in range(772, 776)]) % 728 + 776) if scheme == 1 else (sum([ord(handshake[i]) for i in range(8, 12)]) % 728 + 12)
            temp = handshake[0:server_digest_offset] + handshake[server_digest_offset+32:Protocol.PING_SIZE]
            hash = Protocol._calculateHash(temp, Protocol.SERVER_KEY[:36])
            handshake = handshake[:server_digest_offset] + hash + handshake[server_digest_offset+32:]
            buffer = data[:Protocol.PING_SIZE-32]
            key_challenge_offset = (sum([ord(buffer[i]) for i in range(772, 776)]) % 728 + 776) if scheme == 1 else (sum([ord(buffer[i]) for i in range(8, 12)]) % 728 + 12)
            challenge_key = data[key_challenge_offset:key_challenge_offset+32]
            hash = Protocol._calculateHash(challenge_key, Protocol.SERVER_KEY[:68])
            rand_bytes = ''.join([chr(random.randint(0, 255)) for i in xrange(Protocol.PING_SIZE-32)])
            last_hash = Protocol._calculateHash(rand_bytes, hash[:32])
            output = chr(type) + handshake + rand_bytes + last_hash
            return output
       
    @staticmethod
    def _calculateHash(msg, key): # Hmac-sha256
        return hmac.new(key, msg, hashlib.sha256).digest()
       
    @staticmethod
    def _generateKeyPair(): # dummy key pair since we don't support encryption
        return (''.join([chr(random.randint(0, 255)) for i in xrange(128)]), '')
       
    def parseMessages(self):
        '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
        CHANNEL_MASK = 0x3F
        while True:
            hdrsize = ord((yield self.stream.read(1))[0])  # read header size byte
            channel = hdrsize & CHANNEL_MASK
            if channel == 0: # we need one more byte
                channel = 64 + ord((yield self.stream.read(1))[0])
            elif channel == 1: # we need two more bytes
                data = (yield self.stream.read(2))
                channel = 64 + ord(data[0]) + 256 * ord(data[1])

            hdrtype = hdrsize & Header.MASK   # read header type byte
            if hdrtype == Header.FULL or not self.lastReadHeaders.has_key(channel):
                header = Header(channel)
                self.lastReadHeaders[channel] = header
            else:
                header = self.lastReadHeaders[channel]
           
            if hdrtype < Header.SEPARATOR: # time or delta has changed
                data = (yield self.stream.read(3))
                header.time = struct.unpack('!I', '\x00' + data)[0]
               
            if hdrtype < Header.TIME: # size and type also changed
                data = (yield self.stream.read(3))
                header.size = struct.unpack('!I', '\x00' + data)[0]
                header.type = ord((yield self.stream.read(1))[0])

            if hdrtype < Header.MESSAGE: # streamId also changed
                data = (yield self.stream.read(4))
                header.streamId = struct.unpack('<I', data)[0]

            if header.time == 0xFFFFFF: # if we have extended timestamp, read it
                data = (yield self.stream.read(4))
                header.extendedTime = struct.unpack('!I', data)[0]
                if _debug: print 'extended time stamp', '%x'%(header.extendedTime,)
            else:
                header.extendedTime = None
               
            if hdrtype == Header.FULL:
                header.currentTime = header.extendedTime or header.time
                header.hdrtype = hdrtype
            elif hdrtype in (Header.MESSAGE, Header.TIME):
                header.hdrtype = hdrtype

            #print header.type, '0x%02x'%(hdrtype,), header.time, header.currentTime
           
            # if _debug: print 'R', header, header.currentTime, header.extendedTime, '0x%x'%(hdrsize,)
             
            data = self.incompletePackets.get(channel, "") # are we continuing an incomplete packet?
           
            count = min(header.size - (len(data)), self.readChunkSize) # how much more
           
            data += (yield self.stream.read(count))

            # check if we need to send Ack
            if self.readWinSize is not None:
                if self.stream.bytesRead > (self.readWinSize0 + self.readWinSize):
                    self.readWinSize0 = self.stream.bytesRead
                    ack = Message()
                    ack.time, ack.type, ack.data = self.relativeTime, Message.ACK, struct.pack('>L', self.readWinSize0)
                    yield self.writeMessage(ack)
                   
            if len(data) < header.size: # we don't have all data
                self.incompletePackets[channel] = data
            else: # we have all data
                if hdrtype in (Header.MESSAGE, Header.TIME):
                    header.currentTime = header.currentTime + (header.extendedTime or header.time)
                elif hdrtype == Header.SEPARATOR:
                    if header.hdrtype in (Header.MESSAGE, Header.TIME):
                        header.currentTime = header.currentTime + (header.extendedTime or header.time)
                if len(data) == header.size:
                    if channel in self.incompletePackets:
                        del self.incompletePackets[channel]
                else:
                    data, self.incompletePackets[channel] = data[:header.size], data[header.size:]
               
                hdr = Header(channel=header.channel, time=header.currentTime, size=header.size, type=header.type, streamId=header.streamId)
                msg = Message(hdr, data)
                if _debug: print 'Protocol.parseMessage msg=', msg
                try:
                    if channel == Protocol.PROTOCOL_CHANNEL_ID:
                        yield self.protocolMessage(msg)
                    else:
                        yield self.messageReceived(msg)
                except:
                    if _debug: print 'Protocol.parseMessages exception', (traceback and traceback.print_exc() or None)

    def write(self):
        '''Writes messages to stream'''
        while True:
#            while self.writeQueue.empty(): (yield multitask.sleep(0.01))
#            message = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
            message = yield self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
            if _debug: print 'Protocol.write msg=', message
            if message is None:
                try: self.stream.close()  # just in case TCP socket is not closed, close it.
                except: pass
                break
           
            # get the header stored for the stream
            if self.lastWriteHeaders.has_key(message.streamId):
                header = self.lastWriteHeaders[message.streamId]
            else:
                if self.nextChannelId <= Protocol.PROTOCOL_CHANNEL_ID: self.nextChannelId = Protocol.PROTOCOL_CHANNEL_ID+1
                header, self.nextChannelId = Header(self.nextChannelId), self.nextChannelId + 1
                self.lastWriteHeaders[message.streamId] = header
            if message.type < Message.AUDIO:
                header = Header(Protocol.PROTOCOL_CHANNEL_ID)
               
            # now figure out the header data bytes
            if header.streamId != message.streamId or header.time == 0 or message.time <= header.time:
                header.streamId, header.type, header.size, header.time, header.delta = message.streamId, message.type, message.size, message.time, message.time
                control = Header.FULL
            elif header.size != message.size or header.type != message.type:
                header.type, header.size, header.time, header.delta = message.type, message.size, message.time, message.time-header.time
                control = Header.MESSAGE
            else:
                header.time, header.delta = message.time, message.time-header.time
                control = Header.TIME
           
            hdr = Header(channel=header.channel, time=header.delta if control in (Header.MESSAGE, Header.TIME) else header.time, size=header.size, type=header.type, streamId=header.streamId)
            assert message.size == len(message.data)

            data = ''
            while len(message.data) > 0:
                data += hdr.toBytes(control) # gather header bytes
                count = min(self.writeChunkSize, len(message.data))
                data += message.data[:count]
                message.data = message.data[count:]
                control = Header.SEPARATOR # incomplete message continuation
            try:
                yield self.stream.write(data)
            except ConnectionClosed:
                yield self.connectionClosed()
            except:
                print traceback.print_exc()

class Command(object):
    ''' Class for command / data messages'''
    def __init__(self, type=Message.RPC, name=None, id=None, tm=0, cmdData=None, args=[]):
        '''Create a new command with given type, name, id, cmdData and args list.'''
        self.type, self.name, self.id, self.time, self.cmdData, self.args = type, name, id, tm, cmdData, args[:]
       
    def __repr__(self):
        return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
   
    def setArg(self, arg):
        self.args.append(arg)
   
    def getArg(self, index):
        return self.args[index]
   
    @classmethod
    def fromMessage(cls, message):
        ''' initialize from a parsed RTMP message'''
        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])

        length = len(message.data)
        if length == 0: raise ValueError('zero length message data')
       
        if message.type == Message.RPC3 or message.type == Message.DATA3:
            assert message.data[0] == '\x00' # must be 0 in AMF3
            data = message.data[1:]
        else:
            data = message.data
       
        amfReader = amf.AMF0(data)

        inst = cls()
        inst.type = message.type
        inst.time = message.time
        inst.name = amfReader.read() # first field is command name

        try:
            if message.type == Message.RPC or message.type == Message.RPC3:
                inst.id = amfReader.read() # second field *may* be message id
                inst.cmdData = amfReader.read() # third is command data
            else:
                inst.id = 0
            inst.args = [] # others are optional
            while True:
                inst.args.append(amfReader.read())
        except EOFError:
            pass
        return inst
   
    def toMessage(self):
        msg = Message()
        assert self.type
        msg.type = self.type
        msg.time = self.time
        output = amf.BytesIO()
        amfWriter = amf.AMF0(output)
        amfWriter.write(self.name)
        if msg.type == Message.RPC or msg.type == Message.RPC3:
            amfWriter.write(self.id)
            amfWriter.write(self.cmdData)
        for arg in self.args:
            amfWriter.write(arg)
        output.seek(0)
        #hexdump.hexdump(output)
        #output.seek(0)
        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
            data = '\x00' + output.read()
        else:
            data = output.read()
        msg.data = data
        output.close()
        return msg

def getfilename(path, name, root):
    '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
    the the path present in the path variable.'''
    ignore, ignore, scope = path.partition('/')
    if scope: scope = scope + '/'
    result = root + scope + name + '.flv'
    if _debug: print 'filename=', result
    return result

class FLV(object):
    '''An FLV file which converts between RTMP message and FLV tags.'''
    def __init__(self):
        self.fname = self.fp = self.type = None
        self.tsp = self.tsr = 0; self.tsr0 = None
   
    def open(self, path, type='read', mode=0775):
        '''Open the file for reading (type=read) or writing (type=record or append).'''
        if str(path).find('/../') >= 0 or str(path).find('\\..\\') >= 0: raise ValueError('Must not contain .. in name')
        if _debug: print 'opening file', path
        self.tsp = self.tsr = 0; self.tsr0 = None; self.type = type
        if type in ('record', 'append'):
            try: os.makedirs(os.path.dirname(path), mode)
            except: pass
            self.fp = open(path, ('w' if type == 'record' else 'a')+'b')
            if type == 'record':
                self.fp.write('FLV\x01\x05\x00\x00\x00\x09\x00\x00\x00\x00') # the header and first previousTagSize
                self.writeDuration(0.0)
        else:
            self.fp = open(path, 'rb')
            magic, version, flags, offset = struct.unpack('!3sBBI', self.fp.read(9))
            if _debug: print 'FLV.open() hdr=', magic, version, flags, offset
            if magic != 'FLV': raise ValueError('This is not a FLV file')
            if version != 1: raise ValueError('Unsupported FLV file version')
            if offset > 9: self.fp.seek(offset-9, os.SEEK_CUR)
            self.fp.read(4) # ignore first previous tag size
        return self
   
    def close(self):
        '''Close the underlying file for this object.'''
        if _debug: print 'closing flv file'
        if self.type == 'record' and self.tsr0 is not None: self.writeDuration((self.tsr - self.tsr0)/1000.0)
        if self.fp is not None:
            try: self.fp.close()
            except: pass
            self.fp = None
   
    def delete(self, path):
        '''Delete the underlying file for this object.'''
        try: os.unlink(path)
        except: pass
       
    def writeDuration(self, duration):
        if _debug: print 'writing duration', duration
        output = amf.BytesIO()
        amfWriter = amf.AMF0(output) # TODO: use AMF3 if needed
        amfWriter.write('onMetaData')
        amfWriter.write({"duration": duration, "videocodecid": 2})
        output.seek(0); data = output.read()
        length, ts = len(data), 0
        data = struct.pack('>BBHBHB', Message.DATA, (length >> 16) & 0xff, length & 0x0ffff, (ts >> 16) & 0xff, ts & 0x0ffff, (ts >> 24) & 0xff) + '\x00\x00\x00' +  data
        data += struct.pack('>I', len(data))
        lastpos = self.fp.tell()
        if lastpos != 13: self.fp.seek(13, os.SEEK_SET)
        self.fp.write(data)
        if lastpos != 13: self.fp.seek(lastpos, os.SEEK_SET)
       
    def write(self, message):
        '''Write a message to the file, assuming it was opened for writing or appending.'''
#        if message.type == Message.VIDEO:
#            self.videostarted = True
#        elif not hasattr(self, "videostarted"): return
        if message.type == Message.AUDIO or message.type == Message.VIDEO:
            length, ts = message.size, message.time
            #if _debug: print 'FLV.write()', message.type, ts
            if self.tsr0 is None: self.tsr0 = ts
            self.tsr, ts = ts, ts - self.tsr0
            # if message.type == Message.AUDIO: print 'w', message.type, ts
            data = struct.pack('>BBHBHB', message.type, (length >> 16) & 0xff, length & 0x0ffff, (ts >> 16) & 0xff, ts & 0x0ffff, (ts >> 24) & 0xff) + '\x00\x00\x00' +  message.data
            data += struct.pack('>I', len(data))
            self.fp.write(data)
   
    def reader(self, stream):
        '''A generator to periodically read the file and dispatch them to the stream. The supplied stream
        object must have a send(Message) method and id and client properties.'''
        if _debug: print 'reader started'
        yield
        try:
            while self.fp is not None:
                bytes = self.fp.read(11)
                if len(bytes) == 0:
                    response = Command(name='onStatus', id=stream.id, tm=stream.client.relativeTime, args=[amf.Object(level='status',code='NetStream.Play.Stop', description='File ended', details=None)])
                    yield stream.send(response.toMessage())
                    break
                type, len0, len1, ts0, ts1, ts2, sid0, sid1 = struct.unpack('>BBHBHBBH', bytes)
                length = (len0 << 16) | len1; ts = (ts0 << 16) | (ts1 & 0x0ffff) | (ts2 << 24)
                body = self.fp.read(length); ptagsize, = struct.unpack('>I', self.fp.read(4))
                if ptagsize != (length+11):
                    if _debug: print 'invalid previous tag-size found:', ptagsize, '!=', (length+11),'ignored.'
                if stream is None or stream.client is None: break # if it is closed
                #hdr = Header(3 if type == Message.AUDIO else 4, ts if ts < 0xffffff else 0xffffff, length, type, stream.id)
                hdr = Header(0, ts, length, type, stream.id)
                msg = Message(hdr, body)
                # if _debug: print 'FLV.read() length=', length, 'hdr=', hdr
                # if hdr.type == Message.AUDIO: print 'r', hdr.type, hdr.time
                if type == Message.DATA: # metadata
                    amfReader = amf.AMF0(body) # TODO: use AMF3 if needed
                    name = amfReader.read()
                    obj = amfReader.read()
                    if _debug: print 'FLV.read()', name, repr(obj)
                yield stream.send(msg)
                if ts > self.tsp:
                    diff, self.tsp = ts - self.tsp, ts
                    if _debug: print 'FLV.read() sleep', diff
                    yield multitask.sleep(diff / 1000.0)
        except StopIteration: pass
        except:
            if _debug: print 'closing the reader', (sys and sys.exc_info() or None)
            if self.fp is not None:
                try: self.fp.close()
                except: pass
                self.fp = None
           
    def seek(self, offset):
        '''For file reader, try seek to the given time. The offset is in millisec'''
        if self.type == 'read':
            if _debug: print 'FLV.seek() offset=', offset, 'current tsp=', self.tsp
            self.fp.seek(0, os.SEEK_SET)
            magic, version, flags, length = struct.unpack('!3sBBI', self.fp.read(9))
            if length > 9: self.fp.seek(length-9, os.SEEK_CUR)
            self.fp.seek(4, os.SEEK_CUR) # ignore first previous tag size
            self.tsp, ts = int(offset), 0
            while self.tsp > 0 and ts < self.tsp:
                bytes = self.fp.read(11)
                if not bytes: break
                type, len0, len1, ts0, ts1, ts2, sid0, sid1 = struct.unpack('>BBHBHBBH', bytes)
                length = (len0 << 16) | len1; ts = (ts0 << 16) | (ts1 & 0x0ffff) | (ts2 << 24)
                self.fp.seek(length, os.SEEK_CUR)
                ptagsize, = struct.unpack('>I', self.fp.read(4))
                if ptagsize != (length+11): break
            if _debug: print 'FLV.seek() new ts=', ts, 'tell', self.fp.tell()
               
       
class Stream(object):
    '''The stream object that is used for RTMP stream.'''
    count = 0;
    def __init__(self, client):
        self.client, self.id, self.name = client, 0, ''
        self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
        self.queue = multitask.Queue()
        self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
        if _debug: print self, 'created'
       
    def close(self):
        if _debug: print self, 'closing'
        if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
        if self.playfile is not None: self.playfile.close(); self.playfile = None
        self.client = None # to clear the reference
        pass
   
    def __repr__(self):
        return self._name;
   
    def recv(self):
        '''Generator to receive new Message on this stream, or None if stream is closed.'''
        return self.queue.get()
   
    def send(self, msg):
        '''Method to send a Message or Command on this stream.'''
        if isinstance(msg, Command):
            msg = msg.toMessage()
        msg.streamId = self.id
        # if _debug: print self,'send'
        if self.client is not None: yield self.client.writeMessage(msg)
       
class Client(Protocol):
    '''The client object represents a single connected client to the server.'''
    def __init__(self, sock, server):
        Protocol.__init__(self, sock)
        self.server, self.agent, self.streams, self._nextCallId, self._nextStreamId, self.objectEncoding = \
          server,      None,         {},           2,                1,                  0.0
        self.queue = multitask.Queue() # receive queue used by application
        multitask.add(self.parse()); multitask.add(self.write())

    def recv(self):
        '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
        return self.queue.get()
   
    def connectionClosed(self):
        '''Called when the client drops the connection'''
        if _debug: 'Client.connectionClosed'
        yield self.writeMessage(None)
        yield self.queue.put((None,None))
           
    def messageReceived(self, msg):
        if (msg.type == Message.RPC or msg.type == Message.RPC3) and msg.streamId == 0:
            cmd = Command.fromMessage(msg)
            # if _debug: print 'rtmp.Client.messageReceived cmd=', cmd
            if cmd.name == 'connect':
                self.agent = cmd.cmdData
                if _debug: print 'connect', ', '.join(['%s=%r'%(x, getattr(self.agent, x)) for x in 'app flashVer swfUrl tcUrl fpad capabilities audioCodecs videoCodecs videoFunction pageUrl objectEncoding'.split() if hasattr(self.agent, x)])
                self.objectEncoding = self.agent.objectEncoding if hasattr(self.agent, 'objectEncoding') else 0.0
                yield self.server.queue.put((self, cmd.args)) # new connection
            elif cmd.name == 'createStream':
                response = Command(name='_result', id=cmd.id, tm=self.relativeTime, type=self.rpc, args=[self._nextStreamId])
                yield self.writeMessage(response.toMessage())
               
                stream = Stream(self) # create a stream object
                stream.id = self._nextStreamId
                self.streams[self._nextStreamId] = stream
                self._nextStreamId += 1

                yield self.queue.put(('stream', stream)) # also notify others of our new stream
            elif cmd.name == 'closeStream':
                assert msg.streamId in self.streams
                yield self.streams[msg.streamId].queue.put(None) # notify closing to others
                del self.streams[msg.streamId]
            else:
                # if _debug: print 'Client.messageReceived cmd=', cmd
                yield self.queue.put(('command', cmd)) # RPC call
        else: # this has to be a message on the stream
            assert msg.streamId != 0
            assert msg.streamId in self.streams
            # if _debug: print self.streams[msg.streamId], 'recv'
            stream = self.streams[msg.streamId]
            if not stream.client: stream.client = self
            yield stream.queue.put(msg) # give it to stream

    @property
    def rpc(self):
        # TODO: reverting r141 since it causes exception in setting self.rpc
        return Message.RPC if self.objectEncoding == 0.0 else Message.RPC3
   
    def accept(self):
        '''Method to accept an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_result', self.rpc
        if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
        arg = amf.Object(level='status', code='NetConnection.Connect.Success',
                         description='Connection succeeded.', fmsVer='rtmplite/8,2')
        if hasattr(self.agent, 'objectEncoding'):
            arg.objectEncoding, arg.details = self.objectEncoding, None
        response.setArg(arg)
        yield self.writeMessage(response.toMessage())
           
    def rejectConnection(self, reason=''):
        '''Method to reject an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_error', self.rpc
        response.setArg(amf.Object(level='status', code='NetConnection.Connect.Rejected',
                        description=reason, fmsVer='rtmplite/8,2', details=None))
        yield self.writeMessage(response.toMessage())
           
    def redirectConnection(self, url, reason='Connection failed'):
        '''Method to redirect an incoming client to the given url.'''
        response = Command()
        response.id, response.name, response.type = 1, '_error', self.rpc
        extra = dict(code=302, redirect=url)
        response.setArg(amf.Object(level='status', code='NetConnection.Connect.Rejected',
                        description=reason, fmsVer='rtmplite/8,2', details=None, ex=extra))
        yield self.writeMessage(response.toMessage())

    def call(self, method, *args):
        '''Call a (callback) method on the client.'''
        cmd = Command()
        cmd.id, cmd.time, cmd.name, cmd.type = self._nextCallId, self.relativeTime, method, self.rpc
        cmd.args, cmd.cmdData = args, None
        self._nextCallId += 1
        if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
        yield self.writeMessage(cmd.toMessage())
           
    def createStream(self):
        ''' Create a stream on the server side'''
        stream = Stream(self)
        stream.id = self._nextStreamId
        self.streams[stream.id] = stream
        self._nextStreamId += 1
        return stream


class Server(object):
    '''A RTMP server listens for incoming connections and informs the app.'''
    def __init__(self, sock):
        '''Create an RTMP server on the given bound TCP socket. The server will terminate
        when the socket is disconnected, or some other error occurs in listening.'''
        self.sock = sock
        self.queue = multitask.Queue()  # queue to receive incoming client connections
        multitask.add(self.run())

    def recv(self):
        '''Generator to wait for incoming client connections on this server and return
        (client, args) or (None, None) if the socket is closed or some error.'''
        return self.queue.get()
       
    def run(self):
        try:
            while True:
                sock, remote = (yield multitask.accept(self.sock))  # receive client TCP
                if sock == None:
                    if _debug: print 'rtmp.Server accept(sock) returned None.'
                    break
                if _debug: print 'connection received from', remote
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
                client = Client(sock, self)
        except GeneratorExit: pass # terminate
        except:
            if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
       
        if (self.sock):
            try: self.sock.close(); self.sock = None
            except: pass
        if (self.queue):
            yield self.queue.put((None, None))
            self.queue = None

class App(object):
    '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
    count = 0
    def __init__(self):
        self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
        self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
        if _debug: print self.name, 'created'
    def __del__(self):
        if _debug: print self.name, 'destroyed'
    @property
    def clients(self):
        '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
        return self._clients[1:] if self._clients is not None else []
    def onConnect(self, client, *args):
        if _debug: print self.name, 'onConnect', client.path
        return True
    def onDisconnect(self, client):
        if _debug: print self.name, 'onDisconnect', client.path
    def onPublish(self, client, stream):
        if _debug: print self.name, 'onPublish', client.path, stream.name
    def onClose(self, client, stream):
        if _debug: print self.name, 'onClose', client.path, stream.name
    def onPlay(self, client, stream):
        if _debug: print self.name, 'onPlay', client.path, stream.name
    def onStop(self, client, stream):
        if _debug: print self.name, 'onStop', client.path, stream.name
    def onCommand(self, client, cmd, *args):
        if _debug: print self.name, 'onCommand', cmd, args
    def onStatus(self, client, info):
        if _debug: print self.name, 'onStatus', info
    def onResult(self, client, result):
        if _debug: print self.name, 'onResult', result
    def onPublishData(self, client, stream, message): # this is invoked every time some media packet is received from published stream.
        return True # should return True so that the data is actually published in that stream
    def onPlayData(self, client, stream, message):
        return True # should return True so that data will be actually played in that stream

class Wirecast(App):
    '''A wrapper around App to workaround with wirecast publisher which does not send AVC seq periodically. It defines new stream variables
    such as in publish stream 'metaData' to store first published metadata Message, and 'avcSeq' to store the last published AVC seq Message,
    and in play stream 'avcIntra' to indicate if AVC intra frame has been sent or not. These variables are created onPublish and onPlay.
    Additional, when onPlay it also also sends any published stream.metaData if found in associated publisher. When onPlayData for video, if
    it detects AVC seq it sets avcIntra so that it is not explicitly sent. This is the case with Flash Player publisher. When onPlayData for video,
    if it detects avcIntra is not set, it discards the packet until AVC NALU or seq is received. If NALU is received but previous seq is not received
    it uses the publisher's avcSeq message to send before this NALU if found.'''
    def __init__(self):
        App.__init__(self)

    def onPublish(self, client, stream):
        App.onPublish(self, client, stream)
        if not hasattr(stream, 'metaData'): stream.metaData = None
        if not hasattr(stream, 'avcSeq'): stream.avcSeq = None
       
    def onPlay(self, client, stream):
        App.onPlay(self, client, stream)
        if not hasattr(stream, 'avcIntra'): stream.avcIntra = False
        publisher = self.publishers.get(stream.name, None)
        if publisher and publisher.metaData: # send published meta data to this player joining late
            multitask.add(stream.send(publisher.metaData.dup()))
   
    def onPublishData(self, client, stream, message):
        if message.type == Message.DATA and not stream.metaData: # store the first meta data on this published stream for late joining players
            stream.metaData = message.dup()
        if message.type == Message.VIDEO and message.data[:2] == '\x17\x00': # H264Avc intra + seq, store it
            stream.avcSeq = message.dup()
        return True

    def onPlayData(self, client, stream, message):
        if message.type == Message.VIDEO: # only video packets need special handling
            if message.data[:2] == '\x17\x00': # intra+seq is being sent, possibly by Flash Player publisher.
                stream.avcIntra = True
            elif not stream.avcIntra:  # intra frame hasn't been sent yet.
                if message.data[:2] == '\x17\x01': # intra+nalu is being sent, possibly by wirecast publisher.
                    publisher = self.publishers.get(stream.name, None)
                    if publisher and publisher.avcSeq: # if a publisher exists
                        def sendboth(stream, msgs):
                            stream.avcIntra = True
                            for msg in msgs: yield stream.send(msg)
                        multitask.add(sendboth(stream, [publisher.avcSeq.dup(), message]))
                        return False # so that caller doesn't send it again
                return False # drop until next intra video is sent
        return True

class FlashServer(object):
    '''A RTMP server to record and stream Flash video.'''
    def __init__(self):
        '''Construct a new FlashServer. It initializes the local members.'''
        self.sock = self.server = None;
        self.apps = dict({'*': App, 'wirecast': Wirecast}) # supported applications: * means any as in {'*': App}
        self.clients = dict()  # list of clients indexed by scope. First item in list is app instance.
        self.root = '';
       
    def start(self, host='0.0.0.0', port=1935):
        '''This should be used to start listening for RTMP connections on the given port, which defaults to 1935.'''
        if not self.server:
            sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
            if _debug: print 'listening on ', sock.getsockname()
            sock.listen(5)
            server = self.server = Server(sock) # start rtmp server on that socket
            multitask.add(self.serverlistener())
   
    def stop(self):
        if _debug: print 'stopping Flash server'
        if self.server and self.sock:
            try: self.sock.close(); self.sock = None
            except: pass
        self.server = None
       
    def serverlistener(self):
        '''Server listener (generator). It accepts all connections and invokes client listener'''
        try:
            while True:  # main loop to receive new connections on the server
                client, args = (yield self.server.recv()) # receive an incoming client connection.
                # TODO: we should reject non-localhost client connections.
                if not client:                # if the server aborted abnormally,
                    break                     #    hence close the listener.
                if _debug: print 'client connection received', client, args
                if client.objectEncoding != 0 and client.objectEncoding != 3:
                #if client.objectEncoding != 0:
                    yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
                    yield client.connectionClosed()
                else:
                    client.path = str(client.agent.app) if hasattr(client.agent, 'app') else str(client.agent['app']) if isinstance(client.agent, dict) else None
                    if not client.path:
                        yield client.rejectConnection(reason='Missing app path')
                        break
                    name, ignore, scope = client.path.partition('/')
                    if '*' not in self.apps and name not in self.apps:
                        yield client.rejectConnection(reason='Application not found: ' + name)
                    else: # create application instance as needed and add in our list
                        if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
                        app = self.apps[name] if name in self.apps else self.apps['*'] # application class
                        if client.path in self.clients: inst = self.clients[client.path][0]
                        else: inst = app()
                       
                        win_ack = Message()
                        win_ack.time, win_ack.type, win_ack.data = client.relativeTime, Message.WIN_ACK_SIZE, struct.pack('>L', client.writeWinSize)
                        yield client.writeMessage(win_ack)
                       
#                        set_peer_bw = Message()
#                        set_peer_bw.time, set_peer_bw.type, set_peer_bw.data = client.relativeTime, Message.SET_PEER_BW, struct.pack('>LB', client.writeWinSize, 1)
#                        client.writeMessage(set_peer_bw)
                       
                        try:
                            result = inst.onConnect(client, *args)
                        except:
                            if _debug: print sys.exc_info()
                            yield client.rejectConnection(reason='Exception on onConnect');
                            continue
                        if result is True or result is None:
                            if client.path not in self.clients:
                                self.clients[client.path] = [inst]; inst._clients=self.clients[client.path]
                            self.clients[client.path].append(client)
                            if result is True:
                                yield client.accept() # TODO: else how to kill this task when rejectConnection() later
                            multitask.add(self.clientlistener(client)) # receive messages from client.
                        else:
                            yield client.rejectConnection(reason='Rejected in onConnect')
        except GeneratorExit: pass # terminate
        except StopIteration: raise
        except:
            if _debug: print 'serverlistener exception', traceback.print_exc()
           
    def clientlistener(self, client):
        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
        try:
            while True:
                msg, arg = (yield client.recv())   # receive new message from client
                if not msg:                   # if the client disconnected,
                    if _debug: print 'connection closed from client'
                    break                     #    come out of listening loop.
                if msg == 'command':          # handle a new command
                    multitask.add(self.clienthandler(client, arg))
                elif msg == 'stream':         # a new stream is created, handle the stream.
                    arg.client = client
                    multitask.add(self.streamlistener(arg))
        except StopIteration: raise
        except:
            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
       
        try:
            # client is disconnected, clear our state for application instance.
            if _debug: print 'cleaning up client', client.path
            inst = None
            if client.path in self.clients:
                inst = self.clients[client.path][0]
                self.clients[client.path].remove(client)
            for stream in client.streams.values(): # for all streams of this client
                self.closehandler(stream)
            client.streams.clear() # and clear the collection of streams
            if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
                if _debug: print 'removing the application instance'
                inst = self.clients[client.path][0]
                inst._clients = None
                del self.clients[client.path]
            if inst is not None: inst.onDisconnect(client)
        except:
            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
           
    def closehandler(self, stream):
        '''A stream is closed explicitly when a closeStream command is received from given client.'''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
                inst.onClose(stream.client, stream)
                del inst.publishers[stream.name]
            if stream.name in inst.players and stream in inst.players[stream.name]:
                inst.onStop(stream.client, stream)
                inst.players[stream.name].remove(stream)
                if len(inst.players[stream.name]) == 0:
                    del inst.players[stream.name]
            stream.close()
       
    def clienthandler(self, client, cmd):
        '''A generator to handle a single command on the client.'''
        inst = self.clients[client.path][0]
        if inst:
            if cmd.name == '_error':
                if hasattr(inst, 'onStatus'):
                    result = inst.onStatus(client, cmd.args[0])
            elif cmd.name == '_result':
                if hasattr(inst, 'onResult'):
                    result = inst.onResult(client, cmd.args[0])
            else:
                res, code, result = Command(), '_result', None
                try:
                    result = inst.onCommand(client, cmd.name, *cmd.args)
                except:
                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
                    code = '_error'
                args = (result,) if result is not None else dict()
                res.id, res.time, res.name, res.type = cmd.id, client.relativeTime, code, client.rpc
                res.args, res.cmdData = args, None
                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
                yield client.writeMessage(res.toMessage())
        yield
       
    def streamlistener(self, stream):
        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
        try:
            stream.recordfile = None # so that it doesn't complain about missing attribute
            while True:
                msg = (yield stream.recv())
                if not msg:
                    if _debug: print 'stream closed'
                    self.closehandler(stream)
                    break
                # if _debug: msg
                multitask.add(self.streamhandler(stream, msg))
        except:
            if _debug: print 'streamlistener exception', (sys and sys.exc_info() or None)
           
    def streamhandler(self, stream, message):
        '''A generator to handle a single message on the stream.'''
        try:
            if message.type == Message.RPC or message.type == Message.RPC3:
                cmd = Command.fromMessage(message)
                if _debug: print 'streamhandler received cmd=', cmd
                if cmd.name == 'publish':
                    yield self.publishhandler(stream, cmd)
                elif cmd.name == 'play':
                    yield self.playhandler(stream, cmd)
                elif cmd.name == 'closeStream':
                    self.closehandler(stream)
                elif cmd.name == 'seek':
                    yield self.seekhandler(stream, cmd)
            else: # audio or video message
                yield self.mediahandler(stream, message)
        except GeneratorExit: pass
        except StopIteration: raise
        except:
            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
   
    def publishhandler(self, stream, cmd):
        '''A new stream is published. Store the information in the application instance.'''
        try:
            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
            stream.name = cmd.args[0]
            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
            if stream.name and '?' in stream.name: stream.name = stream.name.partition('?')[0]
            inst = self.clients[stream.client.path][0]
            if (stream.name in inst.publishers):
                raise ValueError, 'Stream name already in use'
            inst.publishers[stream.name] = stream # store the client for publisher
            inst.onPublish(stream.client, stream)
           
            path = getfilename(stream.client.path, stream.name, self.root)
            if stream.mode in ('record', 'append'):
                stream.recordfile = FLV().open(path, stream.mode)
            # elif stream.mode == 'live': FLV().delete(path) # TODO: this is commented out to avoid accidental delete
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='status', code='NetStream.Publish.Start', description='', details=None)])
            yield stream.send(response)
        except ValueError, E: # some error occurred. inform the app.
            if _debug: print 'error in publishing stream', str(E)
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
            yield stream.send(response)

    def playhandler(self, stream, cmd):
        '''A new stream is being played. Just updated the players list with this stream.'''
        try:
            inst = self.clients[stream.client.path][0]
            name = stream.name = cmd.args[0]  # store the stream's name
            if stream.name and '?' in stream.name: name = stream.name = stream.name.partition('?')[0]
            start = cmd.args[1] if len(cmd.args) >= 2 else -2
            if name not in inst.players:
                inst.players[name] = [] # initialize the players for this stream name
            if stream not in inst.players[name]: # store the stream as players of this name
                inst.players[name].append(stream)
            task = None
            if start >= 0 or start == -2 and name not in inst.publishers:
                path = getfilename(stream.client.path, stream.name, self.root)
                if os.path.exists(path):
                    stream.playfile = FLV().open(path)
                    if start > 0: stream.playfile.seek(start)
                    task = stream.playfile.reader(stream)
                elif start >= 0: raise ValueError, 'Stream name not found'
            if _debug: print 'playing stream=', name, 'start=', start
            inst.onPlay(stream.client, stream)
           
#            m0 = Message() # SetChunkSize
#            m0.time, m0.type, m0.data = stream.client.relativeTime, Message.CHUNK_SIZE, struct.pack('>L', stream.client.writeChunkSize)
#            yield stream.client.writeMessage(m0)
           
#            m1 = Message() # UserControl/StreamIsRecorded
#            m1.time, m1.type, m1.data = stream.client.relativeTime, Message.USER_CONTROL, struct.pack('>HI', 4, stream.id)
#            yield stream.client.writeMessage(m1)
           
            m2 = Message() # UserControl/StreamBegin
            m2.time, m2.type, m2.data = stream.client.relativeTime, Message.USER_CONTROL, struct.pack('>HI', 0, stream.id)
            yield stream.client.writeMessage(m2)
           
#            response = Command(name='onStatus', id=cmd.id, args=[amf.Object(level='status',code='NetStream.Play.Reset', description=stream.name, details=None)])
#            yield stream.send(response)
           
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
            yield stream.send(response)
           
#            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='status',code='NetStream.Play.PublishNotify', description=stream.name, details=None)])
#            yield stream.send(response)
           
            if task is not None: multitask.add(task)
        except ValueError, E: # some error occurred. inform the app.
            if _debug: print 'error in playing stream', str(E)
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='error',code='NetStream.Play.StreamNotFound',description=str(E),details=None)])
            yield stream.send(response)
           
    def seekhandler(self, stream, cmd):
        '''A stream is seeked to a new position. This is allowed only for play from a file.'''
        try:
            offset = cmd.args[0]
            if stream.playfile is None or stream.playfile.type != 'read':
                raise ValueError, 'Stream is not seekable'
            stream.playfile.seek(offset)
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='status',code='NetStream.Seek.Notify', description=stream.name, details=None)])
            yield stream.send(response)
        except ValueError, E: # some error occurred. inform the app.
            if _debug: print 'error in seeking stream', str(E)
            response = Command(name='onStatus', id=cmd.id, tm=stream.client.relativeTime, args=[amf.Object(level='error',code='NetStream.Seek.Failed',description=str(E),details=None)])
            yield stream.send(response)
           
    def mediahandler(self, stream, message):
        '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            result = inst.onPublishData(stream.client, stream, message)
            if result:
                for s in (inst.players.get(stream.name, [])):
                    #if _debug: print 'D', stream.name, s.name
                    m = message.dup()
                    result = inst.onPlayData(s.client, s, m)
                    if result:
                        yield s.send(m)
                if stream.recordfile is not None:
                    stream.recordfile.write(message)

# The main routine to start, run and stop the service
if __name__ == '__main__':
    from optparse import OptionParser
    parser = OptionParser(version='SVN $Revision$, $Date$'.replace('$', ''))
    parser.add_option('-i', '--host',    dest='host',    default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
    parser.add_option('-p', '--port',    dest='port',    default=1935, type="int", help='listening port number. Default 1935')
    parser.add_option('-r', '--root',    dest='root',    default='./',       help="document path prefix. Directory must end with /. Default './'")
    parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
    (options, args) = parser.parse_args()
   
    _debug = options.verbose
    try:
        agent = FlashServer()
        agent.root = options.root
        agent.start(options.host, options.port)
        if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
        multitask.run()
    except KeyboardInterrupt:
        pass
    if _debug: print time.asctime(), 'Flash Server Stops'
+
+
+ + + + + + +
+ +
+ + +
+
+
+
+
+
+

Change log

+
+ r145 + by kundan10 + on Feb 6, 2012 +   Diff +
+
patch submitted by Teeed teeed@na1noc.pl
+on Jan 16 to fix the result fron onCommand
+
+ + + + + + + + + + +
+
Go to:  + +
+ + + + + + + +
+ + +
+
+
+
+
+
+
+
+
+
+
+

Older revisions

+ + +
+ + + r142 + by kundan10 + on Feb 5, 2012 +   Diff +
+
reverting r141 since it causes
+exception in setting self.rpc because
+rpc exists as a function
+
+ +
+ + + r141 + by pratnama + on Jan 28, 2012 +   Diff +
+
force self.rpc to be Message.RPC on
+NetConnection.connect() even in AMF3.
+AMF3 is fully working now
+
+ +
+ + + r137 + by kundan10 + on Jan 20, 2012 +   Diff +
+
show media flow trace only for -D.
+support streambegin and define FLV in
+gevent version. ignore stream name
+after ?. don't include objectEncoding
+in connect success if connect did not
+...
+
+ + + All revisions of this file +
+
+
+
+
+
+ +
+
+
+
+
+
+

File info

+ +
Size: 65812 bytes, + 1214 lines
+ + +
+ +
+

File properties

+
+ +
svn:executable
+
*
+ +
svn:keywords
+
Author Id Revision Date
+ +
+
+ +
+
+
+
+
+
+
+ + +
+ +
+
+ + + + + + + + + + + + + + + + + + + + + + +
+ +
+ Powered by Google Project Hosting +
+ + + + + + + + + \ No newline at end of file diff --git a/ngx_rtmp.c b/ngx_rtmp.c index d6b5bf3..ce5c505 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -776,4 +776,3 @@ ngx_rtmp_rmemcpy(void *dst, void* src, size_t n) return dst; } - diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 79f6eb7..bfe3046 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -101,18 +101,9 @@ typedef struct { #define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE -#define NGX_RTMP_HANDSHAKE_SIZE 1536 - #define NGX_RTMP_DEFAULT_CHUNK_SIZE 128 -/* RTMP handshake stages */ -#define NGX_RTMP_HS_READ_DATA 0 -#define NGX_RTMP_HS_WRITE_DATA 1 -#define NGX_RTMP_HS_WRITE_ECHO 2 -#define NGX_RTMP_HS_READ_ECHO 3 - - /* RTMP message types */ #define NGX_RTMP_MSG_CHUNK_SIZE 1 #define NGX_RTMP_MSG_ABORT 2 @@ -200,9 +191,9 @@ typedef struct { uint32_t vcodecs; ngx_str_t page_url; - /* TODO: allocate this bufs from shared pool */ - ngx_buf_t hs_in_buf; - ngx_buf_t hs_out_buf; + /* handshake data */ + ngx_buf_t *hs_in; + ngx_buf_t *hs_out1, *hs_out2; ngx_uint_t hs_stage; /* connection timestamps */ @@ -347,9 +338,14 @@ char* ngx_rtmp_message_type(uint8_t type); char* ngx_rtmp_user_message_type(uint16_t evt); #endif -void ngx_rtmp_init_connection(ngx_connection_t *c); + +void ngx_rtmp_init_connection(ngx_connection_t *c); void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s); -u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); +void ngx_rtmp_handshake(ngx_rtmp_session_t *s); +void ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s); +void ngx_rtmp_cycle(ngx_rtmp_session_t *s); + + ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 71aca06..b988868 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -1,24 +1,12 @@ - /* * Copyright (c) 2012 Roman Arutyunyan */ -#include -#include -#include -#include - #include "ngx_rtmp.h" #include "ngx_rtmp_amf.h" -static void ngx_rtmp_init_session(ngx_connection_t *c); -static void ngx_rtmp_close_connection(ngx_connection_t *c); - -static void ngx_rtmp_handshake_recv(ngx_event_t *rev); -static void ngx_rtmp_handshake_send(ngx_event_t *rev); - static void ngx_rtmp_recv(ngx_event_t *rev); static void ngx_rtmp_send(ngx_event_t *rev); static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, @@ -28,8 +16,9 @@ static ngx_int_t ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s); #ifdef NGX_DEBUG char* -ngx_rtmp_message_type(uint8_t type) { - static char* types[] = { +ngx_rtmp_message_type(uint8_t type) +{ + static char* types[] = { "?", "chunk_size", "abort", @@ -62,8 +51,9 @@ ngx_rtmp_message_type(uint8_t type) { char* -ngx_rtmp_user_message_type(uint16_t evt) { - static char* evts[] = { +ngx_rtmp_user_message_type(uint16_t evt) +{ + static char* evts[] = { "stream_begin", "stream_eof", "stream dry", @@ -79,390 +69,17 @@ ngx_rtmp_user_message_type(uint16_t evt) { } #endif -void -ngx_rtmp_init_connection(ngx_connection_t *c) -{ - ngx_uint_t i; - ngx_rtmp_port_t *port; - struct sockaddr *sa; - struct sockaddr_in *sin; - ngx_rtmp_log_ctx_t *ctx; - ngx_rtmp_in_addr_t *addr; - ngx_rtmp_session_t *s; - ngx_rtmp_addr_conf_t *addr_conf; -#if (NGX_HAVE_INET6) - struct sockaddr_in6 *sin6; - ngx_rtmp_in6_addr_t *addr6; -#endif - - - /* find the server configuration for the address:port */ - - /* AF_INET only */ - - port = c->listening->servers; - - if (port->naddrs > 1) { - - /* - * There are several addresses on this port and one of them - * is the "*:port" wildcard so getsockname() is needed to determine - * the server address. - * - * AcceptEx() already gave this address. - */ - - if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); - return; - } - - sa = c->local_sockaddr; - - switch (sa->sa_family) { - -#if (NGX_HAVE_INET6) - case AF_INET6: - sin6 = (struct sockaddr_in6 *) sa; - - addr6 = port->addrs; - - /* the last address is "*" */ - - for (i = 0; i < port->naddrs - 1; i++) { - if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) { - break; - } - } - - addr_conf = &addr6[i].conf; - - break; -#endif - - default: /* AF_INET */ - sin = (struct sockaddr_in *) sa; - - addr = port->addrs; - - /* the last address is "*" */ - - for (i = 0; i < port->naddrs - 1; i++) { - if (addr[i].addr == sin->sin_addr.s_addr) { - break; - } - } - - addr_conf = &addr[i].conf; - - break; - } - - } else { - switch (c->local_sockaddr->sa_family) { - -#if (NGX_HAVE_INET6) - case AF_INET6: - addr6 = port->addrs; - addr_conf = &addr6[0].conf; - break; -#endif - - default: /* AF_INET */ - addr = port->addrs; - addr_conf = &addr[0].conf; - break; - } - } - - s = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_session_t)); - if (s == NULL) { - ngx_rtmp_close_connection(c); - return; - } - - s->main_conf = addr_conf->ctx->main_conf; - s->srv_conf = addr_conf->ctx->srv_conf; - - s->addr_text = &addr_conf->addr_text; - - c->data = s; - s->connection = c; - - ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%ui client connected", - c->number, &c->addr_text); - - ctx = ngx_palloc(c->pool, sizeof(ngx_rtmp_log_ctx_t)); - if (ctx == NULL) { - ngx_rtmp_close_connection(c); - return; - } - - ctx->client = &c->addr_text; - ctx->session = s; - - c->log->connection = c->number; - c->log->handler = ngx_rtmp_log_error; - c->log->data = ctx; - c->log->action = NULL; - - c->log_error = NGX_ERROR_INFO; - - ngx_rtmp_init_session(c); -} - - -static void -ngx_rtmp_init_session(ngx_connection_t *c) -{ - ngx_rtmp_session_t *s; - ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_buf_t *b; - size_t n, size; - ngx_rtmp_handler_pt *h; - ngx_array_t *ch; - - s = c->data; - - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module); - if (s->ctx == NULL) { - ngx_rtmp_close_connection(c); - return; - } - - - s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) - * cscf->max_streams); - if (s->in_streams == NULL) { - ngx_rtmp_close_connection(c); - return; - } - size = NGX_RTMP_HANDSHAKE_SIZE + 1; - - s->epoch = ngx_current_msec; - s->timeout = cscf->timeout; - ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE); - - - /* start handshake */ - b = &s->hs_in_buf; - b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); - b->end = b->start + size; - b->temporary = 1; - - b = &s->hs_out_buf; - b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); - b->end = b->start + size; - b->temporary = 1; - - c->write->handler = ngx_rtmp_handshake_send; - c->read->handler = ngx_rtmp_handshake_recv; - - /* call connect callbacks */ - cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); - - ch = &cmcf->events[NGX_RTMP_CONNECT]; - h = ch->elts; - for(n = 0; n < ch->nelts; ++n, ++h) { - if (*h) { - if ((*h)(s, NULL, NULL) != NGX_OK) { - ngx_rtmp_finalize_session(s); - return; - } - } - } - - ngx_rtmp_handshake_recv(c->read); -} - void -ngx_rtmp_handshake_recv(ngx_event_t *rev) +ngx_rtmp_cycle(ngx_rtmp_session_t *s) { - ssize_t n; ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_buf_t *b; - u_char *p; - - c = rev->data; - s = c->data; - - if (c->destroyed) { - return; - } - - if (rev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); - c->timedout = 1; - ngx_rtmp_finalize_session(s); - return; - } - - if (rev->timer_set) { - ngx_del_timer(rev); - } - - b = (s->hs_stage == NGX_RTMP_HS_READ_DATA) - ? &s->hs_in_buf - : &s->hs_out_buf; - - while (b->last != b->end) { - - n = c->recv(c, b->last, b->end - b->last); - - if (n == NGX_ERROR || n == 0) { - ngx_rtmp_finalize_session(s); - return; - } - - if (n == NGX_AGAIN) { - ngx_add_timer(rev, s->timeout); - if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_finalize_session(s); - } - return; - } - - b->last += n; - } - - if (rev->active) { - ngx_del_event(c->read, NGX_READ_EVENT, 0); - } - - ++s->hs_stage; - - if (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) { - - if (*b->pos != NGX_RTMP_VERSION) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, - "invalid handshake signature"); - ngx_rtmp_finalize_session(s); - return; - } - - /* version is never needed anymore */ - ++b->pos; - - /* store current time as our epoch */ - s->epoch = ngx_current_msec; - - /* read client epoch */ - p = (u_char*)&s->peer_epoch; - *p++ = b->pos[3]; - *p++ = b->pos[2]; - *p++ = b->pos[1]; - *p++ = b->pos[0]; - - /* prepare output signature: - * set version, set epoch, fill zeroes */ - p = (u_char*)&s->epoch; - b = &s->hs_out_buf; - b->pos[0] = NGX_RTMP_VERSION; - b->pos[4] = *p++; - b->pos[3] = *p++; - b->pos[2] = *p++; - b->pos[1] = *p++; - b->pos[5] = b->pos[6] = b->pos[7] = b->pos[8] = 0; - for(b->last = b->pos + 9, n = 1; - b->last < b->end; - ++b->last, ++n) - { - *b->last = (u_char)(n & 0xff); - } - - /* reply timestamp is the same as out epoch */ - /*ngx_memcpy(s->hs_in_buf.pos + 4, b->pos + 1, 4);*/ - - ngx_rtmp_handshake_send(c->write); - - return; - } - - /* handshake done */ - ngx_reset_pool(s->in_pool); + c = s->connection; c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP handshake done; epoch=%uD peer_epoch=%uD", - s->epoch, s->peer_epoch); - - ngx_rtmp_recv(rev); -} - - -void -ngx_rtmp_handshake_send(ngx_event_t *wev) -{ - ngx_int_t n; - ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_buf_t *b; - - c = wev->data; - s = c->data; - - if (c->destroyed) { - return; - } - - if (wev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, - "client timed out"); - c->timedout = 1; - ngx_rtmp_finalize_session(s); - return; - } - - if (wev->timer_set) { - ngx_del_timer(wev); - } - -restart: - - b = (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) - ? &s->hs_out_buf - : &s->hs_in_buf; - - while(b->pos != b->last) { - - n = c->send(c, b->pos, b->last - b->pos); - - if (n == NGX_ERROR) { - ngx_rtmp_finalize_session(s); - return; - } - - if (n == NGX_AGAIN || n == 0) { - ngx_add_timer(c->write, s->timeout); - if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_rtmp_finalize_session(s); - return; - } - } - - b->pos += n; - } - - ++s->hs_stage; - - if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) { - goto restart; - } - - if (wev->active) { - ngx_del_event(wev, NGX_WRITE_EVENT, 0); - } - - b = &s->hs_out_buf; - b->pos = b->last = b->start + 1; - ngx_rtmp_handshake_recv(c->read); + ngx_rtmp_recv(c->read); } @@ -493,7 +110,7 @@ ngx_rtmp_alloc_in_buf(ngx_rtmp_session_t *s) } -void +static void ngx_rtmp_recv(ngx_event_t *rev) { ngx_int_t n; @@ -778,6 +395,7 @@ ngx_rtmp_recv(ngx_event_t *rev) } } + static void ngx_rtmp_send(ngx_event_t *wev) { @@ -1179,117 +797,3 @@ ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s) } -static void -ngx_rtmp_close_connection(ngx_connection_t *c) -{ - ngx_pool_t *pool; - - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); - - pool = c->pool; - ngx_close_connection(c); - ngx_destroy_pool(pool); -} - - -static void -ngx_rtmp_close_session_handler(ngx_event_t *e) -{ - ngx_rtmp_session_t *s; - ngx_connection_t *c; - ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_rtmp_handler_pt *h; - ngx_array_t *dh; - size_t n; - - s = e->data; - c = s->connection; - - cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session"); - - if (s) { - dh = &cmcf->events[NGX_RTMP_DISCONNECT]; - h = dh->elts; - - for(n = 0; n < dh->nelts; ++n, ++h) { - if (*h) { - (*h)(s, NULL, NULL); - } - } - - if (s->in_old_pool) { - ngx_destroy_pool(s->in_old_pool); - } - - if (s->in_pool) { - ngx_destroy_pool(s->in_pool); - } - } - - while (s->out_pos != s->out_last) { - ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos++]); - s->out_pos %= NGX_RTMP_OUT_QUEUE; - } - - ngx_rtmp_close_connection(c); -} - - -void -ngx_rtmp_finalize_session(ngx_rtmp_session_t *s) -{ - ngx_event_t *e; - ngx_connection_t *c; - - /* deferred session finalize; - * schedule handler here */ - - c = s->connection; - - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "finalize session"); - - c->destroyed = 1; - e = &s->close; - e->data = s; - e->handler = ngx_rtmp_close_session_handler; - e->log = c->log; - - ngx_post_event(e, &ngx_posted_events); -} - - -u_char * -ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len) -{ - u_char *p; - ngx_rtmp_session_t *s; - ngx_rtmp_log_ctx_t *ctx; - - if (log->action) { - p = ngx_snprintf(buf, len, " while %s", log->action); - len -= p - buf; - buf = p; - } - - ctx = log->data; - - p = ngx_snprintf(buf, len, ", client: %V", ctx->client); - len -= p - buf; - buf = p; - - s = ctx->session; - - if (s == NULL) { - return p; - } - - p = ngx_snprintf(buf, len, ", server: %V", s->addr_text); - len -= p - buf; - buf = p; - - return p; -} diff --git a/ngx_rtmp_handshake.c b/ngx_rtmp_handshake.c index 6486f66..24d847d 100644 --- a/ngx_rtmp_handshake.c +++ b/ngx_rtmp_handshake.c @@ -3,18 +3,19 @@ */ -#include -#include -#include +#include "ngx_rtmp.h" -#ifdef NGX_SSL #include #include -#endif + + +static void ngx_rtmp_handshake_send(ngx_event_t *wev); +static void ngx_rtmp_handshake_recv(ngx_event_t *rev); +static void ngx_rtmp_handshake_done(ngx_rtmp_session_t *s); /* Handshake keys */ -static const u_char +static u_char ngx_rtmp_server_key[] = { 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ', 'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ', @@ -27,7 +28,7 @@ ngx_rtmp_server_key[] = { }; -static const u_char +static u_char ngx_rtmp_client_key[] = { 'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ', 'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ', @@ -50,29 +51,30 @@ ngx_rtmp_server_version[4] = { static ngx_str_t ngx_rtmp_server_full_key - = { ngx_rtmp_server_key, sizeof(ngx_rtmp_server_key) }; -static ngx_str_t ngx_rtmp_server_partial_key; - = { ngx_rtmp_server_key, 36 }; + = { sizeof(ngx_rtmp_server_key), ngx_rtmp_server_key }; +static ngx_str_t ngx_rtmp_server_partial_key + = { 36, ngx_rtmp_server_key }; -static ngx_str_t ngx_rtmp_client_full_key - = { ngx_rtmp_client_key, sizeof(ngx_rtmp_client_key) }; static ngx_str_t ngx_rtmp_client_partial_key - = { ngx_rtmp_client_key, 30 }; + = { 30, ngx_rtmp_client_key }; static ngx_int_t ngx_rtmp_make_digest(ngx_str_t *key, ngx_buf_t *src, u_char *skip, u_char *dst, ngx_log_t *log) { -#ifdef NGX_SSL HMAC_CTX hmac; unsigned int len; /* TODO */ + ngx_int_t rc; + + rc = NGX_ERROR; + HMAC_CTX_init(&hmac); if (HMAC_Init_ex(&hmac, key->data, key->len, - EVP_sha256, NULL) == 0) + EVP_sha256(), NULL) == 0) { ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Init_ex error"); - return NGX_ERROR; + goto out; } if (skip && src->pos <= skip && skip <= src->last) { @@ -80,37 +82,36 @@ ngx_rtmp_make_digest(ngx_str_t *key, ngx_buf_t *src, && HMAC_Update(&hmac, src->pos, skip - src->pos) == 0) { ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error"); - return NGX_ERROR; + goto out; } if (src->last != skip + NGX_RTMP_KEYLEN && HMAC_Update(&hmac, skip + NGX_RTMP_KEYLEN, src->last - skip - NGX_RTMP_KEYLEN) == 0) { ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error"); - return NGX_ERROR; + goto out; } } else if (HMAC_Update(&hmac, src->pos, src->last - src->pos) == 0) { ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error"); - return NGX_ERROR; + goto out; } if (HMAC_Final(&hmac, dst, &len) == 0) { ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Final error"); - return NGX_ERROR; + goto out; } - /* TODO: free? */ + rc = NGX_OK; - return NGX_OK; +out: + HMAC_CTX_cleanup(&hmac); -#else /* NGX_SSL */ - return NGX_ERROR; -#endif + return rc; } static ngx_int_t -ngx_rtmp_get_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) +ngx_rtmp_find_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) { size_t n, offs; u_char digest[NGX_RTMP_KEYLEN]; @@ -123,8 +124,8 @@ ngx_rtmp_get_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) offs = (offs % 728) + base + 4; p = b->pos + offs; - if (ngx_rtmp_make_digest(&ngx_rtmp_client_partial_key, - b, p, digest, log) != NGX_OK) + if (ngx_rtmp_make_digest(&ngx_rtmp_client_partial_key, b, + p, digest, log) != NGX_OK) { return NGX_ERROR; } @@ -138,15 +139,16 @@ ngx_rtmp_get_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) static ngx_int_t -ngx_rtmp_put_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) +ngx_rtmp_write_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) { size_t n, offs; + u_char *p; offs = 0; - for (n = 0; n < 4; ++n) { + for (n = 8; n < 12; ++n) { offs += b->pos[base + n]; } - offs = (offs % 728) + base + 4; + offs = (offs % 728) + base + 12; p = b->pos + offs; if (ngx_rtmp_make_digest(&ngx_rtmp_server_partial_key, @@ -160,29 +162,29 @@ ngx_rtmp_put_digest(ngx_buf_t *b, size_t base, ngx_log_t *log) static void -ngx_rtmp_make_random_buffer(ngx_buf_t *b) +ngx_rtmp_fill_random_buffer(ngx_buf_t *b) { - u_char *p; - - for (p = b->pos; p != b->last; ++p) { - *p = rand(); + for (; b->last != b->end; ++b->last) { + *b->last = rand(); } } static ngx_buf_t * -ngx_rtmp_alloc_handshake_buffer(ngx_rtmp_session_t *s) +ngx_rtmp_alloc_handshake_buffer(ngx_rtmp_session_t *s, int short_buf) { ngx_rtmp_core_srv_conf_t *cscf; ngx_chain_t *cl; ngx_buf_t *b; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + if (cscf->free_hs) { cl = cscf->free_hs; b = cl->buf; cscf->free_hs = cl->next; - ngx_free_chain(cacf->pool, cl); + ngx_free_chain(cscf->pool, cl); + } else { b = ngx_pcalloc(cscf->pool, sizeof(ngx_buf_t)); if (b == NULL) { @@ -196,7 +198,12 @@ ngx_rtmp_alloc_handshake_buffer(ngx_rtmp_session_t *s) b->end = b->start + NGX_RTMP_HANDSHAKE_BUFSIZE; } - b->pos = b->last = b->start; + if (short_buf) { + b->pos = b->last = b->start + 1; + } else { + b->pos = b->last = b->start; + } + return b; } @@ -206,7 +213,6 @@ ngx_rtmp_free_handshake_buffer(ngx_rtmp_session_t *s, ngx_buf_t *b) { ngx_rtmp_core_srv_conf_t *cscf; ngx_chain_t *cl; - ngx_buf_t *b; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); cl = ngx_alloc_chain_link(cscf->pool); @@ -220,103 +226,119 @@ ngx_rtmp_free_handshake_buffer(ngx_rtmp_session_t *s, ngx_buf_t *b) } +void +ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s) +{ + if (s->hs_in) { + ngx_rtmp_free_handshake_buffer(s, s->hs_in); + s->hs_in = NULL; + } + + if (s->hs_out1) { + ngx_rtmp_free_handshake_buffer(s, s->hs_out1); + s->hs_out1 = NULL; + } + + if (s->hs_out2) { + ngx_rtmp_free_handshake_buffer(s, s->hs_out2); + s->hs_out2 = NULL; + } +} + + +static ngx_int_t +ngx_rtmp_old_handshake_response(ngx_rtmp_session_t *s) +{ + ngx_buf_t *b; + u_char *src; + size_t len; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP old-style handshake"); + + src = s->hs_in->pos + 8; + len = s->hs_in->last - src; + + b = s->hs_out1; + *b->last++ = '\x03'; + b->last = ngx_rtmp_rcpymem(b->last, &s->epoch, 4); + ngx_memzero(b->last, 4); + b->last = ngx_cpymem(b->last + 4, src, len); + + b = s->hs_out2; + b->last = ngx_rtmp_rcpymem(b->last, &s->peer_epoch, 4); + ngx_memzero(b->last, 4); + b->last = ngx_cpymem(b->last + 4, src, len); + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_handshake_response(ngx_rtmp_session_t *s) { - u_char *p, *pp; - ngx_buf_t b; + u_char *p; + ngx_buf_t *b; ngx_int_t offs; u_char digest[NGX_RTMP_KEYLEN]; ngx_str_t key; - - s->hs_out1 = ngx_rtmp_alloc_handshake_buffer(s); - s->hs_out1->last = s->hs_out1.end; - s->hs_out2 = ngx_rtmp_alloc_handshake_buffer(s); - s->hs_out1->last = s->hs_out2.end - 1; - - /* read input buffer */ - b = *s->hs_in; - p = b->pos; - if (*p != '\x03') { + b = s->hs_in; + if (*b->pos != '\x03') { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "Unexpected RTMP version: %i", (int)*p); + "Unexpected RTMP version: %i", (ngx_int_t)*b->pos); return NGX_ERROR; } - ++p; - pp = (u_char *)&s->peer_epoch + 3; - *pp-- = *p++; - *pp-- = *p++; - *pp-- = *p++; - *pp-- = *p++; - if ( -#ifndef NGX_SSL - 1 || -#endif - *(uint32_t *)p == 0) - { - /*TODO*/ - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "RTMP epoch=%uD", s->peer_epoch); - ngx_memzero(p, 4); - p += 4; - ngx_memcpy(p, s->hs_in->pos + 9, s->hs_out1->last - p); - p = s->hs_out; - ngx_memzero(p, 8); - p += 8; - ngx_memcpy(pp, s->hs_in->pos + 9, s->hs_out2->last - p); - return NGX_OK; - } + ++b->pos; + ngx_rtmp_rmemcpy(&s->peer_epoch, b->pos, 4); + + p = b->pos + 4; ngx_log_debug5(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "RTMP client version=%i.%i.%i.%i epoch=%uD", - (ngx_int_t)p[0], (ngx_int_t)p[1], - (ngx_int_t)p[2], (ngx_int_t)p[3], + (ngx_int_t)p[3], (ngx_int_t)p[2], + (ngx_int_t)p[1], (ngx_int_t)p[0], s->peer_epoch); - p += 4; - b.pos = p; - offs = ngx_rtmp_get_digest(&b, 764, s->connection->log); + if (*(uint32_t *)p == 0) { + return ngx_rtmp_old_handshake_response(s); + } + + offs = ngx_rtmp_find_digest(b, 772, s->connection->log); if (offs == NGX_ERROR) { - offs = ngx_rtmp_get_digest(&b, 0, s->connection->log); + offs = ngx_rtmp_find_digest(b, 8, s->connection->log); } if (offs == NGX_ERROR) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "RTMP digest not found"); return NGX_ERROR; } - + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP digest found at pos=%i", offs); /* create first output buffer */ - b = *s->hs_out1; - p = b.pos; - *p++ = '\x03'; - pp = (u_char *)&s->epoch + 3; - *p++ = *pp--; - *p++ = *pp--; - *p++ = *pp--; - *p++ = *pp--; - p = ngx_cpymem(p, ngx_rtmp_server_version, 4); - b.pos = p; - ngx_rtmp_make_random_buffer(&b); - if (ngx_rtmp_put_digest(&b, 0, s->connection->log) != NGX_OK) { + b = s->hs_out1; + *b->last++ = '\x03'; + b->last = ngx_rtmp_rcpymem(b->last, &s->epoch, 4); + b->last = ngx_cpymem(b->last, ngx_rtmp_server_version, 4); + ngx_rtmp_fill_random_buffer(b); + ++b->pos; + if (ngx_rtmp_write_digest(b, 0, s->connection->log) != NGX_OK) { return NGX_ERROR; } - + --b->pos; /* create second output buffer */ - b = *s->hs_out2; - p = b.pos; - p = ngx_cpymem(b, s->hs_out1->pos + 1, 8); - ngx_rtmp_make_random_buffer(&b); - if (ngx_rtmp_make_digest(&ngx_rtmp_server_full_key, &b, + b = s->hs_out2; + ngx_rtmp_fill_random_buffer(b); + if (ngx_rtmp_make_digest(&ngx_rtmp_server_full_key, b, NULL, digest, s->connection->log) != NGX_OK) { return NGX_ERROR; } key.data = digest; key.len = sizeof(digest); - p = b.last - key.len; - if (ngx_rtmp_make_digest(&key, &b, p, p, s->connection->log) != NGX_OK) { + p = b->last - key.len; + if (ngx_rtmp_make_digest(&key, b, p, p, s->connection->log) != NGX_OK) { return NGX_ERROR; } @@ -324,11 +346,156 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s) } -ngx_int_t -ngx_rtmp_response(ngx_rtmp_session_t *s) +static void +ngx_rtmp_handshake_done(ngx_rtmp_session_t *s) { - s->hs_in = ngx_rtmp_alloc_handshake_buffer(s); + ngx_rtmp_free_handshake_buffers(s); - return ngx_rtmp_handshake_recv(s->connection->read); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP handshake done"); + + ngx_rtmp_cycle(s); +} + + +static void +ngx_rtmp_handshake_recv(ngx_event_t *rev) +{ + ssize_t n; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_buf_t *b; + + c = rev->data; + s = c->data; + + if (c->destroyed) { + return; + } + + if (rev->timedout) { + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); + c->timedout = 1; + ngx_rtmp_finalize_session(s); + return; + } + + if (rev->timer_set) { + ngx_del_timer(rev); + } + + b = s->hs_in; + + while (b->last != b->end) { + n = c->recv(c, b->last, b->end - b->last); + + if (n == NGX_ERROR || n == 0) { + ngx_rtmp_finalize_session(s); + return; + } + + if (n == NGX_AGAIN) { + ngx_add_timer(rev, s->timeout); + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_rtmp_finalize_session(s); + } + return; + } + + b->last += n; + } + + if (rev->active) { + ngx_del_event(c->read, NGX_READ_EVENT, 0); + } + + if (++s->hs_stage == 1) { + s->hs_out1 = ngx_rtmp_alloc_handshake_buffer(s, 0); + s->hs_out2 = ngx_rtmp_alloc_handshake_buffer(s, 1); + if (ngx_rtmp_handshake_response(s) != NGX_OK) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, + "RTMP handshake error"); + ngx_rtmp_finalize_session(s); + return; + } + ngx_rtmp_handshake_send(c->write); + return; + } + + ngx_rtmp_handshake_done(s); +} + + +static void +ngx_rtmp_handshake_send(ngx_event_t *wev) +{ + ngx_int_t n; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_buf_t *b; + + c = wev->data; + s = c->data; + + if (c->destroyed) { + return; + } + + if (wev->timedout) { + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, + "client timed out"); + c->timedout = 1; + ngx_rtmp_finalize_session(s); + return; + } + + if (wev->timer_set) { + ngx_del_timer(wev); + } + +restart: + + b = (s->hs_stage == 1 ? s->hs_out1 : s->hs_out2); + + while(b->pos != b->last) { + n = c->send(c, b->pos, b->last - b->pos); + + if (n == NGX_ERROR) { + ngx_rtmp_finalize_session(s); + return; + } + + if (n == NGX_AGAIN || n == 0) { + ngx_add_timer(c->write, s->timeout); + if (ngx_handle_write_event(c->write, 0) != NGX_OK) { + ngx_rtmp_finalize_session(s); + return; + } + } + + b->pos += n; + } + + if (++s->hs_stage == 2) { + goto restart; + } + + s->hs_in->pos = s->hs_in->last = s->hs_in->start + 1; + ngx_rtmp_handshake_recv(c->read); +} + + +void +ngx_rtmp_handshake(ngx_rtmp_session_t *s) +{ + ngx_connection_t *c; + + c = s->connection; + c->read->handler = ngx_rtmp_handshake_recv; + c->write->handler = ngx_rtmp_handshake_send; + + s->hs_in = ngx_rtmp_alloc_handshake_buffer(s, 0); + + ngx_rtmp_handshake_recv(c->read); } diff --git a/ngx_rtmp_init.c b/ngx_rtmp_init.c new file mode 100644 index 0000000..b45cc78 --- /dev/null +++ b/ngx_rtmp_init.c @@ -0,0 +1,311 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include "ngx_rtmp.h" + + +static void ngx_rtmp_close_connection(ngx_connection_t *c); +static void ngx_rtmp_init_session(ngx_connection_t *c); +static u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); + + +void +ngx_rtmp_init_connection(ngx_connection_t *c) +{ + ngx_uint_t i; + ngx_rtmp_port_t *port; + struct sockaddr *sa; + struct sockaddr_in *sin; + ngx_rtmp_log_ctx_t *ctx; + ngx_rtmp_in_addr_t *addr; + ngx_rtmp_session_t *s; + ngx_rtmp_addr_conf_t *addr_conf; +#if (NGX_HAVE_INET6) + struct sockaddr_in6 *sin6; + ngx_rtmp_in6_addr_t *addr6; +#endif + + + /* find the server configuration for the address:port */ + + /* AF_INET only */ + + port = c->listening->servers; + + if (port->naddrs > 1) { + + /* + * There are several addresses on this port and one of them + * is the "*:port" wildcard so getsockname() is needed to determine + * the server address. + * + * AcceptEx() already gave this address. + */ + + if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { + ngx_rtmp_close_connection(c); + return; + } + + sa = c->local_sockaddr; + + switch (sa->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + sin6 = (struct sockaddr_in6 *) sa; + + addr6 = port->addrs; + + /* the last address is "*" */ + + for (i = 0; i < port->naddrs - 1; i++) { + if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) { + break; + } + } + + addr_conf = &addr6[i].conf; + + break; +#endif + + default: /* AF_INET */ + sin = (struct sockaddr_in *) sa; + + addr = port->addrs; + + /* the last address is "*" */ + + for (i = 0; i < port->naddrs - 1; i++) { + if (addr[i].addr == sin->sin_addr.s_addr) { + break; + } + } + + addr_conf = &addr[i].conf; + + break; + } + + } else { + switch (c->local_sockaddr->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + addr6 = port->addrs; + addr_conf = &addr6[0].conf; + break; +#endif + + default: /* AF_INET */ + addr = port->addrs; + addr_conf = &addr[0].conf; + break; + } + } + + s = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_session_t)); + if (s == NULL) { + ngx_rtmp_close_connection(c); + return; + } + + s->main_conf = addr_conf->ctx->main_conf; + s->srv_conf = addr_conf->ctx->srv_conf; + + s->addr_text = &addr_conf->addr_text; + + c->data = s; + s->connection = c; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%ui client connected", + c->number, &c->addr_text); + + ctx = ngx_palloc(c->pool, sizeof(ngx_rtmp_log_ctx_t)); + if (ctx == NULL) { + ngx_rtmp_close_connection(c); + return; + } + + ctx->client = &c->addr_text; + ctx->session = s; + + c->log->connection = c->number; + c->log->handler = ngx_rtmp_log_error; + c->log->data = ctx; + c->log->action = NULL; + + c->log_error = NGX_ERROR_INFO; + + ngx_rtmp_init_session(c); +} + + +static void +ngx_rtmp_init_session(ngx_connection_t *c) +{ + ngx_rtmp_session_t *s; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_core_srv_conf_t *cscf; + size_t n; + ngx_rtmp_handler_pt *h; + ngx_array_t *ch; + + s = c->data; + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module); + if (s->ctx == NULL) { + ngx_rtmp_close_connection(c); + return; + } + + + s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) + * cscf->max_streams); + if (s->in_streams == NULL) { + ngx_rtmp_close_connection(c); + return; + } + + s->epoch = ngx_current_msec; + s->timeout = cscf->timeout; + ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE); + + + /* call connect callbacks */ + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + + ch = &cmcf->events[NGX_RTMP_CONNECT]; + h = ch->elts; + for(n = 0; n < ch->nelts; ++n, ++h) { + if (*h) { + if ((*h)(s, NULL, NULL) != NGX_OK) { + ngx_rtmp_finalize_session(s); + return; + } + } + } + + ngx_rtmp_handshake(s); +} + + +static u_char * +ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len) +{ + u_char *p; + ngx_rtmp_session_t *s; + ngx_rtmp_log_ctx_t *ctx; + + if (log->action) { + p = ngx_snprintf(buf, len, " while %s", log->action); + len -= p - buf; + buf = p; + } + + ctx = log->data; + + p = ngx_snprintf(buf, len, ", client: %V", ctx->client); + len -= p - buf; + buf = p; + + s = ctx->session; + + if (s == NULL) { + return p; + } + + p = ngx_snprintf(buf, len, ", server: %V", s->addr_text); + len -= p - buf; + buf = p; + + return p; +} + + +static void +ngx_rtmp_close_connection(ngx_connection_t *c) +{ + ngx_pool_t *pool; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); + + pool = c->pool; + ngx_close_connection(c); + ngx_destroy_pool(pool); +} + + +static void +ngx_rtmp_close_session_handler(ngx_event_t *e) +{ + ngx_rtmp_session_t *s; + ngx_connection_t *c; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_handler_pt *h; + ngx_array_t *dh; + size_t n; + + s = e->data; + c = s->connection; + + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session"); + + if (s) { + dh = &cmcf->events[NGX_RTMP_DISCONNECT]; + h = dh->elts; + + for(n = 0; n < dh->nelts; ++n, ++h) { + if (*h) { + (*h)(s, NULL, NULL); + } + } + + if (s->in_old_pool) { + ngx_destroy_pool(s->in_old_pool); + } + + if (s->in_pool) { + ngx_destroy_pool(s->in_pool); + } + + ngx_rtmp_free_handshake_buffers(s); + + while (s->out_pos != s->out_last) { + ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos++]); + s->out_pos %= NGX_RTMP_OUT_QUEUE; + } + } + + ngx_rtmp_close_connection(c); +} + + +void +ngx_rtmp_finalize_session(ngx_rtmp_session_t *s) +{ + ngx_event_t *e; + ngx_connection_t *c; + + c = s->connection; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "finalize session"); + + c->destroyed = 1; + e = &s->close; + e->data = s; + e->handler = ngx_rtmp_close_session_handler; + e->log = c->log; + + ngx_post_event(e, &ngx_posted_events); +} +