diff --git a/pkg/api/app_routes.go b/pkg/api/app_routes.go index 134e10d411f..74f76787af5 100644 --- a/pkg/api/app_routes.go +++ b/pkg/api/app_routes.go @@ -34,7 +34,7 @@ func (hs *HTTPServer) initAppPluginRoutes(r *web.Mux) { TLSHandshakeTimeout: 10 * time.Second, } - for _, plugin := range hs.pluginStore.Plugins(context.TODO(), plugins.App) { + for _, plugin := range hs.pluginStore.Plugins(context.Background(), plugins.App) { for _, route := range plugin.Routes { url := util.JoinURLFragments("/api/plugin-proxy/"+plugin.ID, route.Path) handlers := make([]web.Handler, 0) diff --git a/pkg/api/ldap_debug.go b/pkg/api/ldap_debug.go index 1bfe3a4cd57..71b9d1d98cb 100644 --- a/pkg/api/ldap_debug.go +++ b/pkg/api/ldap_debug.go @@ -177,7 +177,7 @@ func (hs *HTTPServer) PostSyncUserWithLDAP(c *models.ReqContext) response.Respon authModuleQuery := &models.GetAuthInfoQuery{UserId: query.Result.Id, AuthModule: models.AuthModuleLDAP} - if err := bus.DispatchCtx(context.TODO(), authModuleQuery); err != nil { // validate the userId comes from LDAP + if err := bus.DispatchCtx(c.Req.Context(), authModuleQuery); err != nil { // validate the userId comes from LDAP if errors.Is(err, models.ErrUserNotFound) { return response.Error(404, models.ErrUserNotFound.Error(), nil) } diff --git a/pkg/infra/remotecache/remotecache.go b/pkg/infra/remotecache/remotecache.go index 91e8b9ed06b..be00f9e7b46 100644 --- a/pkg/infra/remotecache/remotecache.go +++ b/pkg/infra/remotecache/remotecache.go @@ -28,7 +28,7 @@ const ( ) func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore) (*RemoteCache, error) { - client, err := createClient(context.Background(), cfg.RemoteCacheOptions, sqlStore) + client, err := createClient(cfg.RemoteCacheOptions, sqlStore) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func (ds *RemoteCache) Run(ctx context.Context) error { return ctx.Err() } -func createClient(ctx context.Context, opts *setting.RemoteCacheOptions, sqlstore *sqlstore.SQLStore) (CacheStorage, error) { +func createClient(opts *setting.RemoteCacheOptions, sqlstore *sqlstore.SQLStore) (CacheStorage, error) { if opts.Name == redisCacheType { return newRedisStorage(opts) } diff --git a/pkg/infra/remotecache/remotecache_test.go b/pkg/infra/remotecache/remotecache_test.go index 476a57789a6..093e8791aee 100644 --- a/pkg/infra/remotecache/remotecache_test.go +++ b/pkg/infra/remotecache/remotecache_test.go @@ -44,7 +44,7 @@ func TestCachedBasedOnConfig(t *testing.T) { } func TestInvalidCacheTypeReturnsError(t *testing.T) { - _, err := createClient(context.Background(), &setting.RemoteCacheOptions{Name: "invalid"}, nil) + _, err := createClient(&setting.RemoteCacheOptions{Name: "invalid"}, nil) assert.Equal(t, err, ErrInvalidCacheType) } diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index bee622f6604..0e5a86ef65f 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -158,7 +158,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r redisClient := redis.NewClient(&redis.Options{ Addr: g.Cfg.LiveHAEngineAddress, }) - cmd := redisClient.Ping(context.TODO()) + cmd := redisClient.Ping(context.Background()) if _, err := cmd.Result(); err != nil { return nil, fmt.Errorf("error pinging Redis: %v", err) } @@ -206,7 +206,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r // This can be unreasonable to have in production scenario with many // organizations. orgQuery := &models.SearchOrgsQuery{} - err := sqlstore.SearchOrgs(context.TODO(), orgQuery) + err := sqlstore.SearchOrgs(context.Background(), orgQuery) if err != nil { return nil, fmt.Errorf("can't get org list: %w", err) } @@ -278,7 +278,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r // Called when client subscribes to the channel. client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { - cb(g.handleOnSubscribe(client, e)) + cb(g.handleOnSubscribe(context.Background(), client, e)) }) if err != nil { cb(centrifuge.SubscribeReply{}, err) @@ -290,7 +290,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r // allows some simple prototypes to work quickly. client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { - cb(g.handleOnPublish(client, e)) + cb(g.handleOnPublish(context.Background(), client, e)) }) if err != nil { cb(centrifuge.PublishReply{}, err) @@ -429,8 +429,8 @@ type GrafanaLive struct { usageStats usageStats } -func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) { - plugin, exists := g.pluginStore.Plugin(context.TODO(), pluginID) +func (g *GrafanaLive) getStreamPlugin(ctx context.Context, pluginID string) (backend.StreamHandler, error) { + plugin, exists := g.pluginStore.Plugin(ctx, pluginID) if !exists { return nil, fmt.Errorf("plugin not found: %s", pluginID) } @@ -604,7 +604,7 @@ func (g *GrafanaLive) handleOnRPC(client *centrifuge.Client, e centrifuge.RPCEve }, nil } -func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { +func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) user, ok := livecontext.GetContextSignedUser(client.Context()) @@ -668,7 +668,7 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. } } if !ruleFound { - handler, addr, err := g.GetChannelHandler(user, channel) + handler, addr, err := g.GetChannelHandler(ctx, user, channel) if err != nil { if errors.Is(err, live.ErrInvalidChannelID) { logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) @@ -704,7 +704,7 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. }, nil } -func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { +func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) user, ok := livecontext.GetContextSignedUser(client.Context()) @@ -761,7 +761,7 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu } } - handler, addr, err := g.GetChannelHandler(user, channel) + handler, addr, err := g.GetChannelHandler(ctx, user, channel) if err != nil { if errors.Is(err, live.ErrInvalidChannelID) { logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) @@ -831,7 +831,7 @@ func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string) } // GetChannelHandler gives thread-safe access to the channel. -func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) { +func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) { // Parse the identifier ${scope}/${namespace}/${path} addr, err := live.ParseChannel(channel) if err != nil { @@ -854,7 +854,7 @@ func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel strin return c, addr, nil } - getter, err := g.GetChannelHandlerFactory(user, addr.Scope, addr.Namespace) + getter, err := g.GetChannelHandlerFactory(ctx, user, addr.Scope, addr.Namespace) if err != nil { return nil, addr, fmt.Errorf("error getting channel handler factory: %w", err) } @@ -872,14 +872,14 @@ func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel strin // GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace. // It gives thread-safe access to the channel. -func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) { +func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) { switch scope { case live.ScopeGrafana: return g.handleGrafanaScope(user, namespace) case live.ScopePlugin: - return g.handlePluginScope(user, namespace) + return g.handlePluginScope(ctx, user, namespace) case live.ScopeDatasource: - return g.handleDatasourceScope(user, namespace) + return g.handleDatasourceScope(ctx, user, namespace) case live.ScopeStream: return g.handleStreamScope(user, namespace) default: @@ -894,7 +894,7 @@ func (g *GrafanaLive) handleGrafanaScope(_ *models.SignedInUser, namespace strin return nil, fmt.Errorf("unknown feature: %q", namespace) } -func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { +func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { // Temporary hack until we have a more generic solution later on if namespace == "cloudwatch" { return &cloudwatch.LogQueryRunnerSupplier{ @@ -902,7 +902,7 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string Service: g.LogsService, }, nil } - streamHandler, err := g.getStreamPlugin(namespace) + streamHandler, err := g.getStreamPlugin(ctx, namespace) if err != nil { return nil, fmt.Errorf("can't find stream plugin: %s", namespace) } @@ -919,12 +919,12 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, live.ScopeStream, namespace) } -func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { - ds, err := g.DataSourceCache.GetDatasourceByUID(context.TODO(), namespace, user, false) +func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { + ds, err := g.DataSourceCache.GetDatasourceByUID(ctx, namespace, user, false) if err != nil { return nil, fmt.Errorf("error getting datasource: %w", err) } - streamHandler, err := g.getStreamPlugin(ds.Type) + streamHandler, err := g.getStreamPlugin(ctx, ds.Type) if err != nil { return nil, fmt.Errorf("can't find stream plugin: %s", ds.Type) } @@ -996,7 +996,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons } } - channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel) + channelHandler, addr, err := g.GetChannelHandler(ctx.Req.Context(), ctx.SignedInUser, cmd.Channel) if err != nil { logger.Error("Error getting channels handler", "error", err, "channel", cmd.Channel) return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) diff --git a/pkg/services/live/managedstream/cache.go b/pkg/services/live/managedstream/cache.go index 8ab810ce58b..e6f48e0ab9f 100644 --- a/pkg/services/live/managedstream/cache.go +++ b/pkg/services/live/managedstream/cache.go @@ -1,6 +1,7 @@ package managedstream import ( + "context" "encoding/json" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -11,7 +12,7 @@ type FrameCache interface { // GetActiveChannels returns active managed stream channels with JSON schema. GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) // GetFrame returns full JSON frame for a channel in org. - GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) + GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error) // Update updates frame cache and returns true if schema changed. - Update(orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error) + Update(ctx context.Context, orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error) } diff --git a/pkg/services/live/managedstream/cache_memory.go b/pkg/services/live/managedstream/cache_memory.go index 520c4ee079d..e90afd06a0b 100644 --- a/pkg/services/live/managedstream/cache_memory.go +++ b/pkg/services/live/managedstream/cache_memory.go @@ -1,6 +1,7 @@ package managedstream import ( + "context" "encoding/json" "sync" @@ -34,14 +35,14 @@ func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMe return info, nil } -func (c *MemoryFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) { +func (c *MemoryFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error) { c.mu.RLock() defer c.mu.RUnlock() cachedFrame, ok := c.frames[orgID][channel] return cachedFrame.Bytes(data.IncludeAll), ok, nil } -func (c *MemoryFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { +func (c *MemoryFrameCache) Update(ctx context.Context, orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.frames[orgID]; !ok { diff --git a/pkg/services/live/managedstream/cache_memory_test.go b/pkg/services/live/managedstream/cache_memory_test.go index 9f58969a8b4..ae695fc5e2d 100644 --- a/pkg/services/live/managedstream/cache_memory_test.go +++ b/pkg/services/live/managedstream/cache_memory_test.go @@ -1,6 +1,7 @@ package managedstream import ( + "context" "encoding/json" "testing" @@ -15,7 +16,7 @@ func testFrameCache(t *testing.T, c FrameCache) { frameJsonCache, err := data.FrameToJSONCache(frame) require.NoError(t, err) - updated, err := c.Update(1, "test", frameJsonCache) + updated, err := c.Update(context.Background(), 1, "test", frameJsonCache) require.NoError(t, err) require.True(t, updated) @@ -27,7 +28,7 @@ func testFrameCache(t *testing.T, c FrameCache) { require.NotZero(t, schema) // Make sure the same frame does not update schema. - updated, err = c.Update(1, "test", frameJsonCache) + updated, err = c.Update(context.Background(), 1, "test", frameJsonCache) require.NoError(t, err) require.False(t, updated) @@ -37,17 +38,17 @@ func testFrameCache(t *testing.T, c FrameCache) { require.NoError(t, err) // Make sure schema updated. - updated, err = c.Update(1, "test", frameJsonCache) + updated, err = c.Update(context.Background(), 1, "test", frameJsonCache) require.NoError(t, err) require.True(t, updated) // Add the same with another orgID and make sure schema updated. - updated, err = c.Update(2, "test", frameJsonCache) + updated, err = c.Update(context.Background(), 2, "test", frameJsonCache) require.NoError(t, err) require.True(t, updated) // Make sure that the last frame successfully saved in cache. - frameJSON, ok, err := c.GetFrame(1, "test") + frameJSON, ok, err := c.GetFrame(context.Background(), 1, "test") require.NoError(t, err) require.True(t, ok) diff --git a/pkg/services/live/managedstream/cache_redis.go b/pkg/services/live/managedstream/cache_redis.go index 2523c8b2747..1fe83211206 100644 --- a/pkg/services/live/managedstream/cache_redis.go +++ b/pkg/services/live/managedstream/cache_redis.go @@ -42,9 +42,9 @@ func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMes return info, nil } -func (c *RedisFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) { +func (c *RedisFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error) { key := getCacheKey(orgchannel.PrependOrgID(orgID, channel)) - cmd := c.redisClient.HGetAll(context.TODO(), key) + cmd := c.redisClient.HGetAll(ctx, key) result, err := cmd.Result() if err != nil { return nil, false, err @@ -59,7 +59,7 @@ const ( frameCacheTTL = 7 * 24 * time.Hour ) -func (c *RedisFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { +func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { c.mu.Lock() if _, ok := c.frames[orgID]; !ok { c.frames[orgID] = map[string]data.FrameJSONCache{} @@ -74,8 +74,6 @@ func (c *RedisFrameCache) Update(orgID int64, channel string, jsonFrame data.Fra pipe := c.redisClient.TxPipeline() defer func() { _ = pipe.Close() }() - ctx := context.TODO() - pipe.HGetAll(ctx, key) pipe.HMSet(ctx, key, map[string]string{ "schema": stringSchema, diff --git a/pkg/services/live/managedstream/runner.go b/pkg/services/live/managedstream/runner.go index f622c95e91d..e30611dc51e 100644 --- a/pkg/services/live/managedstream/runner.go +++ b/pkg/services/live/managedstream/runner.go @@ -170,7 +170,7 @@ func NewNamespaceStream(orgID int64, scope string, namespace string, publisher m // Push sends frame to the stream and saves it for later retrieval by subscribers. // * Saves the entire frame to cache. // * If schema has been changed sends entire frame to channel, otherwise only data. -func (s *NamespaceStream) Push(path string, frame *data.Frame) error { +func (s *NamespaceStream) Push(ctx context.Context, path string, frame *data.Frame) error { jsonFrameCache, err := data.FrameToJSONCache(frame) if err != nil { return err @@ -179,7 +179,7 @@ func (s *NamespaceStream) Push(path string, frame *data.Frame) error { // The channel this will be posted into. channel := live.Channel{Scope: s.scope, Namespace: s.namespace, Path: path}.String() - isUpdated, err := s.frameCache.Update(s.orgID, channel, jsonFrameCache) + isUpdated, err := s.frameCache.Update(ctx, s.orgID, channel, jsonFrameCache) if err != nil { logger.Error("Error updating managed stream schema", "error", err) return err @@ -238,9 +238,9 @@ func (s *NamespaceStream) GetHandlerForPath(_ string) (models.ChannelHandler, er return s, nil } -func (s *NamespaceStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { +func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { reply := models.SubscribeReply{} - frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel) + frameJSON, ok, err := s.frameCache.GetFrame(ctx, u.OrgId, e.Channel) if err != nil { return reply, 0, err } diff --git a/pkg/services/live/managedstream/runner_test.go b/pkg/services/live/managedstream/runner_test.go index cde92eb489e..7c1fce5a7a1 100644 --- a/pkg/services/live/managedstream/runner_test.go +++ b/pkg/services/live/managedstream/runner_test.go @@ -1,6 +1,7 @@ package managedstream import ( + "context" "testing" "time" @@ -57,13 +58,13 @@ func TestGetManagedStreams(t *testing.T) { require.NoError(t, err) require.Len(t, managedChannels, 4) // 4 hardcoded testdata streams. - err = s1.Push("cpu1", data.NewFrame("cpu1")) + err = s1.Push(context.Background(), "cpu1", data.NewFrame("cpu1")) require.NoError(t, err) - err = s1.Push("cpu2", data.NewFrame("cpu2")) + err = s1.Push(context.Background(), "cpu2", data.NewFrame("cpu2")) require.NoError(t, err) - err = s2.Push("cpu1", data.NewFrame("cpu1")) + err = s2.Push(context.Background(), "cpu1", data.NewFrame("cpu1")) require.NoError(t, err) managedChannels, err = runner.GetManagedChannels(1) @@ -76,7 +77,7 @@ func TestGetManagedStreams(t *testing.T) { // Different org. s3, err := runner.GetOrCreateStream(2, "stream", "test1") require.NoError(t, err) - err = s3.Push("cpu1", data.NewFrame("cpu1")) + err = s3.Push(context.Background(), "cpu1", data.NewFrame("cpu1")) require.NoError(t, err) managedChannels, err = runner.GetManagedChannels(1) require.NoError(t, err) diff --git a/pkg/services/live/pipeline/data_output_builtin.go b/pkg/services/live/pipeline/data_output_builtin.go index 4b2d4b20b1b..0780203169b 100644 --- a/pkg/services/live/pipeline/data_output_builtin.go +++ b/pkg/services/live/pipeline/data_output_builtin.go @@ -29,7 +29,7 @@ func (s *BuiltinDataOutput) OutputData(ctx context.Context, vars Vars, data []by if !ok { return nil, errors.New("user not found in context") } - handler, _, err := s.channelHandlerGetter.GetChannelHandler(u, vars.Channel) + handler, _, err := s.channelHandlerGetter.GetChannelHandler(ctx, u, vars.Channel) if err != nil { return nil, err } diff --git a/pkg/services/live/pipeline/frame_output_managed_stream.go b/pkg/services/live/pipeline/frame_output_managed_stream.go index cbc0393ed7a..705f3735f32 100644 --- a/pkg/services/live/pipeline/frame_output_managed_stream.go +++ b/pkg/services/live/pipeline/frame_output_managed_stream.go @@ -22,11 +22,11 @@ func (out *ManagedStreamFrameOutput) Type() string { return FrameOutputTypeManagedStream } -func (out *ManagedStreamFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *ManagedStreamFrameOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { stream, err := out.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace) if err != nil { logger.Error("Error getting stream", "error", err) return nil, err } - return nil, stream.Push(vars.Path, frame) + return nil, stream.Push(ctx, vars.Path, frame) } diff --git a/pkg/services/live/pipeline/subscribe_builtin.go b/pkg/services/live/pipeline/subscribe_builtin.go index fcb033fd6cd..7b7e6ba3e0b 100644 --- a/pkg/services/live/pipeline/subscribe_builtin.go +++ b/pkg/services/live/pipeline/subscribe_builtin.go @@ -15,7 +15,7 @@ type BuiltinSubscriber struct { } type ChannelHandlerGetter interface { - GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) + GetChannelHandler(ctx context.Context, user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) } const SubscriberTypeBuiltin = "builtin" @@ -33,7 +33,7 @@ func (s *BuiltinSubscriber) Subscribe(ctx context.Context, vars Vars, data []byt if !ok { return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil } - handler, _, err := s.channelHandlerGetter.GetChannelHandler(u, vars.Channel) + handler, _, err := s.channelHandlerGetter.GetChannelHandler(ctx, u, vars.Channel) if err != nil { return models.SubscribeReply{}, 0, err } diff --git a/pkg/services/live/pushhttp/push.go b/pkg/services/live/pushhttp/push.go index a5ed4748d89..d442443e21e 100644 --- a/pkg/services/live/pushhttp/push.go +++ b/pkg/services/live/pushhttp/push.go @@ -87,7 +87,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) { // interval = "1s" vs flush_interval = "5s" for _, mf := range metricFrames { - err := stream.Push(mf.Key(), mf.Frame()) + err := stream.Push(ctx.Req.Context(), mf.Key(), mf.Frame()) if err != nil { logger.Error("Error pushing frame", "error", err, "data", string(body)) ctx.Resp.WriteHeader(http.StatusInternalServerError) diff --git a/pkg/services/live/pushws/push_stream.go b/pkg/services/live/pushws/push_stream.go index 0b1ffbdabbb..d52b97b1147 100644 --- a/pkg/services/live/pushws/push_stream.go +++ b/pkg/services/live/pushws/push_stream.go @@ -91,7 +91,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } for _, mf := range metricFrames { - err := stream.Push(mf.Key(), mf.Frame()) + err := stream.Push(r.Context(), mf.Key(), mf.Frame()) if err != nil { logger.Error("Error pushing frame", "error", err, "data", string(body)) return diff --git a/pkg/services/login/authinfoservice/database.go b/pkg/services/login/authinfoservice/database.go index 45452878af0..f443f7200de 100644 --- a/pkg/services/login/authinfoservice/database.go +++ b/pkg/services/login/authinfoservice/database.go @@ -21,7 +21,7 @@ func (s *Implementation) GetExternalUserInfoByLogin(ctx context.Context, query * } authInfoQuery := &models.GetAuthInfoQuery{UserId: userQuery.Result.Id} - if err := s.Bus.DispatchCtx(context.TODO(), authInfoQuery); err != nil { + if err := s.Bus.DispatchCtx(ctx, authInfoQuery); err != nil { return err } @@ -47,7 +47,7 @@ func (s *Implementation) GetAuthInfo(ctx context.Context, query *models.GetAuthI var has bool var err error - err = s.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { + err = s.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { has, err = sess.Desc("created").Get(userAuth) return err }) @@ -121,7 +121,7 @@ func (s *Implementation) SetAuthInfo(ctx context.Context, cmd *models.SetAuthInf authUser.OAuthExpiry = cmd.OAuthToken.Expiry } - return s.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { + return s.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { _, err := sess.Insert(authUser) return err }) @@ -169,7 +169,7 @@ func (s *Implementation) UpdateAuthInfo(ctx context.Context, cmd *models.UpdateA AuthModule: cmd.AuthModule, } - return s.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { + return s.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { upd, err := sess.Update(authUser, cond) s.logger.Debug("Updated user_auth", "user_id", cmd.UserId, "auth_module", cmd.AuthModule, "rows", upd) return err @@ -177,7 +177,7 @@ func (s *Implementation) UpdateAuthInfo(ctx context.Context, cmd *models.UpdateA } func (s *Implementation) DeleteAuthInfo(ctx context.Context, cmd *models.DeleteAuthInfoCommand) error { - return s.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { + return s.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { _, err := sess.Delete(cmd.UserAuth) return err }) diff --git a/pkg/services/ngalert/notifier/alertmanager.go b/pkg/services/ngalert/notifier/alertmanager.go index 74b79ff7cc8..5928d5bfb10 100644 --- a/pkg/services/ngalert/notifier/alertmanager.go +++ b/pkg/services/ngalert/notifier/alertmanager.go @@ -127,7 +127,7 @@ type Alertmanager struct { decryptFn channels.GetDecryptedValueFn } -func newAlertmanager(orgID int64, cfg *setting.Cfg, store store.AlertingStore, kvStore kvstore.KVStore, +func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store store.AlertingStore, kvStore kvstore.KVStore, peer ClusterPeer, decryptFn channels.GetDecryptedValueFn, m *metrics.Alertmanager) (*Alertmanager, error) { am := &Alertmanager{ Settings: cfg, @@ -147,11 +147,11 @@ func newAlertmanager(orgID int64, cfg *setting.Cfg, store store.AlertingStore, k am.gokitLogger = logging.NewWrapper(am.logger) am.fileStore = NewFileStore(am.orgID, kvStore, am.WorkingDirPath()) - nflogFilepath, err := am.fileStore.FilepathFor(context.TODO(), notificationLogFilename) + nflogFilepath, err := am.fileStore.FilepathFor(ctx, notificationLogFilename) if err != nil { return nil, err } - silencesFilePath, err := am.fileStore.FilepathFor(context.TODO(), silencesFilename) + silencesFilePath, err := am.fileStore.FilepathFor(ctx, silencesFilename) if err != nil { return nil, err } @@ -162,7 +162,7 @@ func newAlertmanager(orgID int64, cfg *setting.Cfg, store store.AlertingStore, k nflog.WithRetention(retentionNotificationsAndSilences), nflog.WithSnapshot(nflogFilepath), nflog.WithMaintenance(maintenanceNotificationAndSilences, am.stopc, am.wg.Done, func() (int64, error) { - return am.fileStore.Persist(context.TODO(), notificationLogFilename, am.notificationLog) + return am.fileStore.Persist(ctx, notificationLogFilename, am.notificationLog) }), ) if err != nil { @@ -187,7 +187,7 @@ func newAlertmanager(orgID int64, cfg *setting.Cfg, store store.AlertingStore, k am.wg.Add(1) go func() { am.silences.Maintenance(15*time.Minute, silencesFilePath, am.stopc, func() (int64, error) { - return am.fileStore.Persist(context.TODO(), silencesFilename, am.silences) + return am.fileStore.Persist(ctx, silencesFilename, am.silences) }) am.wg.Done() }() diff --git a/pkg/services/ngalert/notifier/alertmanager_test.go b/pkg/services/ngalert/notifier/alertmanager_test.go index edc9601a9f8..e19c3e28dcf 100644 --- a/pkg/services/ngalert/notifier/alertmanager_test.go +++ b/pkg/services/ngalert/notifier/alertmanager_test.go @@ -51,7 +51,7 @@ func setupAMTest(t *testing.T) *Alertmanager { kvStore := NewFakeKVStore(t) secretsService := secretsManager.SetupTestService(t, database.ProvideSecretsStore(sqlStore)) decryptFn := secretsService.GetDecryptedValue - am, err := newAlertmanager(1, cfg, s, kvStore, &NilPeer{}, decryptFn, m) + am, err := newAlertmanager(context.Background(), 1, cfg, s, kvStore, &NilPeer{}, decryptFn, m) require.NoError(t, err) return am } diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager.go b/pkg/services/ngalert/notifier/multiorg_alertmanager.go index 69b4a2bfa33..9fc207f35f0 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager.go @@ -171,7 +171,7 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o // To export them, we need to translate the metrics from each individual registry and, // then aggregate them on the main registry. m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID)) - am, err := newAlertmanager(orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, m) + am, err := newAlertmanager(ctx, orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, m) if err != nil { moa.logger.Error("unable to create Alertmanager for org", "org", orgID, "err", err) } diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go index 31366e11880..049c87c22d5 100644 --- a/pkg/services/ngalert/state/cache.go +++ b/pkg/services/ngalert/state/cache.go @@ -1,6 +1,7 @@ package state import ( + "context" "fmt" "net/url" "strings" @@ -32,14 +33,14 @@ func newCache(logger log.Logger, metrics *metrics.State, externalURL *url.URL) * } } -func (c *cache) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State { +func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State { c.mtxStates.Lock() defer c.mtxStates.Unlock() // clone the labels so we don't change eval.Result labels := result.Instance.Copy() attachRuleLabels(labels, alertRule) - ruleLabels, annotations := c.expandRuleLabelsAndAnnotations(alertRule, labels, result) + ruleLabels, annotations := c.expandRuleLabelsAndAnnotations(ctx, alertRule, labels, result) // if duplicate labels exist, alertRule label will take precedence lbs := mergeLabels(ruleLabels, result.Instance) @@ -88,11 +89,11 @@ func attachRuleLabels(m map[string]string, alertRule *ngModels.AlertRule) { m[prometheusModel.AlertNameLabel] = alertRule.Title } -func (c *cache) expandRuleLabelsAndAnnotations(alertRule *ngModels.AlertRule, labels map[string]string, alertInstance eval.Result) (map[string]string, map[string]string) { +func (c *cache) expandRuleLabelsAndAnnotations(ctx context.Context, alertRule *ngModels.AlertRule, labels map[string]string, alertInstance eval.Result) (map[string]string, map[string]string) { expand := func(original map[string]string) map[string]string { expanded := make(map[string]string, len(original)) for k, v := range original { - ev, err := expandTemplate(alertRule.Title, v, labels, alertInstance, c.externalURL) + ev, err := expandTemplate(ctx, alertRule.Title, v, labels, alertInstance, c.externalURL) expanded[k] = ev if err != nil { c.log.Error("error in expanding template", "name", k, "value", v, "err", err.Error()) diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 968419368a1..5a3d2e03cb3 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -116,8 +116,8 @@ func (st *Manager) Warm() { } } -func (st *Manager) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State { - return st.cache.getOrCreate(alertRule, result) +func (st *Manager) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State { + return st.cache.getOrCreate(ctx, alertRule, result) } func (st *Manager) set(entry *State) { @@ -153,7 +153,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, alertRule *ngModels.A // Set the current state based on evaluation results func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State { - currentState := st.getOrCreate(alertRule, result) + currentState := st.getOrCreate(ctx, alertRule, result) currentState.LastEvaluationTime = result.EvaluatedAt currentState.EvaluationDuration = result.EvaluationDuration diff --git a/pkg/services/ngalert/state/template.go b/pkg/services/ngalert/state/template.go index 2408af5fe49..3169e1b8d62 100644 --- a/pkg/services/ngalert/state/template.go +++ b/pkg/services/ngalert/state/template.go @@ -31,7 +31,7 @@ func (v templateCaptureValue) String() string { return strconv.FormatFloat(v.Value, 'f', -1, 64) } -func expandTemplate(name, text string, labels map[string]string, alertInstance eval.Result, externalURL *url.URL) (result string, resultErr error) { +func expandTemplate(ctx context.Context, name, text string, labels map[string]string, alertInstance eval.Result, externalURL *url.URL) (result string, resultErr error) { name = "__alert_" + name text = "{{- $labels := .Labels -}}{{- $values := .Values -}}{{- $value := .Value -}}" + text data := struct { @@ -45,7 +45,7 @@ func expandTemplate(name, text string, labels map[string]string, alertInstance e } expander := template.NewTemplateExpander( - context.TODO(), // This context is only used with the `query()` function - which we don't support yet. + ctx, // This context is only used with the `query()` function - which we don't support yet. text, name, data, diff --git a/pkg/services/ngalert/state/template_test.go b/pkg/services/ngalert/state/template_test.go index 480d2e96043..74ba9d64f79 100644 --- a/pkg/services/ngalert/state/template_test.go +++ b/pkg/services/ngalert/state/template_test.go @@ -1,6 +1,7 @@ package state import ( + "context" "errors" "net/url" "testing" @@ -403,7 +404,7 @@ func TestExpandTemplate(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - v, err := expandTemplate("test", c.text, c.labels, c.alertInstance, externalURL) + v, err := expandTemplate(context.Background(), "test", c.text, c.labels, c.alertInstance, externalURL) if c.expectedError != nil { require.NotNil(t, err) require.EqualError(t, c.expectedError, err.Error())