diff --git a/pkg/infra/serverlock/serverlock.go b/pkg/infra/serverlock/serverlock.go index 7a92ef36c73..0cdb94ca627 100644 --- a/pkg/infra/serverlock/serverlock.go +++ b/pkg/infra/serverlock/serverlock.go @@ -28,20 +28,23 @@ func (sl *ServerLockService) Init() error { // OncePerServerGroup try to create a lock for this server and only executes the // `fn` function when successful. This should not be used at low internal. But services // that needs to be run once every ex 10m. -func (sl *ServerLockService) OncePerServerGroup(ctx context.Context, actionName string, maxEvery time.Duration, fn func()) error { +func (sl *ServerLockService) OncePerServerGroup(ctx context.Context, actionName string, maxInterval time.Duration, fn func()) error { + // gets or creates a lockable row rowLock, err := sl.getOrCreate(ctx, actionName) if err != nil { return err } + // avoid execution if last lock happened less than `matInterval` ago if rowLock.LastExecution != 0 { lastExeuctionTime := time.Unix(rowLock.LastExecution, 0) - if lastExeuctionTime.Unix() > time.Now().Add(-maxEvery).Unix() { + if lastExeuctionTime.Unix() > time.Now().Add(-maxInterval).Unix() { return nil } } - acquiredLock, err := sl.acquireLock(ctx, rowLock, maxEvery) + // try to get lock based on rowLow version + acquiredLock, err := sl.acquireLock(ctx, rowLock) if err != nil { return err } @@ -53,7 +56,7 @@ func (sl *ServerLockService) OncePerServerGroup(ctx context.Context, actionName return nil } -func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock, maxEvery time.Duration) (bool, error) { +func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) { var result bool err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error { diff --git a/pkg/infra/serverlock/serverlock_integration_test.go b/pkg/infra/serverlock/serverlock_integration_test.go index 0065651b8d7..335f540b2e3 100644 --- a/pkg/infra/serverlock/serverlock_integration_test.go +++ b/pkg/infra/serverlock/serverlock_integration_test.go @@ -13,63 +13,28 @@ import ( func TestServerLok(t *testing.T) { sl := createTestableServerLock(t) - Convey("Server lock integration test", t, func() { + Convey("Server lock integration tests", t, func() { + counter := 0 + var err error + incCounter := func() { counter++ } + atInterval := time.Second * 1 + ctx := context.Background() - Convey("Check that we can call OncePerServerGroup multiple times without executing callback", func() { - counter := 0 - var err error + //this time `fn` should be executed + So(sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter), ShouldBeNil) - //this time `fn` should be executed - err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ }) - So(err, ShouldBeNil) + //this should not execute `fn` + So(sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter), ShouldBeNil) + So(sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter), ShouldBeNil) + So(sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter), ShouldBeNil) + So(sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter), ShouldBeNil) - //this should not execute `fn` - err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ }) - So(err, ShouldBeNil) + // wait 5 second. + <-time.After(atInterval * 2) - //this should not execute `fn` - err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ }) - So(err, ShouldBeNil) - - // wg := sync.WaitGroup{} - // for i := 0; i < 3; i++ { - // wg.Add(1) - // go func(index int) { - // defer wg.Done() - // //sl := createTestableServerLock(t) - // //<-time.After(time.Second) - - // j := 0 - // for { - // select { - // case <-time.Tick(time.Second): - // fmt.Printf("running worker %d loop %d\n", index, j) - // err := sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*2, func() { - // counter++ - // }) - - // if err != nil { - // t.Errorf("expected. err: %v", err) - // } - - // j++ - // if j > 3 { - // return - // } - // } - // } - // }(i) - // } - - // wg.Wait() - - // wait 5 second. - <-time.After(time.Second * 10) - - // now `fn` should be executed again - err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ }) - So(err, ShouldBeNil) - So(counter, ShouldEqual, 2) - }) + // now `fn` should be executed again + err = sl.OncePerServerGroup(ctx, "test-operation", atInterval, incCounter) + So(err, ShouldBeNil) + So(counter, ShouldEqual, 2) }) } diff --git a/pkg/infra/serverlock/serverlock_test.go b/pkg/infra/serverlock/serverlock_test.go index e5166144e0b..ccd1c252090 100644 --- a/pkg/infra/serverlock/serverlock_test.go +++ b/pkg/infra/serverlock/serverlock_test.go @@ -3,10 +3,8 @@ package serverlock import ( "context" "testing" - "time" "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/services/sqlstore" . "github.com/smartystreets/goconvey/convey" ) @@ -45,11 +43,11 @@ func TestServerLock(t *testing.T) { }) Convey("Should be able to create lock on first row", func() { - gotLock, err := sl.acquireLock(context.Background(), first, time.Second*1) + gotLock, err := sl.acquireLock(context.Background(), first) So(err, ShouldBeNil) So(gotLock, ShouldBeTrue) - gotLock, err = sl.acquireLock(context.Background(), first, time.Second*1) + gotLock, err = sl.acquireLock(context.Background(), first) So(err, ShouldBeNil) So(gotLock, ShouldBeFalse) })