mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 22:12:34 +08:00
Add LockExecuteAndRelease method to ServerLockService (#53417)
This commit is contained in:
@ -2,6 +2,7 @@ package serverlock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@ -15,8 +16,9 @@ func ProvideService(sqlStore *sqlstore.SQLStore) *ServerLockService {
|
||||
}
|
||||
}
|
||||
|
||||
// ServerLockService allows servers in HA mode to claim a lock
|
||||
// and execute an function if the server was granted the lock
|
||||
// ServerLockService allows servers in HA mode to claim a lock and execute a function if the server was granted the lock
|
||||
// It exposes 2 services LockAndExecute and LockExecuteAndRelease, which are intended to be used independently, don't mix
|
||||
// them up (ie, use the same actionName for both of them).
|
||||
type ServerLockService struct {
|
||||
SQLStore *sqlstore.SQLStore
|
||||
log log.Logger
|
||||
@ -33,11 +35,8 @@ func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName stri
|
||||
}
|
||||
|
||||
// avoid execution if last lock happened less than `maxInterval` ago
|
||||
if rowLock.LastExecution != 0 {
|
||||
lastExecutionTime := time.Unix(rowLock.LastExecution, 0)
|
||||
if time.Since(lastExecutionTime) < maxInterval {
|
||||
return nil
|
||||
}
|
||||
if sl.isLockWithinInterval(rowLock, maxInterval) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// try to get lock based on rowLow version
|
||||
@ -104,9 +103,112 @@ func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string)
|
||||
}
|
||||
|
||||
result = lockRow
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// LockExecuteAndRelease Creates the lock, executes the func, and then release the locks. The locking mechanism is
|
||||
// based on the UNIQUE constraint of the actionName in the database (column operation_uid), so a new process can not insert
|
||||
// a new operation if already exists one. The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
|
||||
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
||||
// that is impossible for 2 processes to run at the same time.
|
||||
func (sl *ServerLockService) LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
|
||||
err := sl.acquireForRelease(ctx, actionName, maxInterval)
|
||||
// could not get the lock, returning
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// execute!
|
||||
fn(ctx)
|
||||
|
||||
err = sl.releaseLock(ctx, actionName)
|
||||
if err != nil {
|
||||
sl.log.Error("Error releasing the lock.", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// acquireForRelease will check if the lock is already on the database, if it is, will check with maxInterval if it is
|
||||
// timeouted. Returns nil error if the lock was acquired correctly
|
||||
func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName string, maxInterval time.Duration) error {
|
||||
// getting the lock - as the action name has a Unique constraint, this will fail if the lock is already on the database
|
||||
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||
// we need to find if the lock is in the database
|
||||
lockRows := []*serverLock{}
|
||||
err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(lockRows) > 0 {
|
||||
result := lockRows[0]
|
||||
if sl.isLockWithinInterval(result, maxInterval) {
|
||||
return errors.New("there is already a lock for this operation")
|
||||
} else {
|
||||
// lock has timeouted, so we update the timestamp
|
||||
result.LastExecution = time.Now().Unix()
|
||||
cond := &serverLock{OperationUID: actionName}
|
||||
affected, err := dbSession.Update(result, cond)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if affected != 1 {
|
||||
sl.log.Error("Expected rows affected to be 1 if there was no error.", "rowAffected", affected)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
// lock not found, creating it
|
||||
lockRow := &serverLock{
|
||||
OperationUID: actionName,
|
||||
LastExecution: time.Now().Unix(),
|
||||
}
|
||||
|
||||
affected, err := dbSession.Insert(lockRow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if affected != 1 {
|
||||
// this means that there was no error but there is something not working correctly
|
||||
sl.log.Error("Expected rows affected to be 1 if there was no error.", "rowAffected", affected)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// releaseLock will delete the row at the database. This is only intended to be used within the scope of LockExecuteAndRelease
|
||||
// method, but not as to manually release a Lock
|
||||
func (sl *ServerLockService) releaseLock(ctx context.Context, actionName string) error {
|
||||
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||
sql := `DELETE FROM server_lock WHERE operation_uid=? `
|
||||
|
||||
res, err := dbSession.Exec(sql, actionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
affected, err := res.RowsAffected()
|
||||
if affected != 1 {
|
||||
sl.log.Debug("Error releasing lock ", "affected", affected)
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (sl *ServerLockService) isLockWithinInterval(lock *serverLock, maxInterval time.Duration) bool {
|
||||
if lock.LastExecution != 0 {
|
||||
lastExecutionTime := time.Unix(lock.LastExecution, 0)
|
||||
if time.Since(lastExecutionTime) < maxInterval {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIntegrationServerLok(t *testing.T) {
|
||||
func TestIntegrationServerLock_LockAndExecute(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
@ -35,3 +35,30 @@ func TestIntegrationServerLok(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 2, counter)
|
||||
}
|
||||
|
||||
func TestIntegrationServerLock_LockExecuteAndRelease(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
sl := createTestableServerLock(t)
|
||||
|
||||
counter := 0
|
||||
fn := func(context.Context) { counter++ }
|
||||
atInterval := time.Hour
|
||||
ctx := context.Background()
|
||||
|
||||
//
|
||||
err := sl.LockExecuteAndRelease(ctx, "test-operation", atInterval, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, counter)
|
||||
|
||||
// the function will be executed again, as everytime the lock is released
|
||||
err = sl.LockExecuteAndRelease(ctx, "test-operation", atInterval, fn)
|
||||
require.NoError(t, err)
|
||||
err = sl.LockExecuteAndRelease(ctx, "test-operation", atInterval, fn)
|
||||
require.NoError(t, err)
|
||||
err = sl.LockExecuteAndRelease(ctx, "test-operation", atInterval, fn)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 4, counter)
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package serverlock
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -14,10 +15,10 @@ import (
|
||||
func createTestableServerLock(t *testing.T) *ServerLockService {
|
||||
t.Helper()
|
||||
|
||||
sqlstore := sqlstore.InitTestDB(t)
|
||||
store := sqlstore.InitTestDB(t)
|
||||
|
||||
return &ServerLockService{
|
||||
SQLStore: sqlstore,
|
||||
SQLStore: store,
|
||||
log: log.New("test-logger"),
|
||||
}
|
||||
}
|
||||
@ -56,3 +57,76 @@ func TestServerLock(t *testing.T) {
|
||||
assert.False(t, gotLock)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLockAndRelease(t *testing.T) {
|
||||
operationUID := "test-operation-release"
|
||||
|
||||
t.Run("create lock and then release it", func(t *testing.T) {
|
||||
sl := createTestableServerLock(t)
|
||||
duration := time.Hour * 5
|
||||
|
||||
err := sl.acquireForRelease(context.Background(), operationUID, duration)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = sl.releaseLock(context.Background(), operationUID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// and now we can acquire it again
|
||||
err2 := sl.acquireForRelease(context.Background(), operationUID, duration)
|
||||
require.NoError(t, err2)
|
||||
|
||||
err = sl.releaseLock(context.Background(), operationUID)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("try to acquire a lock which is already locked, get error", func(t *testing.T) {
|
||||
sl := createTestableServerLock(t)
|
||||
duration := time.Hour * 5
|
||||
|
||||
err := sl.acquireForRelease(context.Background(), operationUID, duration)
|
||||
require.NoError(t, err)
|
||||
|
||||
err2 := sl.acquireForRelease(context.Background(), operationUID, duration)
|
||||
require.Error(t, err2, "We should expect an error when trying to get the second lock")
|
||||
require.Equal(t, "there is already a lock for this operation", err2.Error())
|
||||
|
||||
err3 := sl.releaseLock(context.Background(), operationUID)
|
||||
require.NoError(t, err3)
|
||||
})
|
||||
|
||||
t.Run("lock already exists but is timeouted", func(t *testing.T) {
|
||||
sl := createTestableServerLock(t)
|
||||
pastLastExec := time.Now().Add(-time.Hour).Unix()
|
||||
lock := serverLock{
|
||||
OperationUID: operationUID,
|
||||
LastExecution: pastLastExec,
|
||||
}
|
||||
|
||||
// inserting a row with lock in the past
|
||||
err := sl.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
r, err := sess.Insert(lock)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), r)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
duration := time.Minute * 5
|
||||
|
||||
err = sl.acquireForRelease(context.Background(), operationUID, duration)
|
||||
require.NoError(t, err)
|
||||
|
||||
//validate that the lock LastExecution was updated (at least different from the original)
|
||||
err = sl.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
|
||||
lockRows := []*serverLock{}
|
||||
err := sess.Where("operation_uid = ?", operationUID).Find(&lockRows)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(lockRows))
|
||||
require.NotEqual(t, pastLastExec, lockRows[0].LastExecution)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err3 := sl.releaseLock(context.Background(), operationUID)
|
||||
require.NoError(t, err3)
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user