This PR sets `backend.ErrorSourceDownstream` on every `backend.DataResponse` where it is certain that the error was not caused by the InfluxDB datasource plugin itself.
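For context, the pattern repeated throughout the diff looks roughly like the sketch below. It is a minimal, self-contained illustration rather than code from the PR, assuming the `backend` package from grafana-plugin-sdk-go; the `classify` helper and its parameters are hypothetical names used only to show how a sentinel error or the HTTP status returned by InfluxDB can drive the error source.

```go
// Hypothetical sketch (not part of the diff): tag errors that clearly
// originate outside the plugin as downstream, either directly or via
// the HTTP status code returned by InfluxDB.
package main

import (
	"errors"
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

// maxSeriesExceededError mirrors the sentinel error type introduced in the diff.
type maxSeriesExceededError struct{ Count int }

func (e maxSeriesExceededError) Error() string {
	return fmt.Sprintf("max series limit exceeded (count is %d)", e.Count)
}

// classify builds a DataResponse and marks where the error came from.
func classify(err error, statusCode int) backend.DataResponse {
	dr := backend.DataResponse{Error: err}

	var maxSeriesError maxSeriesExceededError
	switch {
	case errors.As(err, &maxSeriesError):
		// The query returned more data than allowed: a downstream problem.
		dr.ErrorSource = backend.ErrorSourceDownstream
	case statusCode != 0:
		// Let the SDK derive the source from the HTTP status InfluxDB returned.
		dr.ErrorSource = backend.ErrorSourceFromHTTPStatus(statusCode)
	}
	return dr
}

func main() {
	dr := classify(maxSeriesExceededError{Count: 51}, 0)
	fmt.Println(dr.ErrorSource, dr.Error)
}
```
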
@@ -254,6 +254,14 @@ func (e maxPointsExceededError) Error() string {
 	return fmt.Sprintf("max data points limit exceeded (count is %d)", e.Count)
 }
 
+type maxSeriesExceededError struct {
+	Count int
+}
+
+func (e maxSeriesExceededError) Error() string {
+	return fmt.Sprintf("max series limit exceeded (count is %d)", e.Count)
+}
+
 func getColumnInfo(col *query.FluxColumn) (info *columnInfo, isTimestamp bool, err error) {
 	dataType := col.DataType()
 	isTimestamp = isTimestampType(dataType)

@@ -316,7 +324,7 @@ func (fb *frameBuilder) Append(record *query.FluxRecord) error {
 	if (fb.currentGroupKey == nil) || !isTableIDEqual(table, fb.currentGroupKey) {
 		fb.totalSeries++
 		if fb.totalSeries > fb.maxSeries {
-			return fmt.Errorf("results are truncated, max series reached (%d)", fb.maxSeries)
+			return maxSeriesExceededError{Count: fb.totalSeries}
 		}
 
 		// labels have the same value for every row in the same "table",

@@ -48,6 +48,7 @@ func executeQuery(ctx context.Context, logger log.Logger, query queryModel, runn
 			}
 
 			dr.Error = fmt.Errorf(errMsg, maxPointError.Count, query.MaxDataPoints)
+			dr.ErrorSource = backend.ErrorSourceDownstream
 		}
 	}
 }

@@ -96,6 +97,10 @@ func readDataFrames(logger log.Logger, result *api.QueryTableResult, maxPoints i
 		err := builder.Append(result.Record())
 		if err != nil {
 			dr.Error = err
+			var maxSeriesError maxSeriesExceededError
+			if errors.As(err, &maxSeriesError) {
+				dr.ErrorSource = backend.ErrorSourceDownstream
+			}
 			break
 		}
 	}

@@ -61,8 +61,11 @@ func executeMockedQuery(t *testing.T, name string, query queryModel) *backend.Da
 	runner := &MockRunner{
 		testDataPath: name + ".csv",
 	}
+	if query.MaxSeries == 0 {
+		query.MaxSeries = 50
+	}
 
-	dr := executeQuery(context.Background(), glog, query, runner, 50)
+	dr := executeQuery(context.Background(), glog, query, runner, query.MaxSeries)
 	return &dr
 }
 
@@ -243,6 +246,16 @@ func assertDataResponseDimensions(t *testing.T, dr *backend.DataResponse, rows i
 	require.Equal(t, fields[1].Len(), columns)
 }
 
+func TestMaxSeriesExceeded(t *testing.T) {
+	// unfortunately the golden-response style tests do not support
+	// responses that contain errors, so we can only do manual checks
+	// on the DataResponse
+	dr := executeMockedQuery(t, "multiple", queryModel{MaxSeries: 1, MaxDataPoints: 100})
+
+	require.ErrorAs(t, dr.Error, &maxSeriesExceededError{})
+	require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource)
+}
+
 func TestMaxDataPointsExceededNoAggregate(t *testing.T) {
 	// unfortunately the golden-response style tests do not support
 	// responses that contain errors, so we can only do manual checks

@@ -251,6 +264,7 @@ func TestMaxDataPointsExceededNoAggregate(t *testing.T) {
 
 	// it should contain the error-message
 	require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2. Try using the aggregateWindow() function in your query to reduce the number of points returned.")
+	require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource)
 	assertDataResponseDimensions(t, dr, 2, 21)
 }
 
@@ -262,6 +276,7 @@ func TestMaxDataPointsExceededWithAggregate(t *testing.T) {
 
 	// it should contain the error-message
 	require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2.")
+	require.Equal(t, backend.ErrorSourceDownstream, dr.ErrorSource)
 	assertDataResponseDimensions(t, dr, 2, 21)
 }
 
@@ -25,6 +25,7 @@ type queryModel struct {
 	// Not from JSON
 	TimeRange     backend.TimeRange `json:"-"`
 	MaxDataPoints int64             `json:"-"`
+	MaxSeries     int               `json:"-"`
 	Interval      time.Duration     `json:"-"`
 }
 
@@ -44,7 +44,7 @@ func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req backend.Query
 	for _, q := range req.Queries {
 		qm, err := getQueryModel(q)
 		if err != nil {
-			tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, "bad request")
+			tRes.Responses[q.RefID] = backend.ErrDataResponseWithSource(backend.StatusValidationFailed, backend.ErrorSourceDownstream, "bad request")
 			continue
 		}
 
@@ -127,3 +127,27 @@ func freeport(t *testing.T) (addr string, err error) {
 	a := l.Addr().(*net.TCPAddr)
 	return a.String(), nil
 }
+
+func TestInvalidSchema(t *testing.T) {
+	resp, _ := Query(
+		context.Background(),
+		&models.DatasourceInfo{
+			HTTPClient:   nil,
+			Token:        "secret",
+			URL:          "http://127.0.0.1:1234",
+			DbName:       "influxdb",
+			Version:      "test",
+			HTTPMode:     "proxy",
+			InsecureGrpc: true,
+		},
+		backend.QueryDataRequest{
+			Queries: []backend.DataQuery{
+				{
+					RefID: "A",
+					JSON:  []byte(`this is not valid JSON`),
+				},
+			},
+		},
+	)
+	require.Equal(t, backend.ErrorSourceDownstream, resp.Responses["A"].ErrorSource)
+}

@@ -48,7 +48,8 @@ func getQueryModel(dataQuery backend.DataQuery) (*queryModel, error) {
 		Format: format,
 	}
 
-	// Process macros and execute the query.
+	// Process macros and generate raw fsql to be sent to
+	// influxdb backend for execution.
 	sql, err := sqlutil.Interpolate(query, macros)
 	if err != nil {
 		return nil, fmt.Errorf("macro interpolation: %w", err)

@@ -30,7 +30,10 @@ func parse(buf io.Reader, statusCode int, query *models.Query) *backend.DataResp
 		if errorStr == "" {
 			errorStr = response.Message
 		}
-		return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", errorStr)}
+		return &backend.DataResponse{
+			Error:       fmt.Errorf("InfluxDB returned error: %s", errorStr),
+			ErrorSource: backend.ErrorSourceFromHTTPStatus(statusCode),
+		}
 	}
 
 	if jsonErr != nil {

@@ -38,12 +41,18 @@ func parse(buf io.Reader, statusCode int, query *models.Query) *backend.DataResp
 	}
 
 	if response.Error != "" {
-		return &backend.DataResponse{Error: errors.New(response.Error)}
+		return &backend.DataResponse{
+			Error:       errors.New(response.Error),
+			ErrorSource: backend.ErrorSourceDownstream,
+		}
 	}
 
 	result := response.Results[0]
 	if result.Error != "" {
-		return &backend.DataResponse{Error: errors.New(result.Error)}
+		return &backend.DataResponse{
+			Error:       errors.New(result.Error),
+			ErrorSource: backend.ErrorSourceDownstream,
+		}
 	}
 
 	if query.ResultFormat == "table" {

@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana-plugin-sdk-go/experimental"
 	"github.com/influxdata/influxql"

@@ -351,12 +352,14 @@ func TestInfluxdbResponseParser(t *testing.T) {
 		result := ResponseParse(readJsonFile("error_on_top_level_response"), 400, generateQuery("Test raw query", "time_series", ""))
 		require.Nil(t, result.Frames)
 		require.EqualError(t, result.Error, "InfluxDB returned error: error parsing query: found THING")
+		require.Equal(t, backend.ErrorSourceDownstream, result.ErrorSource)
 	})
 
 	t.Run("Influxdb response parser with error message", func(t *testing.T) {
 		result := ResponseParse(readJsonFile("invalid_response"), 400, generateQuery("Test raw query", "time_series", ""))
 		require.Nil(t, result.Frames)
 		require.EqualError(t, result.Error, "InfluxDB returned error: failed to parse query: found WERE, expected ; at line 1, char 38")
+		require.Equal(t, backend.ErrorSourceDownstream, result.ErrorSource)
 	})
 
 	t.Run("Influxdb response parser parseNumber nil", func(t *testing.T) {

@@ -175,7 +175,10 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.Dataso
 func execute(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request, isStreamingParserEnabled bool) (backend.DataResponse, error) {
 	res, err := dsInfo.HTTPClient.Do(request)
 	if err != nil {
-		return backend.DataResponse{}, err
+		return backend.DataResponse{
+			Error:       err,
+			ErrorSource: backend.ErrorSourceFromHTTPStatus(res.StatusCode),
+		}, err
 	}
 	defer func() {
 		if err := res.Body.Close(); err != nil {

@@ -24,7 +24,10 @@ func ResponseParse(buf io.ReadCloser, statusCode int, query *models.Query) *back
 	r := converter.ReadInfluxQLStyleResult(iter, query)
 
 	if statusCode/100 != 2 {
-		return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", r.Error)}
+		return &backend.DataResponse{
+			Error:       fmt.Errorf("InfluxDB returned error: %s", r.Error),
+			ErrorSource: backend.ErrorSourceFromHTTPStatus(statusCode),
+		}
 	}
 
 	// The ExecutedQueryString can be viewed in QueryInspector in UI