Files
Arve Knudsen 78596a6756 Migrate to Wire for dependency injection (#32289)
Fixes #30144

Co-authored-by: dsotirakis <sotirakis.dim@gmail.com>
Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
Co-authored-by: Ida Furjesova <ida.furjesova@grafana.com>
Co-authored-by: Jack Westbrook <jack.westbrook@gmail.com>
Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
Co-authored-by: Leon Sorokin <leeoniya@gmail.com>
Co-authored-by: Andrej Ocenas <mr.ocenas@gmail.com>
Co-authored-by: spinillos <selenepinillos@gmail.com>
Co-authored-by: Karl Persson <kalle.persson@grafana.com>
Co-authored-by: Leonard Gram <leo@xlson.com>
2021-08-25 15:11:22 +02:00

82 lines
2.4 KiB
Go

package flux
import (
"context"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
// glog is the logger shared by the flux query code in this package.
var glog = log.New("tsdb.influx_flux")
// Query builds flux queries, executes them, and returns the results.
//
// A runner is created from dsInfo for the duration of the call and its client
// is closed when Query returns. Each query in tsdbQuery is passed to
// getQueryModel together with the time range of the first query, then
// executed; the outcome is stored in the response under the query's RefID. A
// query whose model cannot be built gets an error response and does not stop
// the remaining queries from running.
//
// A non-nil error is returned only when the runner cannot be created from
// dsInfo. A request that carries no queries yields an empty response.
func Query(ctx context.Context, dsInfo *models.DatasourceInfo, tsdbQuery backend.QueryDataRequest) (
	*backend.QueryDataResponse, error) {
	tRes := backend.NewQueryDataResponse()
	glog.Debug("Received a query", "query", tsdbQuery)
	r, err := runnerFromDataSource(dsInfo)
	if err != nil {
		return &backend.QueryDataResponse{}, err
	}
	defer r.client.Close()

	// Nothing to run; indexing Queries[0] below would panic on an empty request.
	if len(tsdbQuery.Queries) == 0 {
		return tRes, nil
	}

	// The time range of the first query is applied to every query in the request.
	timeRange := tsdbQuery.Queries[0].TimeRange
	for _, query := range tsdbQuery.Queries {
		qm, err := getQueryModel(query, timeRange, dsInfo)
		if err != nil {
			// Report the failure for this query only and keep going.
			tRes.Responses[query.RefID] = backend.DataResponse{Error: err}
			continue
		}

		// If the default changes also update labels/placeholder in config page.
		maxSeries := dsInfo.MaxSeries
		res := executeQuery(ctx, *qm, r, maxSeries)
		tRes.Responses[query.RefID] = res
	}
	return tRes, nil
}
// runner pairs an influxdb2 client with the organization that flux queries
// are run against.
type runner struct {
	// client is the InfluxDB client the queries are sent through.
	client influxdb2.Client
	// org is the organization handed to the client's query API.
	org string
}
// queryRunner is the query-execution behavior of a runner expressed as an
// interface; it exists to help testing.
type queryRunner interface {
	runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error)
}
// runQuery sends fluxQuery through the client's query API bound to the
// runner's organization and hands back the table result and error of that
// call unchanged.
func (r *runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) {
	return r.client.QueryAPI(r.org).Query(ctx, fluxQuery)
}
// runnerFromDataSource builds a runner out of the datasource instance's
// configuration. It fails when the organization or the URL is empty, checking
// the organization first. The client it creates is configured with the
// datasource's HTTP client and token.
func runnerFromDataSource(dsInfo *models.DatasourceInfo) (*runner, error) {
	switch {
	case dsInfo.Organization == "":
		return nil, fmt.Errorf("missing organization in datasource configuration")
	case dsInfo.URL == "":
		return nil, fmt.Errorf("missing URL from datasource configuration")
	}

	clientOpts := influxdb2.DefaultOptions()
	clientOpts.HTTPOptions().SetHTTPClient(dsInfo.HTTPClient)

	fluxRunner := &runner{
		org:    dsInfo.Organization,
		client: influxdb2.NewClientWithOptions(dsInfo.URL, dsInfo.Token, clientOpts),
	}
	return fluxRunner, nil
}