diff --git a/pkg/models/datasource_cache.go b/pkg/models/datasource_cache.go index c67c8f1e8af..a7421c4fbb3 100644 --- a/pkg/models/datasource_cache.go +++ b/pkg/models/datasource_cache.go @@ -45,7 +45,6 @@ var ptc = proxyTransportCache{ func (ds *DataSource) GetHttpClient() (*http.Client, error) { transport, err := ds.GetHttpTransport() - if err != nil { return nil, err } diff --git a/pkg/tsdb/influxdb/flux/builder.go b/pkg/tsdb/influxdb/flux/builder.go index d86a180df3a..8222c2e5ed5 100644 --- a/pkg/tsdb/influxdb/flux/builder.go +++ b/pkg/tsdb/influxdb/flux/builder.go @@ -69,7 +69,7 @@ func getConverter(t string) (*data.FieldConverter, error) { return &AnyToOptionalString, nil } - return nil, fmt.Errorf("No matching converter found for [%v]", t) + return nil, fmt.Errorf("no matching converter found for [%v]", t) } // Init initializes the frame to be returned @@ -94,6 +94,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { if err != nil { return err } + fb.value = converter fb.isTimeSeries = true case isTag(col.Name()): @@ -106,6 +107,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { if col == nil { return fmt.Errorf("no time column in timeSeries") } + fb.timeColumn = col.Name() fb.timeDisplay = "Time" if "_time" != fb.timeColumn { @@ -118,6 +120,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { if err != nil { return err } + fb.columns = append(fb.columns, columnInfo{ name: col.Name(), converter: converter, @@ -201,13 +204,14 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { if fb.isTimeSeries { time, ok := record.ValueByKey(fb.timeColumn).(time.Time) if !ok { - return fmt.Errorf("unable to get time colum: %s", fb.timeColumn) + return fmt.Errorf("unable to get time colum: %q", fb.timeColumn) } val, err := fb.value.Converter(record.Value()) if err != nil { return err } + fb.active.Fields[0].Append(time) fb.active.Fields[1].Append(val) } else { @@ -217,6 +221,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { if err != nil { return err } + fb.active.Fields[idx].Append(val) } } diff --git a/pkg/tsdb/influxdb/flux/executor.go b/pkg/tsdb/influxdb/flux/executor.go index 955a83702fc..fe7c5054146 100644 --- a/pkg/tsdb/influxdb/flux/executor.go +++ b/pkg/tsdb/influxdb/flux/executor.go @@ -9,9 +9,9 @@ import ( "github.com/influxdata/influxdb-client-go/api" ) -// ExecuteQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it. +// executeQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it. // maxSeries somehow limits the response. -func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) { +func executeQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) { dr = backend.DataResponse{} flux, err := Interpolate(query) @@ -20,10 +20,11 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max return } - glog.Debug("Flux", "interpolated query", flux) + glog.Debug("Executing Flux query", "interpolated query", flux) tables, err := runner.runQuery(ctx, flux) if err != nil { + glog.Warn("Flux query failed", "err", err, "query", flux) dr.Error = err metaFrame := data.NewFrame("meta for error") metaFrame.Meta = &data.FrameMeta{ @@ -32,6 +33,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max dr.Frames = append(dr.Frames, metaFrame) return } + defer tables.Close() dr = readDataFrames(tables, int(float64(query.MaxDataPoints)*1.5), maxSeries) @@ -46,6 +48,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max } func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int) (dr backend.DataResponse) { + glog.Debug("Reading data frames from query result", "maxPoints", maxPoints, "maxSeries", maxSeries) dr = backend.DataResponse{} builder := &FrameBuilder{ @@ -69,7 +72,7 @@ func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int) } if builder.frames == nil { - dr.Error = fmt.Errorf("Invalid state") + dr.Error = fmt.Errorf("invalid state") return dr } diff --git a/pkg/tsdb/influxdb/flux/executor_test.go b/pkg/tsdb/influxdb/flux/executor_test.go index 579be99cc68..6e51378b584 100644 --- a/pkg/tsdb/influxdb/flux/executor_test.go +++ b/pkg/tsdb/influxdb/flux/executor_test.go @@ -55,7 +55,7 @@ func verifyGoldenResponse(name string) (*backend.DataResponse, error) { testDataPath: name + ".csv", } - dr := ExecuteQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50) + dr := executeQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50) err := experimental.CheckGoldenDataResponse("./testdata/"+name+".golden.txt", &dr, true) return &dr, err } diff --git a/pkg/tsdb/influxdb/flux/flux.go b/pkg/tsdb/influxdb/flux/flux.go index 753c58dbca3..f04c85206a7 100644 --- a/pkg/tsdb/influxdb/flux/flux.go +++ b/pkg/tsdb/influxdb/flux/flux.go @@ -22,6 +22,7 @@ func init() { // Query builds flux queries, executes them, and returns the results. func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { + glog.Debug("Received a query", "query", *tsdbQuery) tRes := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } @@ -38,7 +39,7 @@ func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQ continue } - res := ExecuteQuery(context.Background(), *qm, runner, 50) + res := executeQuery(context.Background(), *qm, runner, 50) tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId) } @@ -57,9 +58,10 @@ type queryRunner interface { runQuery(ctx context.Context, q string) (*api.QueryTableResult, error) } -// runQuery executes fluxQuery against the Runner's organization and returns an flux typed result. +// runQuery executes fluxQuery against the Runner's organization and returns a Flux typed result. func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) { - return r.client.QueryApi(r.org).Query(ctx, fluxQuery) + qa := r.client.QueryApi(r.org) + return qa.Query(ctx, fluxQuery) } // RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration). @@ -71,7 +73,7 @@ func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) { url := dsInfo.Url if url == "" { - return nil, fmt.Errorf("missing url from datasource configuration") + return nil, fmt.Errorf("missing URL from datasource configuration") } token, found := dsInfo.SecureJsonData.DecryptedValue("token") if !found { diff --git a/pkg/tsdb/influxdb/flux/query_models.go b/pkg/tsdb/influxdb/flux/query_models.go index be93fac085c..c36666316ec 100644 --- a/pkg/tsdb/influxdb/flux/query_models.go +++ b/pkg/tsdb/influxdb/flux/query_models.go @@ -54,9 +54,8 @@ func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *mod return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err) } - err = json.Unmarshal(queryBytes, &model) - if err != nil { - return nil, fmt.Errorf("error reading query: %s", err.Error()) + if err := json.Unmarshal(queryBytes, &model); err != nil { + return nil, fmt.Errorf("error reading query: %w", err) } if model.Options.DefaultBucket == "" { model.Options.DefaultBucket = dsInfo.JsonData.Get("defaultBucket").MustString("") diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index c0cb6a36434..bacc0d656ba 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -15,7 +15,6 @@ import ( "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb/influxdb/flux" - "golang.org/x/net/context/ctxhttp" ) type InfluxDBExecutor struct { @@ -51,6 +50,8 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, return flux.Query(ctx, dsInfo, tsdbQuery) } + glog.Debug("Making a non-Flux type query") + // NOTE: the following path is currently only called from alerting queries // In dashboards, the request runs through proxy and are managed in the frontend @@ -68,7 +69,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, glog.Debug("Influxdb query", "raw query", rawQuery) } - req, err := e.createRequest(dsInfo, rawQuery) + req, err := e.createRequest(ctx, dsInfo, rawQuery) if err != nil { return nil, err } @@ -78,7 +79,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, return nil, err } - resp, err := ctxhttp.Do(ctx, httpClient, req) + resp, err := httpClient.Do(req) if err != nil { return nil, err } @@ -91,12 +92,9 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, var response Response dec := json.NewDecoder(resp.Body) dec.UseNumber() - err = dec.Decode(&response) - - if err != nil { + if err := dec.Decode(&response); err != nil { return nil, err } - if response.Err != nil { return nil, response.Err } @@ -109,42 +107,45 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, } func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) { + if len(queries) == 0 { + return nil, fmt.Errorf("query request contains no queries") + } + // The model supports multiple queries, but right now this is only used from // alerting so we only needed to support batch executing 1 query at a time. - if len(queries) > 0 { - query, err := e.QueryParser.Parse(queries[0].Model, dsInfo) - if err != nil { - return nil, err - } - return query, nil + query, err := e.QueryParser.Parse(queries[0].Model, dsInfo) + if err != nil { + return nil, err } - return nil, fmt.Errorf("query request contains no queries") + return query, nil } -func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) { +func (e *InfluxDBExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, query string) (*http.Request, error) { u, err := url.Parse(dsInfo.Url) if err != nil { return nil, err } + u.Path = path.Join(u.Path, "query") httpMode := dsInfo.JsonData.Get("httpMode").MustString("GET") - req, err := func() (*http.Request, error) { - switch httpMode { - case "GET": - return http.NewRequest(http.MethodGet, u.String(), nil) - case "POST": - bodyValues := url.Values{} - bodyValues.Add("q", query) - body := bodyValues.Encode() - return http.NewRequest(http.MethodPost, u.String(), strings.NewReader(body)) - default: - return nil, ErrInvalidHttpMode + var req *http.Request + switch httpMode { + case "GET": + req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, err } - }() - - if err != nil { - return nil, err + case "POST": + bodyValues := url.Values{} + bodyValues.Add("q", query) + body := bodyValues.Encode() + req, err = http.NewRequestWithContext(ctx, http.MethodPost, u.String(), strings.NewReader(body)) + if err != nil { + return nil, err + } + default: + return nil, ErrInvalidHttpMode } req.Header.Set("User-Agent", "Grafana") diff --git a/pkg/tsdb/influxdb/influxdb_test.go b/pkg/tsdb/influxdb/influxdb_test.go index 4c2456dc412..8e42756858f 100644 --- a/pkg/tsdb/influxdb/influxdb_test.go +++ b/pkg/tsdb/influxdb/influxdb_test.go @@ -1,6 +1,7 @@ package influxdb import ( + "context" "io/ioutil" "net/url" "testing" @@ -23,7 +24,8 @@ func TestInfluxDB(t *testing.T) { ResponseParser: &ResponseParser{}, } Convey("createRequest with GET httpMode", func() { - req, _ := e.createRequest(datasource, query) + req, err := e.createRequest(context.Background(), datasource, query) + So(err, ShouldBeNil) Convey("as default", func() { So(req.Method, ShouldEqual, "GET") @@ -41,7 +43,8 @@ func TestInfluxDB(t *testing.T) { Convey("createRequest with POST httpMode", func() { datasource.JsonData.Set("httpMode", "POST") - req, _ := e.createRequest(datasource, query) + req, err := e.createRequest(context.Background(), datasource, query) + So(err, ShouldBeNil) Convey("method should be POST", func() { So(req.Method, ShouldEqual, "POST") @@ -63,7 +66,7 @@ func TestInfluxDB(t *testing.T) { Convey("createRequest with PUT httpMode", func() { datasource.JsonData.Set("httpMode", "PUT") - _, err := e.createRequest(datasource, query) + _, err := e.createRequest(context.Background(), datasource, query) Convey("should miserably fail", func() { So(err, ShouldEqual, ErrInvalidHttpMode) diff --git a/pkg/tsdb/influxdb/model_parser.go b/pkg/tsdb/influxdb/model_parser.go index 443c08b1250..7d728e8dea9 100644 --- a/pkg/tsdb/influxdb/model_parser.go +++ b/pkg/tsdb/influxdb/model_parser.go @@ -151,14 +151,13 @@ func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) { var result []*QueryPart - for _, groupObj := range model.Get("groupBy").MustArray() { groupJson := simplejson.NewFromAny(groupObj) queryPart, err := qp.parseQueryPart(groupJson) - if err != nil { return nil, err } + result = append(result, queryPart) } diff --git a/pkg/tsdb/influxdb/query.go b/pkg/tsdb/influxdb/query.go index 396414a0da1..a3ec81aec55 100644 --- a/pkg/tsdb/influxdb/query.go +++ b/pkg/tsdb/influxdb/query.go @@ -16,7 +16,6 @@ var ( func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) { var res string - if query.UseRawQuery && query.RawQuery != "" { res = query.RawQuery } else { diff --git a/pkg/tsdb/influxdb/query_part.go b/pkg/tsdb/influxdb/query_part.go index e81da28fd4d..00d408a071f 100644 --- a/pkg/tsdb/influxdb/query_part.go +++ b/pkg/tsdb/influxdb/query_part.go @@ -134,9 +134,8 @@ func (r QueryDefinition) Render(query *Query, queryContext *tsdb.TsdbQuery, part func NewQueryPart(typ string, params []string) (*QueryPart, error) { def, exist := renders[typ] - if !exist { - return nil, fmt.Errorf("Missing query definition for %s", typ) + return nil, fmt.Errorf("missing query definition for %q", typ) } return &QueryPart{ diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index f79e7dd32bd..fd502667c61 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -36,7 +36,6 @@ func (rp *ResponseParser) Parse(response *Response, query *Query) *tsdb.QueryRes func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult, query *Query) tsdb.TimeSeriesSlice { var result tsdb.TimeSeriesSlice - for _, row := range rows { for columnIndex, column := range row.Columns { if column == "time" { @@ -104,7 +103,6 @@ func (rp *ResponseParser) formatSeriesName(row Row, column string, query *Query) func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) string { var tags []string - for k, v := range row.Tags { tags = append(tags, fmt.Sprintf("%s: %s", k, v)) } @@ -118,9 +116,12 @@ func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) strin } func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) { - var value null.Float = rp.parseValue(valuePair[valuePosition]) + value := rp.parseValue(valuePair[valuePosition]) - timestampNumber, _ := valuePair[0].(json.Number) + timestampNumber, ok := valuePair[0].(json.Number) + if !ok { + return tsdb.TimePoint{}, fmt.Errorf("valuePair[0] has invalid type: %#v", valuePair[0]) + } timestamp, err := timestampNumber.Float64() if err != nil { return tsdb.TimePoint{}, err