From c1f4096e11a045b5064170626b00febb34e06b5b Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Mon, 20 Jan 2025 16:32:25 -0800 Subject: [PATCH] chore(go): new chat message db repository. Closes #3081 (#4161) --- core/chat/chat.go | 10 +- core/chat/events.go | 5 +- core/chat/messages.go | 4 +- core/chat/persistence.go | 463 +--------------- core/data/data.go | 1 + core/data/datastore.go | 10 +- core/data/messages.go | 18 - metrics/viewers.go | 5 +- .../chatmessagerepository.go | 523 ++++++++++++++++++ persistence/tables/config.go | 17 + test/automated/api/005_chatmoderation.test.js | 26 +- test/populateContent.sh | 0 webserver/handlers/admin/chat.go | 20 +- webserver/handlers/chat.go | 5 +- webserver/handlers/moderation/moderation.go | 4 +- 15 files changed, 604 insertions(+), 507 deletions(-) delete mode 100644 core/data/messages.go create mode 100644 persistence/chatmessagerepository/chatmessagerepository.go create mode 100644 persistence/tables/config.go mode change 100644 => 100755 test/populateContent.sh diff --git a/core/chat/chat.go b/core/chat/chat.go index bd7789412c..02ca885123 100644 --- a/core/chat/chat.go +++ b/core/chat/chat.go @@ -8,6 +8,7 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/models" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/configrepository" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -103,7 +104,8 @@ func SendSystemMessage(text string, ephemeral bool) error { } if !ephemeral { - saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) + chatMessageRepository := chatmessagerepository.Get() + chatMessageRepository.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) } return nil @@ -131,7 +133,8 @@ func SendFediverseAction(eventType string, userAccountName string, image *string return err } - saveFederatedAction(message) + chatMessageRepository := chatmessagerepository.Get() + chatMessageRepository.SaveFederatedAction(message) return nil } @@ -152,7 +155,8 @@ func SendSystemAction(text string, ephemeral bool) error { } if !ephemeral { - saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) + chatMessageRepository := chatmessagerepository.Get() + chatMessageRepository.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) } return nil diff --git a/core/chat/events.go b/core/chat/events.go index 09fb29999f..a82331f871 100644 --- a/core/chat/events.go +++ b/core/chat/events.go @@ -9,6 +9,7 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/webhooks" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/configrepository" "github.com/owncast/owncast/persistence/userrepository" "github.com/owncast/owncast/utils" @@ -172,8 +173,8 @@ func (s *Server) userMessageSent(eventData chatClientEvent) { // Send chat message sent webhook webhooks.SendChatEvent(&event) chatMessagesSentCounter.Inc() - - SaveUserMessage(event) + chatMessageRepository := chatmessagerepository.Get() + chatMessageRepository.SaveUserMessage(event) eventData.client.MessageCount++ } diff --git a/core/chat/messages.go b/core/chat/messages.go index 5e05b57964..81ccf1e8e0 100644 --- a/core/chat/messages.go +++ b/core/chat/messages.go @@ -5,13 +5,15 @@ import ( "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/webhooks" + "github.com/owncast/owncast/persistence/chatmessagerepository" log "github.com/sirupsen/logrus" ) // SetMessagesVisibility will set the visibility of multiple messages by ID. func SetMessagesVisibility(messageIDs []string, visibility bool) error { // Save new message visibility - if err := saveMessageVisibility(messageIDs, visibility); err != nil { + chatMessageRepository := chatmessagerepository.Get() + if err := chatMessageRepository.SetMessageVisibilityForMessageIDs(messageIDs, visibility); err != nil { log.Errorln(err) return err } diff --git a/core/chat/persistence.go b/core/chat/persistence.go index eb2d71f9b6..8c9090dae3 100644 --- a/core/chat/persistence.go +++ b/core/chat/persistence.go @@ -1,25 +1,17 @@ package chat import ( - "context" - "database/sql" - "strings" "time" - "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/models" "github.com/owncast/owncast/persistence/authrepository" "github.com/owncast/owncast/persistence/tables" - - log "github.com/sirupsen/logrus" ) var _datastore *data.Datastore const ( - maxBacklogHours = 2 // Keep backlog max hours worth of messages - maxBacklogNumber = 50 // Return max number of messages in history request + maxBacklogHours = 2 // Keep backlog max hours worth of messages ) func setupPersistence() { @@ -37,456 +29,3 @@ func setupPersistence() { } }() } - -// SaveUserMessage will save a single chat event to the messages database. -func SaveUserMessage(event events.UserMessageEvent) { - saveEvent(event.ID, &event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp, nil, nil, nil, nil) -} - -func saveFederatedAction(event events.FediverseEngagementEvent) { - saveEvent(event.ID, nil, event.Body, event.Type, nil, event.Timestamp, event.Image, &event.Link, &event.UserAccountName, nil) -} - -// nolint: unparam -func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) { - defer func() { - _historyCache = nil - }() - - tx, err := _datastore.DB.Begin() - if err != nil { - log.Errorln("error saving", eventType, err) - return - } - - defer tx.Rollback() // nolint - - stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - if err != nil { - log.Errorln("error saving", eventType, err) - return - } - - defer stmt.Close() - - if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil { - log.Errorln("error saving", eventType, err) - return - } - if err = tx.Commit(); err != nil { - log.Errorln("error saving", eventType, err) - return - } -} - -func makeUserMessageEventFromRowData(row rowData) events.UserMessageEvent { - scopes := "" - if row.userScopes != nil { - scopes = *row.userScopes - } - - previousUsernames := []string{} - if row.previousUsernames != nil { - previousUsernames = strings.Split(*row.previousUsernames, ",") - } - - displayName := "" - if row.userDisplayName != nil { - displayName = *row.userDisplayName - } - - displayColor := 0 - if row.userDisplayColor != nil { - displayColor = *row.userDisplayColor - } - - createdAt := time.Time{} - if row.userCreatedAt != nil { - createdAt = *row.userCreatedAt - } - - isBot := (row.userType != nil && *row.userType == "API") - scopeSlice := strings.Split(scopes, ",") - - u := models.User{ - ID: *row.userID, - DisplayName: displayName, - DisplayColor: displayColor, - CreatedAt: createdAt, - DisabledAt: row.userDisabledAt, - NameChangedAt: row.userNameChangedAt, - PreviousNames: previousUsernames, - AuthenticatedAt: row.userAuthenticatedAt, - Authenticated: row.userAuthenticatedAt != nil, - Scopes: scopeSlice, - IsBot: isBot, - } - - message := events.UserMessageEvent{ - Event: events.Event{ - Type: row.eventType, - ID: row.id, - Timestamp: row.timestamp, - }, - UserEvent: events.UserEvent{ - User: &u, - HiddenAt: row.hiddenAt, - }, - MessageEvent: events.MessageEvent{ - Body: row.body, - RawBody: row.body, - }, - } - - return message -} - -func makeSystemMessageChatEventFromRowData(row rowData) events.SystemMessageEvent { - message := events.SystemMessageEvent{ - Event: events.Event{ - Type: row.eventType, - ID: row.id, - Timestamp: row.timestamp, - }, - MessageEvent: events.MessageEvent{ - Body: row.body, - RawBody: row.body, - }, - } - return message -} - -func makeActionMessageChatEventFromRowData(row rowData) events.ActionEvent { - message := events.ActionEvent{ - Event: events.Event{ - Type: row.eventType, - ID: row.id, - Timestamp: row.timestamp, - }, - MessageEvent: events.MessageEvent{ - Body: row.body, - RawBody: row.body, - }, - } - return message -} - -func makeFederatedActionChatEventFromRowData(row rowData) events.FediverseEngagementEvent { - message := events.FediverseEngagementEvent{ - Event: events.Event{ - Type: row.eventType, - ID: row.id, - Timestamp: row.timestamp, - }, - MessageEvent: events.MessageEvent{ - Body: row.body, - RawBody: row.body, - }, - Image: row.image, - Link: *row.link, - UserAccountName: *row.title, - } - return message -} - -type rowData struct { - timestamp time.Time - image *string - previousUsernames *string - - userDisplayName *string - userDisplayColor *int - userID *string - title *string - subtitle *string - link *string - - userType *string - userScopes *string - hiddenAt *time.Time - userCreatedAt *time.Time - userDisabledAt *time.Time - userAuthenticatedAt *time.Time - userNameChangedAt *time.Time - body string - eventType models.EventType - id string -} - -func getChat(rows *sql.Rows) ([]interface{}, error) { - history := make([]interface{}, 0) - - for rows.Next() { - row := rowData{} - - // Convert a database row into a chat event - if err := rows.Scan( - &row.id, - &row.userID, - &row.body, - &row.title, - &row.subtitle, - &row.image, - &row.link, - &row.eventType, - &row.hiddenAt, - &row.timestamp, - &row.userDisplayName, - &row.userDisplayColor, - &row.userCreatedAt, - &row.userDisabledAt, - &row.previousUsernames, - &row.userNameChangedAt, - &row.userAuthenticatedAt, - &row.userScopes, - &row.userType, - ); err != nil { - return nil, err - } - - var message interface{} - - switch row.eventType { - case events.MessageSent: - message = makeUserMessageEventFromRowData(row) - case events.SystemMessageSent: - message = makeSystemMessageChatEventFromRowData(row) - case events.ChatActionSent: - message = makeActionMessageChatEventFromRowData(row) - case events.FediverseEngagementFollow: - message = makeFederatedActionChatEventFromRowData(row) - case events.FediverseEngagementLike: - message = makeFederatedActionChatEventFromRowData(row) - case events.FediverseEngagementRepost: - message = makeFederatedActionChatEventFromRowData(row) - } - - history = append(history, message) - } - - return history, nil -} - -var _historyCache *[]interface{} - -// GetChatModerationHistory will return all the chat messages suitable for moderation purposes. -func GetChatModerationHistory() []interface{} { - if _historyCache != nil { - return *_historyCache - } - - tx, err := _datastore.DB.Begin() - if err != nil { - log.Errorln("error fetching chat moderation history", err) - return nil - } - - defer tx.Rollback() // nolint - - // Get all messages regardless of visibility - query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" - stmt, err := tx.Prepare(query) - if err != nil { - log.Errorln("error fetching chat moderation history", err) - return nil - } - - rows, err := stmt.Query() - if err != nil { - log.Errorln("error fetching chat moderation history", err) - return nil - } - - defer stmt.Close() - defer rows.Close() - - result, err := getChat(rows) - if err != nil { - log.Errorln(err) - log.Errorln("There is a problem enumerating chat message rows. Please report this:", query) - return nil - } - - _historyCache = &result - - if err = tx.Commit(); err != nil { - log.Errorln("error fetching chat moderation history", err) - return nil - } - - return result -} - -// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. -func GetChatHistory() []interface{} { - tx, err := _datastore.DB.Begin() - if err != nil { - log.Errorln("error fetching chat history", err) - return nil - } - - defer tx.Rollback() // nolint - - // Get all visible messages - query := "SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT ?" - - stmt, err := tx.Prepare(query) - if err != nil { - log.Errorln("error fetching chat history", err) - return nil - } - - rows, err := stmt.Query(maxBacklogNumber) - if err != nil { - log.Errorln("error fetching chat history", err) - return nil - } - - defer stmt.Close() - defer rows.Close() - - m, err := getChat(rows) - if err != nil { - log.Errorln(err) - log.Errorln("There is a problem enumerating chat message rows. Please report this:", query) - return nil - } - - if err = tx.Commit(); err != nil { - log.Errorln("error fetching chat history", err) - return nil - } - - // Invert order of messages - for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 { - m[i], m[j] = m[j], m[i] - } - - return m -} - -// GetMessagesFromUser returns chat messages that were sent by a specific user. -func GetMessagesFromUser(userID string) ([]events.UserMessageEvent, error) { - query, err := _datastore.GetQueries().GetMessagesFromUser(context.Background(), sql.NullString{String: userID, Valid: true}) - if err != nil { - return nil, err - } - - results := make([]events.UserMessageEvent, len(query)) - for i, row := range query { - results[i] = events.UserMessageEvent{ - Event: events.Event{ - Timestamp: row.Timestamp.Time, - ID: row.ID, - }, - MessageEvent: events.MessageEvent{ - Body: row.Body.String, - }, - } - } - - return results, nil -} - -// SetMessageVisibilityForUserID will bulk change the visibility of messages for a user -// and then send out visibility changed events to chat clients. -func SetMessageVisibilityForUserID(userID string, visible bool) error { - defer func() { - _historyCache = nil - }() - - tx, err := _datastore.DB.Begin() - if err != nil { - log.Errorln("error while setting message visibility", err) - return nil - } - - defer tx.Rollback() // nolint - query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS ?" - - stmt, err := tx.Prepare(query) - if err != nil { - log.Errorln("error while setting message visibility", err) - return nil - } - - rows, err := stmt.Query(userID) - if err != nil { - log.Errorln("error while setting message visibility", err) - return nil - } - - defer stmt.Close() - defer rows.Close() - - // Get a list of IDs to send to the connected clients to hide - ids := make([]string, 0) - - messages, err := getChat(rows) - if err != nil { - log.Errorln(err) - log.Errorln("There is a problem enumerating chat message rows. Please report this:", query) - return nil - } - - if len(messages) == 0 { - return nil - } - - for _, message := range messages { - ids = append(ids, message.(events.UserMessageEvent).ID) - } - - if err = tx.Commit(); err != nil { - log.Errorln("error while setting message visibility ", err) - return nil - } - - // Tell the clients to hide/show these messages. - return SetMessagesVisibility(ids, visible) -} - -func saveMessageVisibility(messageIDs []string, visible bool) error { - defer func() { - _historyCache = nil - }() - - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - - tx, err := _datastore.DB.Begin() - if err != nil { - return err - } - - // nolint:gosec - stmt, err := tx.Prepare("UPDATE messages SET hidden_at=? WHERE id IN (?" + strings.Repeat(",?", len(messageIDs)-1) + ")") - if err != nil { - return err - } - defer stmt.Close() - - var hiddenAt *time.Time - if !visible { - now := time.Now() - hiddenAt = &now - } else { - hiddenAt = nil - } - - args := make([]interface{}, len(messageIDs)+1) - args[0] = hiddenAt - for i, id := range messageIDs { - args[i+1] = id - } - - if _, err = stmt.Exec(args...); err != nil { - return err - } - - if err = tx.Commit(); err != nil { - return err - } - - return nil -} diff --git a/core/data/data.go b/core/data/data.go index 896aa35e5d..030738f038 100644 --- a/core/data/data.go +++ b/core/data/data.go @@ -75,6 +75,7 @@ func SetupPersistence(file string) error { _, _ = db.Exec("pragma temp_store = memory") _, _ = db.Exec("pragma wal_checkpoint(full)") + tables.CreateConfigTable(db) tables.CreateWebhooksTable(db) tables.CreateUsersTable(db) tables.CreateAccessTokenTable(db) diff --git a/core/data/datastore.go b/core/data/datastore.go index 4f96a14a23..5498bc18f5 100644 --- a/core/data/datastore.go +++ b/core/data/datastore.go @@ -107,19 +107,11 @@ func (ds *Datastore) Save(e models.ConfigEntry) error { return nil } -// Setup will create the datastore table and perform initial initialization. +// Setup will perform initial initialization. func (ds *Datastore) Setup() { ds.cache = make(map[string][]byte) ds.DB = GetDatabase() ds.DbLock = &sync.Mutex{} - - createTableSQL := `CREATE TABLE IF NOT EXISTS datastore ( - "key" string NOT NULL PRIMARY KEY, - "value" BLOB, - "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL - );` - - ds.MustExec(createTableSQL) } // Reset will delete all config entries in the datastore and start over. diff --git a/core/data/messages.go b/core/data/messages.go deleted file mode 100644 index b3b8ac828d..0000000000 --- a/core/data/messages.go +++ /dev/null @@ -1,18 +0,0 @@ -package data - -// GetMessagesCount will return the number of messages in the database. -func GetMessagesCount() int64 { - query := `SELECT COUNT(*) FROM messages` - rows, err := _db.Query(query) - if err != nil || rows.Err() != nil { - return 0 - } - defer rows.Close() - var count int64 - for rows.Next() { - if err := rows.Scan(&count); err != nil { - return 0 - } - } - return count -} diff --git a/metrics/viewers.go b/metrics/viewers.go index e613784037..b1156bdc1d 100644 --- a/metrics/viewers.go +++ b/metrics/viewers.go @@ -6,7 +6,7 @@ import ( "github.com/nakabonne/tstorage" "github.com/owncast/owncast/core" "github.com/owncast/owncast/core/chat" - "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/userrepository" log "github.com/sirupsen/logrus" ) @@ -56,7 +56,8 @@ func collectChatClientCount() { activeChatClientCount.Set(float64(count)) // Total message count - cmc := data.GetMessagesCount() + chatMessageRepository := chatmessagerepository.Get() + cmc := chatMessageRepository.GetMessagesCount() // Insert message count into Prometheus collector. currentChatMessageCount.Set(float64(cmc)) diff --git a/persistence/chatmessagerepository/chatmessagerepository.go b/persistence/chatmessagerepository/chatmessagerepository.go new file mode 100644 index 0000000000..f5b8663160 --- /dev/null +++ b/persistence/chatmessagerepository/chatmessagerepository.go @@ -0,0 +1,523 @@ +package chatmessagerepository + +import ( + "context" + "database/sql" + "errors" + "strings" + "time" + + "github.com/owncast/owncast/core/chat/events" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" + + log "github.com/sirupsen/logrus" +) + +const maxBacklogNumber = 50 // Return max number of messages in history request + +type ChatMessageRepository interface { + SaveUserMessage(event events.UserMessageEvent) + SaveFederatedAction(event events.FediverseEngagementEvent) + SaveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) + GetChatModerationHistory() []interface{} + GetChatHistory() []interface{} + GetMessagesFromUser(userID string) ([]events.UserMessageEvent, error) + GetMessageIdsForUserID(userID string) ([]string, error) + SetMessageVisibilityForMessageIDs(messageIDs []string, visible bool) error + GetMessagesCount() int64 +} + +type SqlChatMessageRepository struct { + datastore *data.Datastore +} + +// NOTE: This is temporary during the transition period. +var temporaryGlobalInstance ChatMessageRepository + +// Get will return the user repository. +func Get() ChatMessageRepository { + if temporaryGlobalInstance == nil { + i := New(data.GetDatastore()) + temporaryGlobalInstance = i + } + return temporaryGlobalInstance +} + +// New will create a new instance of the UserRepository. +func New(datastore *data.Datastore) ChatMessageRepository { + r := SqlChatMessageRepository{ + datastore: datastore, + } + + return &r +} + +// SaveUserMessage will save a single chat event to the messages database. +func (r *SqlChatMessageRepository) SaveUserMessage(event events.UserMessageEvent) { + r.SaveEvent(event.ID, &event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp, nil, nil, nil, nil) +} + +func (r *SqlChatMessageRepository) SaveFederatedAction(event events.FediverseEngagementEvent) { + r.SaveEvent(event.ID, nil, event.Body, event.Type, nil, event.Timestamp, event.Image, &event.Link, &event.UserAccountName, nil) +} + +// nolint: unparam +func (r *SqlChatMessageRepository) SaveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) { + defer func() { + _historyCache = nil + }() + + tx, err := r.datastore.DB.Begin() + if err != nil { + log.Errorln("error saving", eventType, err) + return + } + + defer tx.Rollback() // nolint + + stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + log.Errorln("error saving", eventType, err) + return + } + + defer stmt.Close() + + if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil { + log.Errorln("error saving", eventType, err) + return + } + if err = tx.Commit(); err != nil { + log.Errorln("error saving", eventType, err) + return + } +} + +func makeUserMessageEventFromRowData(row rowData) events.UserMessageEvent { + scopes := "" + if row.userScopes != nil { + scopes = *row.userScopes + } + + previousUsernames := []string{} + if row.previousUsernames != nil { + previousUsernames = strings.Split(*row.previousUsernames, ",") + } + + displayName := "" + if row.userDisplayName != nil { + displayName = *row.userDisplayName + } + + displayColor := 0 + if row.userDisplayColor != nil { + displayColor = *row.userDisplayColor + } + + createdAt := time.Time{} + if row.userCreatedAt != nil { + createdAt = *row.userCreatedAt + } + + isBot := (row.userType != nil && *row.userType == "API") + scopeSlice := strings.Split(scopes, ",") + + u := models.User{ + ID: *row.userID, + DisplayName: displayName, + DisplayColor: displayColor, + CreatedAt: createdAt, + DisabledAt: row.userDisabledAt, + NameChangedAt: row.userNameChangedAt, + PreviousNames: previousUsernames, + AuthenticatedAt: row.userAuthenticatedAt, + Authenticated: row.userAuthenticatedAt != nil, + Scopes: scopeSlice, + IsBot: isBot, + } + + message := events.UserMessageEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + UserEvent: events.UserEvent{ + User: &u, + HiddenAt: row.hiddenAt, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + + return message +} + +func makeSystemMessageChatEventFromRowData(row rowData) events.SystemMessageEvent { + message := events.SystemMessageEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + return message +} + +func makeActionMessageChatEventFromRowData(row rowData) events.ActionEvent { + message := events.ActionEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + return message +} + +func makeFederatedActionChatEventFromRowData(row rowData) events.FediverseEngagementEvent { + message := events.FediverseEngagementEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + Image: row.image, + Link: *row.link, + UserAccountName: *row.title, + } + return message +} + +type rowData struct { + timestamp time.Time + image *string + previousUsernames *string + + userDisplayName *string + userDisplayColor *int + userID *string + title *string + subtitle *string + link *string + + userType *string + userScopes *string + hiddenAt *time.Time + userCreatedAt *time.Time + userDisabledAt *time.Time + userAuthenticatedAt *time.Time + userNameChangedAt *time.Time + body string + eventType models.EventType + id string +} + +func getChat(rows *sql.Rows) ([]interface{}, error) { + history := make([]interface{}, 0) + + for rows.Next() { + row := rowData{} + + // Convert a database row into a chat event + if err := rows.Scan( + &row.id, + &row.userID, + &row.body, + &row.title, + &row.subtitle, + &row.image, + &row.link, + &row.eventType, + &row.hiddenAt, + &row.timestamp, + &row.userDisplayName, + &row.userDisplayColor, + &row.userCreatedAt, + &row.userDisabledAt, + &row.previousUsernames, + &row.userNameChangedAt, + &row.userAuthenticatedAt, + &row.userScopes, + &row.userType, + ); err != nil { + return nil, err + } + + var message interface{} + + switch row.eventType { + case events.MessageSent: + message = makeUserMessageEventFromRowData(row) + case events.SystemMessageSent: + message = makeSystemMessageChatEventFromRowData(row) + case events.ChatActionSent: + message = makeActionMessageChatEventFromRowData(row) + case events.FediverseEngagementFollow: + message = makeFederatedActionChatEventFromRowData(row) + case events.FediverseEngagementLike: + message = makeFederatedActionChatEventFromRowData(row) + case events.FediverseEngagementRepost: + message = makeFederatedActionChatEventFromRowData(row) + } + + history = append(history, message) + } + + return history, nil +} + +var _historyCache *[]interface{} + +// GetChatModerationHistory will return all the chat messages suitable for moderation purposes. +func (r *SqlChatMessageRepository) GetChatModerationHistory() []interface{} { + if _historyCache != nil { + return *_historyCache + } + + tx, err := r.datastore.DB.Begin() + if err != nil { + log.Errorln("error fetching chat moderation history", err) + return nil + } + + defer tx.Rollback() // nolint + + // Get all messages regardless of visibility + query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" + stmt, err := tx.Prepare(query) + if err != nil { + log.Errorln("error fetching chat moderation history", err) + return nil + } + + rows, err := stmt.Query() + if err != nil { + log.Errorln("error fetching chat moderation history", err) + return nil + } + + defer stmt.Close() + defer rows.Close() + + result, err := getChat(rows) + if err != nil { + log.Errorln(err) + log.Errorln("There is a problem enumerating chat message rows. Please report this:", query) + return nil + } + + _historyCache = &result + + if err = tx.Commit(); err != nil { + log.Errorln("error fetching chat moderation history", err) + return nil + } + + return result +} + +// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. +func (r *SqlChatMessageRepository) GetChatHistory() []interface{} { + tx, err := r.datastore.DB.Begin() + if err != nil { + log.Errorln("error fetching chat history", err) + return nil + } + + defer tx.Rollback() // nolint + + // Get all visible messages + query := "SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT ?" + + stmt, err := tx.Prepare(query) + if err != nil { + log.Errorln("error fetching chat history", err) + return nil + } + + rows, err := stmt.Query(maxBacklogNumber) + if err != nil { + log.Errorln("error fetching chat history", err) + return nil + } + + defer stmt.Close() + defer rows.Close() + + m, err := getChat(rows) + if err != nil { + log.Errorln(err) + log.Errorln("There is a problem enumerating chat message rows. Please report this:", query) + return nil + } + + if err = tx.Commit(); err != nil { + log.Errorln("error fetching chat history", err) + return nil + } + + // Invert order of messages + for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 { + m[i], m[j] = m[j], m[i] + } + + return m +} + +// GetMessagesFromUser returns chat messages that were sent by a specific user. +func (r *SqlChatMessageRepository) GetMessagesFromUser(userID string) ([]events.UserMessageEvent, error) { + query, err := r.datastore.GetQueries().GetMessagesFromUser(context.Background(), sql.NullString{String: userID, Valid: true}) + if err != nil { + return nil, err + } + + results := make([]events.UserMessageEvent, len(query)) + for i, row := range query { + results[i] = events.UserMessageEvent{ + Event: events.Event{ + Timestamp: row.Timestamp.Time, + ID: row.ID, + }, + MessageEvent: events.MessageEvent{ + Body: row.Body.String, + }, + } + } + + return results, nil +} + +// GetMessageIdsForUserID will return the chat message IDs for a specific user. +func (r *SqlChatMessageRepository) GetMessageIdsForUserID(userID string) ([]string, error) { + defer func() { + _historyCache = nil + }() + + tx, err := r.datastore.DB.Begin() + if err != nil { + return nil, errors.New("error while setting message visibility") + } + + defer tx.Rollback() // nolint + query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS ?" + + stmt, err := tx.Prepare(query) + if err != nil { + return nil, errors.New("error while setting message visibility") + } + + rows, err := stmt.Query(userID) + if err != nil { + log.Errorln("error while setting message visibility", err) + return nil, errors.New("error while setting message visibility") + } + + defer stmt.Close() + defer rows.Close() + + // Get a list of IDs to send to the connected clients to hide + ids := make([]string, 0) + + messages, err := getChat(rows) + if err != nil { + log.Errorln(err) + return nil, errors.New("There is a problem enumerating chat message rows. Please report this: " + query) + } + + if len(messages) == 0 { + return []string{}, nil + } + + for _, message := range messages { + ids = append(ids, message.(events.UserMessageEvent).ID) + } + + if err = tx.Commit(); err != nil { + return nil, errors.New("error while setting message visibility") + } + + // Tell the clients to hide/show these messages. + return ids, nil +} + +func (r *SqlChatMessageRepository) SetMessageVisibilityForMessageIDs(messageIDs []string, visible bool) error { + defer func() { + _historyCache = nil + }() + + if len(messageIDs) == 0 { + return nil + } + + r.datastore.DbLock.Lock() + defer r.datastore.DbLock.Unlock() + + tx, err := r.datastore.DB.Begin() + if err != nil { + return err + } + + // nolint:gosec + stmt, err := tx.Prepare("UPDATE messages SET hidden_at=? WHERE id IN (?" + strings.Repeat(",?", len(messageIDs)-1) + ")") + if err != nil { + return err + } + defer stmt.Close() + + var hiddenAt *time.Time + if !visible { + now := time.Now() + hiddenAt = &now + } else { + hiddenAt = nil + } + + args := make([]interface{}, len(messageIDs)+1) + args[0] = hiddenAt + for i, id := range messageIDs { + args[i+1] = id + } + + if _, err = stmt.Exec(args...); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +// GetMessagesCount will return the number of messages in the database. +func (r *SqlChatMessageRepository) GetMessagesCount() int64 { + query := `SELECT COUNT(*) FROM messages` + rows, err := r.datastore.DB.Query(query) + if err != nil || rows.Err() != nil { + return 0 + } + defer rows.Close() + var count int64 + for rows.Next() { + if err := rows.Scan(&count); err != nil { + return 0 + } + } + return count +} diff --git a/persistence/tables/config.go b/persistence/tables/config.go new file mode 100644 index 0000000000..189cde2f7f --- /dev/null +++ b/persistence/tables/config.go @@ -0,0 +1,17 @@ +package tables + +import ( + "database/sql" + + "github.com/owncast/owncast/utils" +) + +func CreateConfigTable(db *sql.DB) { + createTableSQL := `CREATE TABLE IF NOT EXISTS datastore ( + "key" string NOT NULL PRIMARY KEY, + "value" BLOB, + "timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL + );` + + utils.MustExec(createTableSQL, db) +} diff --git a/test/automated/api/005_chatmoderation.test.js b/test/automated/api/005_chatmoderation.test.js index 7781e8cb61..c1473e233c 100644 --- a/test/automated/api/005_chatmoderation.test.js +++ b/test/automated/api/005_chatmoderation.test.js @@ -22,6 +22,11 @@ const establishedUserFailedChatMessage = { type: 'CHAT', }; +const establishedUserSucceedChatMessage = { + body: 'this message should send ' + Math.floor(Math.random() * 100), + type: 'CHAT', +}; + test('send a chat message', async () => { const registration = await registerChat(); const accessToken = registration.accessToken; @@ -36,7 +41,7 @@ test('verify admin can make API call to mark message as hidden', async () => { `ws://localhost:8080/ws?accessToken=${accessToken}`, { origin: 'http://localhost:8080', - } + }, ); // Verify the visibility change comes through the websocket @@ -70,7 +75,7 @@ test('verify message has become hidden', async () => { return obj.id === messageId; }); expect(message.length).toBe(1); - // expect(message[0].hiddenAt).toBeTruthy(); + expect(message[0].hiddenAt).toBeTruthy(); }); test('enable established chat user mode', async () => { @@ -97,3 +102,20 @@ test('verify rejected message is not in the chat feed', async () => { test('disable established chat user mode', async () => { await sendAdminRequest('config/chat/establishedusermode', false); }); + +test('send a message after established user mode is disabled', async () => { + const registration = await registerChat(); + const accessToken = registration.accessToken; + + await sendChatMessage(establishedUserSucceedChatMessage, accessToken); +}); + +test('verify message is in the chat feed', async () => { + const res = await getAdminResponse('chat/messages'); + + const message = res.body.filter((obj) => { + return obj.body === establishedUserSucceedChatMessage.body; + }); + + expect(message.length).toBe(0); +}); diff --git a/test/populateContent.sh b/test/populateContent.sh old mode 100644 new mode 100755 diff --git a/webserver/handlers/admin/chat.go b/webserver/handlers/admin/chat.go index a6064b0812..40ea75c4f6 100644 --- a/webserver/handlers/admin/chat.go +++ b/webserver/handlers/admin/chat.go @@ -4,7 +4,6 @@ package admin import ( "encoding/json" - "errors" "fmt" "net/http" "strconv" @@ -13,11 +12,13 @@ import ( "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/models" "github.com/owncast/owncast/persistence/authrepository" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/configrepository" "github.com/owncast/owncast/persistence/userrepository" "github.com/owncast/owncast/utils" "github.com/owncast/owncast/webserver/handlers/generated" webutils "github.com/owncast/owncast/webserver/utils" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -146,13 +147,20 @@ func UpdateUserEnabled(w http.ResponseWriter, r *http.Request) { func updateUserStatus(request generated.UpdateUserEnabledJSONBody) error { userRepository := userrepository.Get() + chatMessageRepository := chatmessagerepository.Get() + if err := userRepository.SetEnabled(*request.UserId, *request.Enabled); err != nil { log.Errorln("error changing user enabled status", err) return err } - if !*request.Enabled { - if err := chat.SetMessageVisibilityForUserID(*request.UserId, *request.Enabled); err != nil { + messageIDs, err := chatMessageRepository.GetMessageIdsForUserID(*request.UserId) + if err != nil { + return errors.Wrap(err, "error fetching user messages") + } + + if !*request.Enabled && len(messageIDs) > 0 { + if err := chat.SetMessagesVisibility(messageIDs, *request.Enabled); err != nil { log.Errorln("error changing user messages visibility", err) return err } @@ -247,7 +255,8 @@ func GetModerators(w http.ResponseWriter, r *http.Request) { func GetChatMessages(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - messages := chat.GetChatModerationHistory() + chatMessageRepository := chatmessagerepository.Get() + messages := chatMessageRepository.GetChatModerationHistory() webutils.WriteResponse(w, messages) } @@ -343,7 +352,8 @@ func SendIntegrationChatMessage(integration models.ExternalAPIUser, w http.Respo return } - chat.SaveUserMessage(event) + chatMessageRepository := chatmessagerepository.Get() + chatMessageRepository.SaveUserMessage(event) webutils.WriteSimpleResponse(w, true, "sent") } diff --git a/webserver/handlers/chat.go b/webserver/handlers/chat.go index a402d84ac9..7758aebcc8 100644 --- a/webserver/handlers/chat.go +++ b/webserver/handlers/chat.go @@ -5,8 +5,8 @@ import ( "net/http" "github.com/owncast/owncast/config" - "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/models" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/configrepository" "github.com/owncast/owncast/persistence/userrepository" "github.com/owncast/owncast/utils" @@ -33,7 +33,8 @@ func getChatMessages(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - messages := chat.GetChatHistory() + chatMessageRepository := chatmessagerepository.Get() + messages := chatMessageRepository.GetChatHistory() if err := json.NewEncoder(w).Encode(messages); err != nil { log.Debugln(err) diff --git a/webserver/handlers/moderation/moderation.go b/webserver/handlers/moderation/moderation.go index 43b52e058b..5a5843ed3e 100644 --- a/webserver/handlers/moderation/moderation.go +++ b/webserver/handlers/moderation/moderation.go @@ -9,6 +9,7 @@ import ( "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/chat/events" "github.com/owncast/owncast/models" + "github.com/owncast/owncast/persistence/chatmessagerepository" "github.com/owncast/owncast/persistence/userrepository" "github.com/owncast/owncast/webserver/utils" log "github.com/sirupsen/logrus" @@ -58,7 +59,8 @@ func GetUserDetails(w http.ResponseWriter, r *http.Request) { clients[i] = client } - messages, err := chat.GetMessagesFromUser(uid) + chatMessagesRepository := chatmessagerepository.Get() + messages, err := chatMessagesRepository.GetMessagesFromUser(uid) if err != nil { log.Errorln(err) }