diff --git a/pkg/tsdb/influxdb/flux/builder.go b/pkg/tsdb/influxdb/flux/builder.go index 309c89a0bb3..43da09c58dd 100644 --- a/pkg/tsdb/influxdb/flux/builder.go +++ b/pkg/tsdb/influxdb/flux/builder.go @@ -254,6 +254,14 @@ func (e maxPointsExceededError) Error() string { return fmt.Sprintf("max data points limit exceeded (count is %d)", e.Count) } +type maxSeriesExceededError struct { + Count int +} + +func (e maxSeriesExceededError) Error() string { + return fmt.Sprintf("max series limit exceeded (count is %d)", e.Count) +} + func getColumnInfo(col *query.FluxColumn) (info *columnInfo, isTimestamp bool, err error) { dataType := col.DataType() isTimestamp = isTimestampType(dataType) @@ -316,7 +324,7 @@ func (fb *frameBuilder) Append(record *query.FluxRecord) error { if (fb.currentGroupKey == nil) || !isTableIDEqual(table, fb.currentGroupKey) { fb.totalSeries++ if fb.totalSeries > fb.maxSeries { - return fmt.Errorf("results are truncated, max series reached (%d)", fb.maxSeries) + return maxSeriesExceededError{Count: fb.totalSeries} } // labels have the same value for every row in the same "table", diff --git a/pkg/tsdb/influxdb/flux/executor.go b/pkg/tsdb/influxdb/flux/executor.go index acad55df636..7ce62d5d67c 100644 --- a/pkg/tsdb/influxdb/flux/executor.go +++ b/pkg/tsdb/influxdb/flux/executor.go @@ -48,6 +48,7 @@ func executeQuery(ctx context.Context, logger log.Logger, query queryModel, runn } dr.Error = fmt.Errorf(errMsg, maxPointError.Count, query.MaxDataPoints) + dr.ErrorSource = backend.ErrorSourceDownstream } } } @@ -96,6 +97,10 @@ func readDataFrames(logger log.Logger, result *api.QueryTableResult, maxPoints i err := builder.Append(result.Record()) if err != nil { dr.Error = err + var maxSeriesError maxSeriesExceededError + if errors.As(err, &maxSeriesError) { + dr.ErrorSource = backend.ErrorSourceDownstream + } break } } diff --git a/pkg/tsdb/influxdb/flux/executor_test.go b/pkg/tsdb/influxdb/flux/executor_test.go index 4e72c5dde7c..e4576929b30 100644 --- a/pkg/tsdb/influxdb/flux/executor_test.go +++ b/pkg/tsdb/influxdb/flux/executor_test.go @@ -61,8 +61,11 @@ func executeMockedQuery(t *testing.T, name string, query queryModel) *backend.Da runner := &MockRunner{ testDataPath: name + ".csv", } + if query.MaxSeries == 0 { + query.MaxSeries = 50 + } - dr := executeQuery(context.Background(), glog, query, runner, 50) + dr := executeQuery(context.Background(), glog, query, runner, query.MaxSeries) return &dr } @@ -243,6 +246,16 @@ func assertDataResponseDimensions(t *testing.T, dr *backend.DataResponse, rows i require.Equal(t, fields[1].Len(), columns) } +func TestMaxSeriesExceeded(t *testing.T) { + // unfortunately the golden-response style tests do not support + // responses that contain errors, so we can only do manual checks + // on the DataResponse + dr := executeMockedQuery(t, "multiple", queryModel{MaxSeries: 1, MaxDataPoints: 100}) + + require.ErrorAs(t, dr.Error, &maxSeriesExceededError{}) + require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource) +} + func TestMaxDataPointsExceededNoAggregate(t *testing.T) { // unfortunately the golden-response style tests do not support // responses that contain errors, so we can only do manual checks @@ -251,6 +264,7 @@ func TestMaxDataPointsExceededNoAggregate(t *testing.T) { // it should contain the error-message require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2. Try using the aggregateWindow() function in your query to reduce the number of points returned.") + require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource) assertDataResponseDimensions(t, dr, 2, 21) } @@ -262,6 +276,7 @@ func TestMaxDataPointsExceededWithAggregate(t *testing.T) { // it should contain the error-message require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2.") + require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource) assertDataResponseDimensions(t, dr, 2, 21) } diff --git a/pkg/tsdb/influxdb/flux/query_models.go b/pkg/tsdb/influxdb/flux/query_models.go index 3a97efeffdf..b08128b93f6 100644 --- a/pkg/tsdb/influxdb/flux/query_models.go +++ b/pkg/tsdb/influxdb/flux/query_models.go @@ -25,6 +25,7 @@ type queryModel struct { // Not from JSON TimeRange backend.TimeRange `json:"-"` MaxDataPoints int64 `json:"-"` + MaxSeries int `json:"-"` Interval time.Duration `json:"-"` } diff --git a/pkg/tsdb/influxdb/fsql/fsql.go b/pkg/tsdb/influxdb/fsql/fsql.go index 52ed6614d04..e7f2b49e7d5 100644 --- a/pkg/tsdb/influxdb/fsql/fsql.go +++ b/pkg/tsdb/influxdb/fsql/fsql.go @@ -44,7 +44,7 @@ func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req backend.Query for _, q := range req.Queries { qm, err := getQueryModel(q) if err != nil { - tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, "bad request") + tRes.Responses[q.RefID] = backend.ErrDataResponseWithSource(backend.StatusValidationFailed, backend.ErrorSourceDownstream, "bad request") continue } diff --git a/pkg/tsdb/influxdb/fsql/fsql_test.go b/pkg/tsdb/influxdb/fsql/fsql_test.go index e0c38b0cfa4..843816d2c30 100644 --- a/pkg/tsdb/influxdb/fsql/fsql_test.go +++ b/pkg/tsdb/influxdb/fsql/fsql_test.go @@ -127,3 +127,27 @@ func freeport(t *testing.T) (addr string, err error) { a := l.Addr().(*net.TCPAddr) return a.String(), nil } + +func TestInvalidSchema(t *testing.T) { + resp, _ := Query( + context.Background(), + &models.DatasourceInfo{ + HTTPClient: nil, + Token: "secret", + URL: "http://127.0.0.1:1234", + DbName: "influxdb", + Version: "test", + HTTPMode: "proxy", + InsecureGrpc: true, + }, + backend.QueryDataRequest{ + Queries: []backend.DataQuery{ + { + RefID: "A", + JSON: []byte(`this is not valid JSON`), + }, + }, + }, + ) + require.Equal(t, backend.ErrorSourceDownstream, resp.Responses["A"].ErrorSource) +} diff --git a/pkg/tsdb/influxdb/fsql/query_model.go b/pkg/tsdb/influxdb/fsql/query_model.go index 46128708102..328e06951b3 100644 --- a/pkg/tsdb/influxdb/fsql/query_model.go +++ b/pkg/tsdb/influxdb/fsql/query_model.go @@ -48,7 +48,8 @@ func getQueryModel(dataQuery backend.DataQuery) (*queryModel, error) { Format: format, } - // Process macros and execute the query. + // Process macros and generate raw fsql to be sent to + // influxdb backend for execution. sql, err := sqlutil.Interpolate(query, macros) if err != nil { return nil, fmt.Errorf("macro interpolation: %w", err) diff --git a/pkg/tsdb/influxdb/influxql/buffered/response_parser.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go index 3d63de7cc61..b2fd376ebbc 100644 --- a/pkg/tsdb/influxdb/influxql/buffered/response_parser.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go @@ -30,7 +30,10 @@ func parse(buf io.Reader, statusCode int, query *models.Query) *backend.DataResp if errorStr == "" { errorStr = response.Message } - return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", errorStr)} + return &backend.DataResponse{ + Error: fmt.Errorf("InfluxDB returned error: %s", errorStr), + ErrorSource: backend.ErrorSourceFromHTTPStatus(statusCode), + } } if jsonErr != nil { @@ -38,12 +41,18 @@ func parse(buf io.Reader, statusCode int, query *models.Query) *backend.DataResp } if response.Error != "" { - return &backend.DataResponse{Error: errors.New(response.Error)} + return &backend.DataResponse{ + Error: errors.New(response.Error), + ErrorSource: backend.ErrorSourceDownstream, + } } result := response.Results[0] if result.Error != "" { - return &backend.DataResponse{Error: errors.New(result.Error)} + return &backend.DataResponse{ + Error: errors.New(result.Error), + ErrorSource: backend.ErrorSourceDownstream, + } } if query.ResultFormat == "table" { diff --git a/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go index e2039c390b0..9ae187b7ebe 100644 --- a/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/experimental" "github.com/influxdata/influxql" @@ -351,12 +352,14 @@ func TestInfluxdbResponseParser(t *testing.T) { result := ResponseParse(readJsonFile("error_on_top_level_response"), 400, generateQuery("Test raw query", "time_series", "")) require.Nil(t, result.Frames) require.EqualError(t, result.Error, "InfluxDB returned error: error parsing query: found THING") + require.Equal(t, backend.ErrorSourceDownstream, result.ErrorSource) }) t.Run("Influxdb response parser with error message", func(t *testing.T) { result := ResponseParse(readJsonFile("invalid_response"), 400, generateQuery("Test raw query", "time_series", "")) require.Nil(t, result.Frames) require.EqualError(t, result.Error, "InfluxDB returned error: failed to parse query: found WERE, expected ; at line 1, char 38") + require.Equal(t, backend.ErrorSourceDownstream, result.ErrorSource) }) t.Run("Influxdb response parser parseNumber nil", func(t *testing.T) { diff --git a/pkg/tsdb/influxdb/influxql/influxql.go b/pkg/tsdb/influxdb/influxql/influxql.go index b4d03ce6064..912a7abe2f4 100644 --- a/pkg/tsdb/influxdb/influxql/influxql.go +++ b/pkg/tsdb/influxdb/influxql/influxql.go @@ -175,7 +175,10 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.Dataso func execute(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request, isStreamingParserEnabled bool) (backend.DataResponse, error) { res, err := dsInfo.HTTPClient.Do(request) if err != nil { - return backend.DataResponse{}, err + return backend.DataResponse{ + Error: err, + ErrorSource: backend.ErrorSourceFromHTTPStatus(res.StatusCode), + }, err } defer func() { if err := res.Body.Close(); err != nil { diff --git a/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go b/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go index dde413c8fa4..806bc81ccd9 100644 --- a/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go +++ b/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go @@ -24,7 +24,10 @@ func ResponseParse(buf io.ReadCloser, statusCode int, query *models.Query) *back r := converter.ReadInfluxQLStyleResult(iter, query) if statusCode/100 != 2 { - return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", r.Error)} + return &backend.DataResponse{ + Error: fmt.Errorf("InfluxDB returned error: %s", r.Error), + ErrorSource: backend.ErrorSourceFromHTTPStatus(statusCode), + } } // The ExecutedQueryString can be viewed in QueryInspector in UI