mirror of
				https://github.com/owncast/owncast.git
				synced 2025-11-04 05:17:27 +08:00 
			
		
		
		
	Refactor migration to loop over each user instead of bulk inserts
This commit is contained in:
		@ -2,8 +2,6 @@ package data
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"database/sql"
 | 
						"database/sql"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/owncast/owncast/utils"
 | 
						"github.com/owncast/owncast/utils"
 | 
				
			||||||
@ -11,54 +9,11 @@ import (
 | 
				
			|||||||
	"github.com/teris-io/shortid"
 | 
						"github.com/teris-io/shortid"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// nolint:cyclop
 | 
				
			||||||
func migrateToSchema5(db *sql.DB) {
 | 
					func migrateToSchema5(db *sql.DB) {
 | 
				
			||||||
	// Create the access tokens table.
 | 
						// Create the access tokens table.
 | 
				
			||||||
	createAccessTokenTable(db)
 | 
						createAccessTokenTable(db)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Migrate the access tokens from the users table to the access tokens table.
 | 
					 | 
				
			||||||
	query := `SELECT id, access_token, created_at FROM users`
 | 
					 | 
				
			||||||
	rows, err := db.Query(query)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		log.Errorln("error migrating access tokens to schema v5", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if rows.Err() != nil {
 | 
					 | 
				
			||||||
		log.Errorln("error migrating access tokens to schema v5", rows.Err())
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer rows.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	valueStrings := []string{}
 | 
					 | 
				
			||||||
	valueArgs := []interface{}{}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var token string
 | 
					 | 
				
			||||||
	var userID string
 | 
					 | 
				
			||||||
	var timestamp time.Time
 | 
					 | 
				
			||||||
	for rows.Next() {
 | 
					 | 
				
			||||||
		if err := rows.Scan(&userID, &token, ×tamp); err != nil {
 | 
					 | 
				
			||||||
			log.Error("There is a problem reading the database.", err)
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		valueStrings = append(valueStrings, "(?, ?, ?)")
 | 
					 | 
				
			||||||
		valueArgs = append(valueArgs, token, userID, timestamp)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	smt := `INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES %s ON CONFLICT DO NOTHING`
 | 
					 | 
				
			||||||
	smt = fmt.Sprintf(smt, strings.Join(valueStrings, ","))
 | 
					 | 
				
			||||||
	tx, err := db.Begin()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		log.Fatalln("Error starting transaction", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	_, err = tx.Exec(smt, valueArgs...)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		_ = tx.Rollback()
 | 
					 | 
				
			||||||
		log.Errorln("Error inserting access tokens", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err := tx.Commit(); err != nil {
 | 
					 | 
				
			||||||
		log.Errorln("Error committing transaction", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 1. Authenticated bool added to the users table.
 | 
						// 1. Authenticated bool added to the users table.
 | 
				
			||||||
	// 2. Access tokens are now stored in their own table.
 | 
						// 2. Access tokens are now stored in their own table.
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
@ -83,15 +38,79 @@ func migrateToSchema5(db *sql.DB) {
 | 
				
			|||||||
	CREATE INDEX user_id_index ON users (id);
 | 
						CREATE INDEX user_id_index ON users (id);
 | 
				
			||||||
	CREATE INDEX user_id_disabled_index ON users (id, disabled_at);
 | 
						CREATE INDEX user_id_disabled_index ON users (id, disabled_at);
 | 
				
			||||||
	CREATE INDEX user_disabled_at_index ON USERS (disabled_at);`
 | 
						CREATE INDEX user_disabled_at_index ON USERS (disabled_at);`
 | 
				
			||||||
	_, err = db.Exec(createTempTable)
 | 
						_, err := db.Exec(createTempTable)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.Errorln("error running migration, you may experience issues: ", err)
 | 
							log.Errorln("error running migration, you may experience issues: ", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = db.Exec(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used)
 | 
						// Start insert transaction
 | 
				
			||||||
  SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users;`)
 | 
						tx, err := db.Begin()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.Errorln("error running migration, you may experience issues: ", err)
 | 
							log.Errorln(err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Migrate the users table to the new users_copy table.
 | 
				
			||||||
 | 
						rows, err := tx.Query(`SELECT id, access_token, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users`)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Errorln("error migrating access tokens to schema v5", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if rows.Err() != nil {
 | 
				
			||||||
 | 
							log.Errorln("error migrating access tokens to schema v5", rows.Err())
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer rows.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						defer tx.Rollback() //nolint:errcheck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						log.Println("Migrating users. This may take time if you have lots of users...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for rows.Next() {
 | 
				
			||||||
 | 
							var id string
 | 
				
			||||||
 | 
							var accessToken string
 | 
				
			||||||
 | 
							var displayName string
 | 
				
			||||||
 | 
							var displayColor int
 | 
				
			||||||
 | 
							var createdAt time.Time
 | 
				
			||||||
 | 
							var disabledAt *time.Time
 | 
				
			||||||
 | 
							var previousNames string
 | 
				
			||||||
 | 
							var namechangedAt *time.Time
 | 
				
			||||||
 | 
							var scopes *string
 | 
				
			||||||
 | 
							var userType string
 | 
				
			||||||
 | 
							var lastUsed *time.Time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if err := rows.Scan(&id, &accessToken, &displayName, &displayColor, &createdAt, &disabledAt, &previousNames, &namechangedAt, &scopes, &userType, &lastUsed); err != nil {
 | 
				
			||||||
 | 
								log.Error("There is a problem reading the database when migrating users.", err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							stmt, err := tx.Prepare(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.Errorln(err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							defer stmt.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if _, err := stmt.Exec(id, displayName, displayColor, createdAt, disabledAt, previousNames, namechangedAt, scopes, userType, lastUsed); err != nil {
 | 
				
			||||||
 | 
								log.Errorln(err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							stmt, err = tx.Prepare(`INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.Errorln(err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							defer stmt.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if _, err := stmt.Exec(accessToken, id, createdAt); err != nil {
 | 
				
			||||||
 | 
								log.Errorln(err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err := tx.Commit(); err != nil {
 | 
				
			||||||
 | 
							log.Errorln(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = db.Exec(`PRAGMA foreign_keys = OFF;DROP TABLE "users";ALTER TABLE "users_copy" RENAME TO users;PRAGMA foreign_keys = ON;`)
 | 
						_, err = db.Exec(`PRAGMA foreign_keys = OFF;DROP TABLE "users";ALTER TABLE "users_copy" RENAME TO users;PRAGMA foreign_keys = ON;`)
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user