mirror of
				https://github.com/owncast/owncast.git
				synced 2025-11-04 13:27:21 +08:00 
			
		
		
		
	* Able to authenticate user against IndieAuth. For #1273 * WIP server indieauth endpoint. For https://github.com/owncast/owncast/issues/1272 * Add migration to remove access tokens from user * Add authenticated bool to user for display purposes * Add indieauth modal and auth flair to display names. For #1273 * Validate URLs and display errors * Renames, cleanups * Handle relative auth endpoint paths. Add error handling for missing redirects. * Disallow using display names in use by registered users. Closes #1810 * Verify code verifier via code challenge on callback * Use relative path to authorization_endpoint * Post-rebase fixes * Use a timestamp instead of a bool for authenticated * Propertly handle and display error in modal * Use auth'ed timestamp to derive authenticated flag to display in chat * don't redirect unless a URL is present avoids redirecting to `undefined` if there was an error * improve error message if owncast server URL isn't set * fix IndieAuth PKCE implementation use SHA256 instead of SHA1, generates a longer code verifier (must be 43-128 chars long), fixes URL-safe SHA256 encoding * return real profile data for IndieAuth response * check the code verifier in the IndieAuth server * Linting * Add new chat settings modal anad split up indieauth ui * Remove logging error * Update the IndieAuth modal UI. For #1273 * Add IndieAuth repsonse error checking * Disable IndieAuth client if server URL is not set. * Add explicit error messages for specific error types * Fix bad logic * Return OAuth-keyed error responses for indieauth server * Display IndieAuth error in plain text with link to return to main page * Remove redundant check * Add additional detail to error * Hide IndieAuth details behind disclosure details * Break out migration into two steps because some people have been runing dev in production * Add auth option to user dropdown Co-authored-by: Aaron Parecki <aaron@parecki.com>
		
			
				
	
	
		
			413 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			413 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package chat
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
 | 
						|
	"github.com/gorilla/websocket"
 | 
						|
 | 
						|
	"github.com/owncast/owncast/config"
 | 
						|
	"github.com/owncast/owncast/core/chat/events"
 | 
						|
	"github.com/owncast/owncast/core/data"
 | 
						|
	"github.com/owncast/owncast/core/user"
 | 
						|
	"github.com/owncast/owncast/core/webhooks"
 | 
						|
	"github.com/owncast/owncast/geoip"
 | 
						|
	"github.com/owncast/owncast/utils"
 | 
						|
)
 | 
						|
 | 
						|
var _server *Server
 | 
						|
 | 
						|
// a map of user IDs and when they last were active.
 | 
						|
var _lastSeenCache = map[string]time.Time{}
 | 
						|
 | 
						|
// Server represents an instance of the chat server.
 | 
						|
type Server struct {
 | 
						|
	mu                       sync.RWMutex
 | 
						|
	seq                      uint
 | 
						|
	clients                  map[uint]*Client
 | 
						|
	maxSocketConnectionLimit int64
 | 
						|
 | 
						|
	// send outbound message payload to all clients
 | 
						|
	outbound chan []byte
 | 
						|
 | 
						|
	// receive inbound message payload from all clients
 | 
						|
	inbound chan chatClientEvent
 | 
						|
 | 
						|
	// unregister requests from clients.
 | 
						|
	unregister chan uint // the ChatClient id
 | 
						|
 | 
						|
	geoipClient *geoip.Client
 | 
						|
}
 | 
						|
 | 
						|
// NewChat will return a new instance of the chat server.
 | 
						|
func NewChat() *Server {
 | 
						|
	maximumConcurrentConnectionLimit := getMaximumConcurrentConnectionLimit()
 | 
						|
	setSystemConcurrentConnectionLimit(maximumConcurrentConnectionLimit)
 | 
						|
 | 
						|
	server := &Server{
 | 
						|
		clients:                  map[uint]*Client{},
 | 
						|
		outbound:                 make(chan []byte),
 | 
						|
		inbound:                  make(chan chatClientEvent),
 | 
						|
		unregister:               make(chan uint),
 | 
						|
		maxSocketConnectionLimit: maximumConcurrentConnectionLimit,
 | 
						|
		geoipClient:              geoip.NewClient(),
 | 
						|
	}
 | 
						|
 | 
						|
	return server
 | 
						|
}
 | 
						|
 | 
						|
