From c22905f86470aa60cef33fbbec6c58e4401c20c1 Mon Sep 17 00:00:00 2001 From: ying-jeanne <74549700+ying-jeanne@users.noreply.github.com> Date: Thu, 8 Jul 2021 12:03:55 +0200 Subject: [PATCH] Graphite: Convert to use grafana-plugin-sdk-go contracts (#35798) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use Dataframes and extract tags from response * Fix timestamp conversion * Add tests for data frame conversion * Add missing RefID and simplify returning an error * draft dataframe/sdk convertion for graphite * intermedia * modify init because registration failed * Allocate memory for each data point value * Remove redundant memory aliasing * Remove redundant new line * Sort imports * Simplify returning nil values * fix lint * remove unused jsondata * add checks on query length * remove basic auth from request Co-authored-by: Piotr Jamróz --- pkg/tsdb/graphite/graphite.go | 226 ++++++++++++++++------------- pkg/tsdb/graphite/graphite_test.go | 23 +-- pkg/tsdb/service.go | 2 - 3 files changed, 131 insertions(+), 120 deletions(-) diff --git a/pkg/tsdb/graphite/graphite.go b/pkg/tsdb/graphite/graphite.go index 03cfdd4ac3e..4982cbfb0af 100644 --- a/pkg/tsdb/graphite/graphite.go +++ b/pkg/tsdb/graphite/graphite.go @@ -10,55 +10,112 @@ import ( "net/url" "path" "regexp" - "strconv" "strings" "time" "golang.org/x/net/context/ctxhttp" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" + "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/setting" "github.com/opentracing/opentracing-go" ) -type GraphiteExecutor struct { - httpClientProvider httpclient.Provider +type Service struct { + logger log.Logger + im instancemgmt.InstanceManager + BackendPluginManager backendplugin.Manager `inject:""` + Cfg *setting.Cfg `inject:""` + HTTPClientProvider httpclient.Provider `inject:""` } -// nolint:staticcheck // plugins.DataPlugin deprecated -func New(httpClientProvider httpclient.Provider) func(*models.DataSource) (plugins.DataPlugin, error) { - // nolint:staticcheck // plugins.DataPlugin deprecated - return func(dsInfo *models.DataSource) (plugins.DataPlugin, error) { - return &GraphiteExecutor{ - httpClientProvider: httpClientProvider, - }, nil - } +func init() { + registry.Register(®istry.Descriptor{ + Name: "GraphiteService", + InitPriority: registry.Low, + Instance: &Service{}, + }) } -var glog = log.New("tsdb.graphite") +type datasourceInfo struct { + HTTPClient *http.Client + URL string + Id int64 +} -//nolint: staticcheck // plugins.DataQuery deprecated -func (e *GraphiteExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery plugins.DataQuery) ( - plugins.DataResponse, error) { - // This logic is used when called from Dashboard Alerting. - from := "-" + formatTimeRange(tsdbQuery.TimeRange.From) - until := formatTimeRange(tsdbQuery.TimeRange.To) - - // This logic is used when called through server side expressions. - if isTimeRangeNumeric(*tsdbQuery.TimeRange) { - var err error - from, until, err = epochMStoGraphiteTime(*tsdbQuery.TimeRange) +func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { + return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + opts, err := settings.HTTPClientOptions() if err != nil { - return plugins.DataResponse{}, err + return nil, err } + + client, err := httpClientProvider.New(opts) + if err != nil { + return nil, err + } + + model := datasourceInfo{ + HTTPClient: client, + URL: settings.URL, + Id: settings.ID, + } + + return model, nil + } +} + +func (s *Service) Init() error { + s.logger = log.New("tsdb.graphite") + s.im = datasource.NewInstanceManager(newInstanceSettings(s.HTTPClientProvider)) + + factory := coreplugin.New(backend.ServeOpts{ + QueryDataHandler: s, + }) + + if err := s.BackendPluginManager.RegisterAndStart(context.Background(), "graphite", factory); err != nil { + s.logger.Error("Failed to register plugin", "error", err) + } + return nil +} + +func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) { + i, err := s.im.Get(pluginCtx) + if err != nil { + return nil, err + } + instance := i.(datasourceInfo) + return &instance, nil +} + +func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + if len(req.Queries) == 0 { + return nil, fmt.Errorf("query contains no queries") } - var target string + // get datasource info from context + dsInfo, err := s.getDSInfo(req.PluginContext) + if err != nil { + return nil, err + } + // take the first query in the request list, since all query should share the same timerange + q := req.Queries[0] + + /* + graphite doc about from and until, with sdk we are getting absolute instead of relative time + https://graphite-api.readthedocs.io/en/latest/api.html#from-until + */ + from, until := epochMStoGraphiteTime(q.TimeRange) formData := url.Values{ "from": []string{from}, "until": []string{until}, @@ -66,42 +123,45 @@ func (e *GraphiteExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSou "maxDataPoints": []string{"500"}, } + // Calculate and get the last target of Graphite Request + var target string emptyQueries := make([]string, 0) - for _, query := range tsdbQuery.Queries { - glog.Debug("graphite", "query", query.Model) + for _, query := range req.Queries { + model, err := simplejson.NewJson(query.JSON) + if err != nil { + return nil, err + } + s.logger.Debug("graphite", "query", model) currTarget := "" - if fullTarget, err := query.Model.Get("targetFull").String(); err == nil { + if fullTarget, err := model.Get("targetFull").String(); err == nil { currTarget = fullTarget } else { - currTarget = query.Model.Get("target").MustString() + currTarget = model.Get("target").MustString() } if currTarget == "" { - glog.Debug("graphite", "empty query target", query.Model) - emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", query.Model)) + s.logger.Debug("graphite", "empty query target", model) + emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model)) continue } target = fixIntervalFormat(currTarget) } + var result = backend.QueryDataResponse{} + if target == "" { - glog.Error("No targets in query model", "models without targets", strings.Join(emptyQueries, "\n")) - return plugins.DataResponse{}, errors.New("no query target found for the alert rule") + s.logger.Error("No targets in query model", "models without targets", strings.Join(emptyQueries, "\n")) + return &result, errors.New("no query target found for the alert rule") } formData["target"] = []string{target} if setting.Env == setting.Dev { - glog.Debug("Graphite request", "params", formData) + s.logger.Debug("Graphite request", "params", formData) } - req, err := e.createRequest(dsInfo, formData) + graphiteReq, err := s.createRequest(dsInfo, formData) if err != nil { - return plugins.DataResponse{}, err - } - - httpClient, err := dsInfo.GetHTTPClient(e.httpClientProvider) - if err != nil { - return plugins.DataResponse{}, err + return &result, err } span, ctx := opentracing.StartSpanFromContext(ctx, "graphite query") @@ -109,65 +169,66 @@ func (e *GraphiteExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSou span.SetTag("from", from) span.SetTag("until", until) span.SetTag("datasource_id", dsInfo.Id) - span.SetTag("org_id", dsInfo.OrgId) + span.SetTag("org_id", req.PluginContext.OrgID) defer span.Finish() if err := opentracing.GlobalTracer().Inject( span.Context(), opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)); err != nil { - return plugins.DataResponse{}, err + opentracing.HTTPHeadersCarrier(graphiteReq.Header)); err != nil { + return &result, err } - res, err := ctxhttp.Do(ctx, httpClient, req) + res, err := ctxhttp.Do(ctx, dsInfo.HTTPClient, graphiteReq) if err != nil { - return plugins.DataResponse{}, err + return &result, err } - frames, err := e.toDataFrames(res) + frames, err := s.toDataFrames(res) if err != nil { - return plugins.DataResponse{}, err + return &result, err } - result := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), + result = backend.QueryDataResponse{ + Responses: make(backend.Responses), } - result.Results["A"] = plugins.DataQueryResult{ - RefID: "A", - Dataframes: plugins.NewDecodedDataFrames(frames), + + result.Responses["A"] = backend.DataResponse{ + Frames: frames, } - return result, nil + + return &result, nil } -func (e *GraphiteExecutor) parseResponse(res *http.Response) ([]TargetResponseDTO, error) { +func (s *Service) parseResponse(res *http.Response) ([]TargetResponseDTO, error) { body, err := ioutil.ReadAll(res.Body) if err != nil { return nil, err } defer func() { if err := res.Body.Close(); err != nil { - glog.Warn("Failed to close response body", "err", err) + s.logger.Warn("Failed to close response body", "err", err) } }() if res.StatusCode/100 != 2 { - glog.Info("Request failed", "status", res.Status, "body", string(body)) + s.logger.Info("Request failed", "status", res.Status, "body", string(body)) return nil, fmt.Errorf("request failed, status: %s", res.Status) } var data []TargetResponseDTO err = json.Unmarshal(body, &data) if err != nil { - glog.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body)) + s.logger.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body)) return nil, err } return data, nil } -func (e *GraphiteExecutor) toDataFrames(response *http.Response) (frames data.Frames, error error) { - responseData, err := e.parseResponse(response) +func (s *Service) toDataFrames(response *http.Response) (frames data.Frames, error error) { + responseData, err := s.parseResponse(response) if err != nil { return nil, err } @@ -192,14 +253,14 @@ func (e *GraphiteExecutor) toDataFrames(response *http.Response) (frames data.Fr data.NewField("value", series.Tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))) if setting.Env == setting.Dev { - glog.Debug("Graphite response", "target", series.Target, "datapoints", len(series.DataPoints)) + s.logger.Debug("Graphite response", "target", series.Target, "datapoints", len(series.DataPoints)) } } return } -func (e *GraphiteExecutor) createRequest(dsInfo *models.DataSource, data url.Values) (*http.Request, error) { - u, err := url.Parse(dsInfo.Url) +func (s *Service) createRequest(dsInfo *datasourceInfo, data url.Values) (*http.Request, error) { + u, err := url.Parse(dsInfo.URL) if err != nil { return nil, err } @@ -207,25 +268,14 @@ func (e *GraphiteExecutor) createRequest(dsInfo *models.DataSource, data url.Val req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(data.Encode())) if err != nil { - glog.Info("Failed to create request", "error", err) + s.logger.Info("Failed to create request", "error", err) return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - if dsInfo.BasicAuth { - req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.DecryptedBasicAuthPassword()) - } - return req, err } -func formatTimeRange(input string) string { - if input == "now" { - return input - } - return strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(input, "now", ""), "m", "min"), "M", "mon") -} - func fixIntervalFormat(target string) string { rMinute := regexp.MustCompile(`'(\d+)m'`) target = rMinute.ReplaceAllStringFunc(target, func(m string) string { @@ -238,28 +288,8 @@ func fixIntervalFormat(target string) string { return target } -func isTimeRangeNumeric(tr plugins.DataTimeRange) bool { - if _, err := strconv.ParseInt(tr.From, 10, 64); err != nil { - return false - } - if _, err := strconv.ParseInt(tr.To, 10, 64); err != nil { - return false - } - return true -} - -func epochMStoGraphiteTime(tr plugins.DataTimeRange) (string, string, error) { - from, err := strconv.ParseInt(tr.From, 10, 64) - if err != nil { - return "", "", err - } - - to, err := strconv.ParseInt(tr.To, 10, 64) - if err != nil { - return "", "", err - } - - return fmt.Sprintf("%d", from/1000), fmt.Sprintf("%d", to/1000), nil +func epochMStoGraphiteTime(tr backend.TimeRange) (string, string) { + return fmt.Sprintf("%d", tr.From.UTC().Unix()), fmt.Sprintf("%d", tr.To.UTC().Unix()) } /** diff --git a/pkg/tsdb/graphite/graphite_test.go b/pkg/tsdb/graphite/graphite_test.go index ff60605de1e..b5d2a348d49 100644 --- a/pkg/tsdb/graphite/graphite_test.go +++ b/pkg/tsdb/graphite/graphite_test.go @@ -10,28 +10,11 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/infra/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestFormatTimeRange(t *testing.T) { - testCases := []struct { - input string - expected string - }{ - {"now", "now"}, - {"now-1m", "-1min"}, - {"now-1M", "-1mon"}, - } - - for _, tc := range testCases { - t.Run(tc.input, func(t *testing.T) { - tr := formatTimeRange(tc.input) - assert.Equal(t, tc.expected, tr) - }) - } -} - func TestFixIntervalFormat(t *testing.T) { testCases := []struct { name string @@ -67,7 +50,7 @@ func TestFixIntervalFormat(t *testing.T) { }) } - executor := &GraphiteExecutor{} + service := &Service{logger: log.New("tsdb.graphite")} t.Run("Converts response to data frames", func(*testing.T) { body := ` @@ -90,7 +73,7 @@ func TestFixIntervalFormat(t *testing.T) { expectedFrames := data.Frames{expectedFrame} httpResponse := &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(body))} - dataFrames, err := executor.toDataFrames(httpResponse) + dataFrames, err := service.toDataFrames(httpResponse) require.NoError(t, err) if !reflect.DeepEqual(expectedFrames, dataFrames) { diff --git a/pkg/tsdb/service.go b/pkg/tsdb/service.go index d505d173857..b04b6a94667 100644 --- a/pkg/tsdb/service.go +++ b/pkg/tsdb/service.go @@ -14,7 +14,6 @@ import ( "github.com/grafana/grafana/pkg/tsdb/azuremonitor" "github.com/grafana/grafana/pkg/tsdb/cloudmonitoring" "github.com/grafana/grafana/pkg/tsdb/elasticsearch" - "github.com/grafana/grafana/pkg/tsdb/graphite" "github.com/grafana/grafana/pkg/tsdb/influxdb" "github.com/grafana/grafana/pkg/tsdb/loki" "github.com/grafana/grafana/pkg/tsdb/mssql" @@ -57,7 +56,6 @@ type Service struct { // Init initialises the service. func (s *Service) Init() error { - s.registry["graphite"] = graphite.New(s.HTTPClientProvider) s.registry["prometheus"] = prometheus.New(s.HTTPClientProvider) s.registry["influxdb"] = influxdb.New(s.HTTPClientProvider) s.registry["mssql"] = mssql.NewExecutor