From 49bb899d51a4ffebf1db631ede48959d150d0f0d Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Wed, 3 Mar 2021 20:44:13 -0800 Subject: [PATCH] Optimize/fix concurrency for chat --- core/chat/chat.go | 3 +++ core/chat/client.go | 9 ++++----- core/chat/server.go | 19 ++++++++++++++----- core/stats.go | 8 ++++++-- test/load/websocketTest.yaml | 14 +++++++++++--- 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/core/chat/chat.go b/core/chat/chat.go index 1498ec165c..631887d7f3 100644 --- a/core/chat/chat.go +++ b/core/chat/chat.go @@ -73,6 +73,9 @@ func GetModerationChatMessages() []models.ChatEvent { } func GetClient(clientID string) *Client { + l.RLock() + defer l.RUnlock() + for _, client := range _server.Clients { if client.ClientID == clientID { return client diff --git a/core/chat/client.go b/core/chat/client.go index 916506c194..5ecb5f8fc9 100644 --- a/core/chat/client.go +++ b/core/chat/client.go @@ -153,10 +153,9 @@ func (c *Client) listenRead() { if err != nil { if err == io.EOF { c.doneCh <- true - } else { - c.handleClientSocketError(err) + return } - return + c.handleClientSocketError(err) } var messageTypeCheck map[string]interface{} @@ -165,12 +164,12 @@ func (c *Client) listenRead() { log.Errorln(err) } - messageType := messageTypeCheck["type"].(string) - if !c.passesRateLimit() { continue } + messageType := messageTypeCheck["type"].(string) + if messageType == models.MessageSent { c.chatMessageReceived(data) } else if messageType == models.UserNameChanged { diff --git a/core/chat/server.go b/core/chat/server.go index 4b2e710bb1..5a32255da1 100644 --- a/core/chat/server.go +++ b/core/chat/server.go @@ -18,7 +18,7 @@ var ( _server *server ) -var l = sync.Mutex{} +var l = &sync.RWMutex{} // Server represents the server which handles the chat. type server struct { @@ -56,32 +56,42 @@ func (s *server) err(err error) { } func (s *server) sendAll(msg models.ChatEvent) { + l.RLock() for _, c := range s.Clients { c.write(msg) } + l.RUnlock() } func (s *server) ping() { ping := models.PingMessage{MessageType: models.PING} + + l.RLock() for _, c := range s.Clients { c.pingch <- ping } + l.RUnlock() } func (s *server) usernameChanged(msg models.NameChangeEvent) { + l.RLock() for _, c := range s.Clients { c.usernameChangeChannel <- msg } + l.RUnlock() go webhooks.SendChatEventUsernameChanged(msg) } func (s *server) userJoined(msg models.UserJoinedEvent) { + l.RLock() if s.listener.IsStreamConnected() { for _, c := range s.Clients { c.userJoinedChannel <- msg } } + l.RUnlock() + go webhooks.SendChatEventUserJoined(msg) } @@ -92,7 +102,8 @@ func (s *server) onConnection(ws *websocket.Conn) { s.removeClient(client) if err := ws.Close(); err != nil { - s.errCh <- err + log.Debugln(err) + //s.errCh <- err } }() @@ -113,12 +124,12 @@ func (s *server) Listen() { case c := <-s.addCh: l.Lock() s.Clients[c.socketID] = c - l.Unlock() if !c.Ignore { s.listener.ClientAdded(c.GetViewerClientFromChatClient()) s.sendWelcomeMessageToClient(c) } + l.Unlock() // remove a client case c := <-s.delCh: @@ -158,12 +169,10 @@ func (s *server) Listen() { func (s *server) removeClient(c *Client) { l.Lock() - if _, ok := s.Clients[c.socketID]; ok { delete(s.Clients, c.socketID) s.listener.ClientRemoved(c.socketID) - log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID) } l.Unlock() diff --git a/core/stats.go b/core/stats.go index 2e4f21f87d..1ece58a200 100644 --- a/core/stats.go +++ b/core/stats.go @@ -14,7 +14,7 @@ import ( "github.com/owncast/owncast/utils" ) -var l = sync.Mutex{} +var l = &sync.RWMutex{} func setupStats() error { s := getSavedStats() @@ -52,6 +52,8 @@ func IsStreamConnected() bool { // SetClientActive sets a client as active and connected. func SetClientActive(client models.Client) { l.Lock() + defer l.Unlock() + // If this clientID already exists then update it. // Otherwise set a new one. if existingClient, ok := _stats.Clients[client.ClientID]; ok { @@ -66,7 +68,6 @@ func SetClientActive(client models.Client) { } _stats.Clients[client.ClientID] = client } - l.Unlock() // Don't update viewer counts if a live stream session is not active. if _stats.StreamConnected { @@ -85,6 +86,7 @@ func RemoveClient(clientID string) { } func GetClients() []models.Client { + l.RLock() clients := make([]models.Client, 0) for _, client := range _stats.Clients { chatClient := chat.GetClient(client.ClientID) @@ -94,6 +96,8 @@ func GetClients() []models.Client { clients = append(clients, client) } } + l.RUnlock() + return clients } diff --git a/test/load/websocketTest.yaml b/test/load/websocketTest.yaml index e5e674cea1..2d299da6d7 100644 --- a/test/load/websocketTest.yaml +++ b/test/load/websocketTest.yaml @@ -7,8 +7,15 @@ config: maxErrorRate: 1 phases: - - duration: 100 - arrivalRate: 15 + - duration: 30 + arrivalRate: 5 + rampTo: 5 + name: "Warming up" + - duration: 240 + arrivalRate: 5 + rampTo: 40 + name: "Max load" + ws: subprotocols: - json @@ -16,9 +23,10 @@ config: Connection: Upgrade Origin: http://localhost:8080 Sec-WebSocket-Version: 13 + scenarios: - engine: "ws" flow: - function: "createTestMessageObject" - send: "{{ data }}" - - think: 10 + - think: 30 # Each client should stay connected for 30 seconds