// Run will start the chat server.
 | 
						|
func (s *Server) Run() {
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case clientID := <-s.unregister:
 | 
						|
			if _, ok := s.clients[clientID]; ok {
 | 
						|
				s.mu.Lock()
 | 
						|
				delete(s.clients, clientID)
 | 
						|
				s.mu.Unlock()
 | 
						|
			}
 | 
						|
 | 
						|
		case message := <-s.inbound:
 | 
						|
			s.eventReceived(message)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Addclient registers new connection as a User.
 | 
						|
func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken string, userAgent string, ipAddress string) *Client {
 | 
						|
	client := &Client{
 | 
						|
		server:      s,
 | 
						|
		conn:        conn,
 | 
						|
		User:        user,
 | 
						|
		IPAddress:   ipAddress,
 | 
						|
		accessToken: accessToken,
 | 
						|
		send:        make(chan []byte, 256),
 | 
						|
		UserAgent:   userAgent,
 | 
						|
		ConnectedAt: time.Now(),
 | 
						|
	}
 | 
						|
 | 
						|
	// Do not send user re-joined broadcast message if they've been active within 5 minutes.
 | 
						|
	shouldSendJoinedMessages := data.GetChatJoinMessagesEnabled()
 | 
						|
	if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*5 {
 | 
						|
		shouldSendJoinedMessages = false
 | 
						|
	}
 | 
						|
 | 
						|
	s.mu.Lock()
 | 
						|
	{
 | 
						|
		client.id = s.seq
 | 
						|
		s.clients[client.id] = client
 | 
						|
		s.seq++
 | 
						|
		_lastSeenCache[user.ID] = time.Now()
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
 | 
						|
	log.Traceln("Adding client", client.id, "total count:", len(s.clients))
 | 
						|
 | 
						|
	go client.writePump()
 | 
						|
	go client.readPump()
 | 
						|
 | 
						|
	client.sendConnectedClientInfo()
 | 
						|
 | 
						|
	if getStatus().Online {
 | 
						|
		if shouldSendJoinedMessages {
 | 
						|
			s.sendUserJoinedMessage(client)
 | 
						|
		}
 | 
						|
		s.sendWelcomeMessageToClient(client)
 | 
						|
	}
 | 
						|
 | 
						|
	// Asynchronously, optionally, fetch GeoIP data.
 | 
						|
	go func(client *Client) {
 | 
						|
		client.Geo = s.geoipClient.GetGeoFromIP(ipAddress)
 | 
						|
	}(client)
 | 
						|
 | 
						|
	return client
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendUserJoinedMessage(c *Client) {
 | 
						|
	userJoinedEvent := events.UserJoinedEvent{}
 | 
						|
	userJoinedEvent.SetDefaults()
 | 
						|
	userJoinedEvent.User = c.User
 | 
						|
	userJoinedEvent.ClientID = c.id
 | 
						|
 | 
						|
	if err := s.Broadcast(userJoinedEvent.GetBroadcastPayload()); err != nil {
 | 
						|
		log.Errorln("error adding client to chat server", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Send chat user joined webhook
 | 
						|
	webhooks.SendChatEventUserJoined(userJoinedEvent)
 | 
						|
}
 | 
						|
 | 
						|
// ClientClosed is fired when a client disconnects or connection is dropped.
 | 
						|
func (s *Server) ClientClosed(c *Client) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	c.close()
 | 
						|
 | 
						|
	if _, ok := s.clients[c.id]; ok {
 | 
						|
		log.Debugln("Deleting", c.id)
 | 
						|
		delete(s.clients, c.id)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// HandleClientConnection is fired when a single client connects to the websocket.
 | 
						|
func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if data.GetChatDisabled() {
 | 
						|
		_, _ = w.Write([]byte(events.ChatDisabled))
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	ipAddress := utils.GetIPAddressFromRequest(r)
 | 
						|
	// Check if this client's IP address is banned. If so send a rejection.
 | 
						|
	if blocked, err := data.IsIPAddressBanned(ipAddress); blocked {
 | 
						|
		log.Debugln("Client ip address has been blocked. Rejecting.")
 | 
						|
		event := events.UserDisabledEvent{}
 | 
						|
		event.SetDefaults()
 | 
						|
 | 
						|
		w.WriteHeader(http.StatusForbidden)
 | 
						|
		// Send this disabled event specifically to this single connected client
 | 
						|
		// to let them know they've been banned.
 | 
						|
		// _server.Send(event.GetBroadcastPayload(), client)
 | 
						|
		return
 | 
						|
	} else if err != nil {
 | 
						|
		log.Errorln("error determining if IP address is blocked: ", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Limit concurrent chat connections
 | 
						|
	if int64(len(s.clients)) >= s.maxSocketConnectionLimit {
 | 
						|
		log.Warnln("rejecting incoming client connection as it exceeds the max client count of", s.maxSocketConnectionLimit)
 | 
						|
		_, _ = w.Write([]byte(events.ErrorMaxConnectionsExceeded))
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	conn, err := upgrader.Upgrade(w, r, nil)
 | 
						|
	if err != nil {
 | 
						|
		log.Debugln(err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	accessToken := r.URL.Query().Get("accessToken")
 | 
						|
	if accessToken == "" {
 | 
						|
		log.Errorln("Access token is required")
 | 
						|
		// Return HTTP status code
 | 
						|
		_ = conn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// A user is required to use the websocket
 | 
						|
	user := user.GetUserByToken(accessToken)
 | 
						|
	if user == nil {
 | 
						|
		// Send error that registration is required
 | 
						|
		_ = conn.WriteJSON(events.EventPayload{
 | 
						|
			"type": events.ErrorNeedsRegistration,
 | 
						|
		})
 | 
						|
		_ = conn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// User is disabled therefore we should disconnect.
 | 
						|
	if user.DisabledAt != nil {
 | 
						|
		log.Traceln("Disabled user", user.ID, user.DisplayName, "rejected")
 | 
						|
		_ = conn.WriteJSON(events.EventPayload{
 | 
						|
			"type": events.ErrorUserDisabled,
 | 
						|
		})
 | 
						|
		_ = conn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	userAgent := r.UserAgent()
 | 
						|
 | 
						|
	s.Addclient(conn, user, accessToken, userAgent, ipAddress)
 | 
						|
}
 | 
						|
 | 
						|
// Broadcast sends message to all connected clients.
 | 
						|
func (s *Server) Broadcast(payload events.EventPayload) error {
 | 
						|
	data, err := json.Marshal(payload)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	s.mu.RLock()
 | 
						|
	defer s.mu.RUnlock()
 | 
						|
 | 
						|
	for _, client := range s.clients {
 | 
						|
		if client == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		case client.send <- data:
 | 
						|
		default:
 | 
						|
			go client.close()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Send will send a single payload to a single connected client.
 | 
						|
func (s *Server) Send(payload events.EventPayload, client *Client) {
 | 
						|
	data, err := json.Marshal(payload)
 | 
						|
	if err != nil {
 | 
						|
		log.Errorln(err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	client.send <- data
 | 
						|
}
 | 
						|
 | 
						|
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
 | 
						|
func (s *Server) DisconnectClients(clients []*Client) {
 | 
						|
	for _, client := range clients {
 | 
						|
		log.Traceln("Disconnecting client", client.User.ID, "owned by", client.User.DisplayName)
 | 
						|
 | 
						|
		go func(client *Client) {
 | 
						|
			event := events.UserDisabledEvent{}
 | 
						|
			event.SetDefaults()
 | 
						|
 | 
						|
			// Send this disabled event specifically to this single connected client
 | 
						|
			// to let them know they've been banned.
 | 
						|
			_server.Send(event.GetBroadcastPayload(), client)
 | 
						|
 | 
						|
			// Give the socket time to send out the above message.
 | 
						|
			// Unfortunately I don't know of any way to get a real callback to know when
 | 
						|
			// the message was successfully sent, so give it a couple seconds.
 | 
						|
			time.Sleep(2 * time.Second)
 | 
						|
 | 
						|
			// Forcefully disconnect if still valid.
 | 
						|
			if client != nil {
 | 
						|
				client.close()
 | 
						|
			}
 | 
						|
		}(client)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// SendConnectedClientInfoToUser will find all the connected clients assigned to a user
 | 
						|
// and re-send each the connected client info.
 | 
						|
func SendConnectedClientInfoToUser(userID string) error {
 | 
						|
	clients, err := GetClientsForUser(userID)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// Get an updated reference to the user.
 | 
						|
	user := user.GetUserByID(userID)
 | 
						|
	if user == nil {
 | 
						|
		return fmt.Errorf("user not found")
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, client := range clients {
 | 
						|
		// Update the client's reference to its user.
 | 
						|
		client.User = user
 | 
						|
		// Send the update to the client.
 | 
						|
		client.sendConnectedClientInfo()
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// SendActionToUser will send system action text to all connected clients
 | 
						|
// assigned to a user ID.
 | 
						|
func SendActionToUser(userID string, text string) error {
 | 
						|
	clients, err := GetClientsForUser(userID)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, client := range clients {
 | 
						|
		_server.sendActionToClient(client, text)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) eventReceived(event chatClientEvent) {
 | 
						|
	c := event.client
 | 
						|
	u := c.User
 | 
						|
 | 
						|
	// If established chat user only mode is enabled and the user is not old
 | 
						|
	// enough then reject this event and send them an informative message.
 | 
						|
	if u != nil && data.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
 | 
						|
		s.sendActionToClient(c, "You have not been an established chat participant long enough to take part in chat. Please enjoy the stream and try again later.")
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var typecheck map[string]interface{}
 | 
						|
	if err := json.Unmarshal(event.data, &typecheck); err != nil {
 | 
						|
		log.Debugln(err)
 | 
						|
	}
 | 
						|
 | 
						|
	eventType := typecheck["type"]
 | 
						|
 | 
						|
	switch eventType {
 | 
						|
	case events.MessageSent:
 | 
						|
		s.userMessageSent(event)
 | 
						|
 | 
						|
	case events.UserNameChanged:
 | 
						|
		s.userNameChanged(event)
 | 
						|
 | 
						|
	default:
 | 
						|
		log.Debugln(eventType, "event not found:", typecheck)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendWelcomeMessageToClient(c *Client) {
 | 
						|
	// Add an artificial delay so people notice this message come in.
 | 
						|
	time.Sleep(7 * time.Second)
 | 
						|
 | 
						|
	welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
 | 
						|
 | 
						|
	if welcomeMessage != "" {
 | 
						|
		s.sendSystemMessageToClient(c, welcomeMessage)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendAllWelcomeMessage() {
 | 
						|
	welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
 | 
						|
 | 
						|
	if welcomeMessage != "" {
 | 
						|
		clientMessage := events.SystemMessageEvent{
 | 
						|
			Event: events.Event{},
 | 
						|
			MessageEvent: events.MessageEvent{
 | 
						|
				Body: welcomeMessage,
 | 
						|
			},
 | 
						|
		}
 | 
						|
		clientMessage.SetDefaults()
 | 
						|
		_ = s.Broadcast(clientMessage.GetBroadcastPayload())
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendSystemMessageToClient(c *Client, message string) {
 | 
						|
	clientMessage := events.SystemMessageEvent{
 | 
						|
		Event: events.Event{},
 | 
						|
		MessageEvent: events.MessageEvent{
 | 
						|
			Body: message,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	clientMessage.SetDefaults()
 | 
						|
	clientMessage.RenderBody()
 | 
						|
	s.Send(clientMessage.GetBroadcastPayload(), c)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendActionToClient(c *Client, message string) {
 | 
						|
	clientMessage := events.ActionEvent{
 | 
						|
		MessageEvent: events.MessageEvent{
 | 
						|
			Body: message,
 | 
						|
		},
 | 
						|
		Event: events.Event{
 | 
						|
			Type: events.ChatActionSent,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	clientMessage.SetDefaults()
 | 
						|
	clientMessage.RenderBody()
 | 
						|
	s.Send(clientMessage.GetBroadcastPayload(), c)
 | 
						|
}
 |