Files
grafana/pkg/expr/sql/frame_db_conv.go
Kyle Brandt 5e056c2a3f SQL Expressions: Add sql expression specific timeout and output limit (#104834)
Adds settings for SQL expressions:
 sql_expression_cell_output_limit

Set the maximum number of cells that can be returned from a SQL expression. Default is 100000.

sql_expression_timeout

The duration a SQL expression will run before being cancelled. The default is 10s.
2025-05-13 15:22:20 -04:00

246 lines
6.9 KiB
Go

//go:build !arm
package sql
import (
"encoding/json"
"errors"
"fmt"
"io"
"time"
mysql "github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/shopspring/decimal"
)
// TODO: Should this accept a row limit and converters, like sqlutil.FrameFromRows?
func convertToDataFrame(ctx *mysql.Context, iter mysql.RowIter, schema mysql.Schema, maxOutputCells int64) (*data.Frame, error) {
f := &data.Frame{}
// Create fields based on the schema
for _, col := range schema {
fT, err := MySQLColToFieldType(col)
if err != nil {
return nil, err
}
field := data.NewFieldFromFieldType(fT, 0)
field.Name = col.Name
f.Fields = append(f.Fields, field)
}
cellCount := int64(0)
// Iterate through the rows and append data to fields
for {
// Check for context cancellation or timeout
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
row, err := iter.Next(ctx)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("error reading row: %v", err)
}
// We check the cell count here to avoid appending an incomplete row, so the
// the number returned may be less than the maxOutputCells.
// If the maxOutputCells is 0, we don't check the cell count.
if maxOutputCells > 0 {
cellCount += int64(len(row))
if cellCount > maxOutputCells {
f.AppendNotices(data.Notice{
Severity: data.NoticeSeverityWarning,
Text: fmt.Sprintf("Query exceeded max output cells (%d). Only %d cells returned.", maxOutputCells, cellCount-int64(len(row))),
})
return f, nil
}
}
for i, val := range row {
// Run val through mysql.Type.Convert to normalize underlying value
// of the interface
nV, _, err := schema[i].Type.Convert(ctx, val)
if err != nil {
return nil, err
}
// Run the normalized value through fieldValFromRowVal to normalize
// the interface type to the dataframe value type, and make nullable
// values pointers as dataframe expects.
fV, err := fieldValFromRowVal(f.Fields[i].Type(), nV)
if err != nil {
return nil, fmt.Errorf("unexpected type for column %s: %w", schema[i].Name, err)
}
f.Fields[i].Append(fV)
}
}
return f, nil
}
// MySQLColToFieldType converts a MySQL column to a data.FieldType
func MySQLColToFieldType(col *mysql.Column) (data.FieldType, error) {
var fT data.FieldType
switch col.Type {
case types.Int8:
fT = data.FieldTypeInt8
case types.Uint8:
fT = data.FieldTypeUint8
case types.Int16:
fT = data.FieldTypeInt16
case types.Uint16:
fT = data.FieldTypeUint16
case types.Int32:
fT = data.FieldTypeInt32
case types.Uint32:
fT = data.FieldTypeUint32
case types.Int64:
fT = data.FieldTypeInt64
case types.Uint64:
fT = data.FieldTypeUint64
case types.Float32:
fT = data.FieldTypeFloat32
case types.Float64:
fT = data.FieldTypeFloat64
case types.Timestamp:
fT = data.FieldTypeTime
case types.Datetime:
fT = data.FieldTypeTime
case types.Boolean:
fT = data.FieldTypeBool
case types.JSON:
fT = data.FieldTypeJSON
default:
switch {
case types.IsDecimal(col.Type):
fT = data.FieldTypeFloat64
case types.IsText(col.Type):
fT = data.FieldTypeString
default:
return fT, fmt.Errorf("unsupported type for column %s of type %v", col.Name, col.Type)
}
}
if col.Nullable {
fT = fT.NullableType()
}
return fT, nil
}
// fieldValFromRowVal converts a go-mysql-server row value to a data.field value
func fieldValFromRowVal(fieldType data.FieldType, val interface{}) (interface{}, error) {
// if the input interface is nil, we can return an untyped nil
if val == nil {
return nil, nil
}
nullable := fieldType.Nullable()
switch fieldType {
case data.FieldTypeInt8, data.FieldTypeNullableInt8:
return parseVal[int8](val, "int8", nullable)
case data.FieldTypeUint8, data.FieldTypeNullableUint8:
return parseVal[uint8](val, "uint8", nullable)
case data.FieldTypeInt16, data.FieldTypeNullableInt16:
return parseVal[int16](val, "int16", nullable)
case data.FieldTypeUint16, data.FieldTypeNullableUint16:
return parseVal[uint16](val, "uint16", nullable)
case data.FieldTypeInt32, data.FieldTypeNullableInt32:
return parseVal[int32](val, "int32", nullable)
case data.FieldTypeUint32, data.FieldTypeNullableUint32:
return parseVal[uint32](val, "uint32", nullable)
case data.FieldTypeInt64, data.FieldTypeNullableInt64:
return parseVal[int64](val, "int64", nullable)
case data.FieldTypeUint64, data.FieldTypeNullableUint64:
return parseVal[uint64](val, "uint64", nullable)
case data.FieldTypeFloat32, data.FieldTypeNullableFloat32:
return parseVal[float32](val, "float32", nullable)
case data.FieldTypeFloat64, data.FieldTypeNullableFloat64:
return parseFloat64OrDecimal(val, nullable)
case data.FieldTypeTime, data.FieldTypeNullableTime:
return parseVal[time.Time](val, "time.Time", nullable)
case data.FieldTypeString, data.FieldTypeNullableString:
return parseVal[string](val, "string", nullable)
case data.FieldTypeBool, data.FieldTypeNullableBool:
return parseBoolFromInt8(val, nullable)
case data.FieldTypeJSON, data.FieldTypeNullableJSON:
switch v := val.(type) {
case types.JSONDocument:
raw := json.RawMessage(v.String())
if nullable {
return &raw, nil
}
return raw, nil
default:
return nil, fmt.Errorf("JSON field does not support val %v of type %T", val, val)
}
default:
return nil, fmt.Errorf("unsupported field type %s for val %v of type %T", fieldType, val, val)
}
}
// parseVal attempts to assert `val` as type T. If successful, it returns either
// the value or a pointer, depending on `isNullable`. If not, returns an error.
func parseVal[T any](val interface{}, typeName string, isNullable bool) (interface{}, error) {
v, ok := val.(T)
if !ok {
return nil, fmt.Errorf("unexpected value type %v of type %T, expected %s", val, val, typeName)
}
return ptrIfNull(v, isNullable), nil
}
// parseFloat64OrDecimal handles the special case where val can be float64 or decimal.Decimal.
func parseFloat64OrDecimal(val interface{}, isNullable bool) (interface{}, error) {
if fv, ok := val.(float64); ok {
return ptrIfNull(fv, isNullable), nil
}
if d, ok := val.(decimal.Decimal); ok {
return ptrIfNull(d.InexactFloat64(), isNullable), nil
}
return nil, fmt.Errorf("unexpected value type %v of type %T, expected float64 or decimal.Decimal", val, val)
}
// parseBoolFromInt8 asserts val as an int8, converts non-zero to true.
// Returns pointer if isNullable, otherwise the bool value.
func parseBoolFromInt8(val interface{}, isNullable bool) (interface{}, error) {
v, ok := val.(int8)
if !ok {
return nil, fmt.Errorf("unexpected value type %v of type %T, expected int8 (for bool)", val, val)
}
b := (v != 0)
return ptrIfNull(b, isNullable), nil
}
// ptrIfNull returns a pointer to val if isNullable is true; otherwise, returns val.
func ptrIfNull[T any](val T, isNullable bool) interface{} {
if isNullable {
return &val
}
return val
}