SQL Expressions: Add JSON support (#103157)

- Support bi-directional mapping of frame JSON fields and GMS (go-mysql-server) columns 
- Permit GMS json functions

Co-authored-by: Kyle Brandt <kyle@grafana.com>
This commit is contained in:
Sam Jewell
2025-04-01 12:45:01 +01:00
committed by GitHub
parent 6754781d7b
commit af08a9fae2
5 changed files with 136 additions and 2 deletions

View File

@ -4,6 +4,7 @@ package sql
import ( import (
"context" "context"
"encoding/json"
"testing" "testing"
"time" "time"
@ -205,6 +206,83 @@ func TestErrorsFromGoMySQLServerAreFlagged(t *testing.T) {
require.Contains(t, err.Error(), "error in go-mysql-server") require.Contains(t, err.Error(), "error in go-mysql-server")
} }
func TestFrameToSQLAndBack_JSONRoundtrip(t *testing.T) {
expectedFrame := &data.Frame{
RefID: "json_test",
Name: "json_test",
Fields: []*data.Field{
data.NewField("id", nil, []int64{1, 2}),
data.NewField("payload", nil, []json.RawMessage{
json.RawMessage(`{"foo":1}`),
json.RawMessage(`{"bar":"baz"}`),
}),
},
}
db := DB{}
query := `SELECT * FROM json_test`
resultFrame, err := db.QueryFrames(context.Background(), "json_test", query, data.Frames{expectedFrame})
require.NoError(t, err)
// Use custom compare options that ignore Name and RefID
opts := append(
data.FrameTestCompareOptions(),
cmp.FilterPath(func(p cmp.Path) bool {
return p.String() == "Name" || p.String() == "RefID"
}, cmp.Ignore()),
)
if diff := cmp.Diff(expectedFrame, resultFrame, opts...); diff != "" {
require.FailNowf(t, "Frame mismatch (-want +got):\n%s", diff)
}
}
func TestQueryFrames_JSONFilter(t *testing.T) {
input := &data.Frame{
RefID: "A",
Name: "A",
Fields: []*data.Field{
data.NewField("title", nil, []string{"Bug report", "Feature request"}),
data.NewField("labels", nil, []json.RawMessage{
json.RawMessage(`["type/bug", "priority/high"]`),
json.RawMessage(`["type/feature", "priority/low"]`),
}),
},
}
expected := &data.Frame{
RefID: "B",
Name: "B",
Fields: []*data.Field{
data.NewField("title", nil, []string{"Bug report"}),
data.NewField("labels", nil, []json.RawMessage{
json.RawMessage(`["type/bug", "priority/high"]`),
}),
},
}
db := DB{}
query := `SELECT title, labels FROM A WHERE json_contains(labels, '"type/bug"')`
result, err := db.QueryFrames(context.Background(), "B", query, data.Frames{input})
require.NoError(t, err)
// Use custom compare options that ignore Name and RefID
opts := append(
data.FrameTestCompareOptions(),
cmp.FilterPath(func(p cmp.Path) bool {
return p.String() == "Name" || p.String() == "RefID"
}, cmp.Ignore()),
)
if diff := cmp.Diff(expected, result, opts...); diff != "" {
require.FailNowf(t, "Result mismatch (-want +got):\n%s", diff)
}
}
// p is a utility for pointers from constants // p is a utility for pointers from constants
func p[T any](v T) *T { func p[T any](v T) *T {
return &v return &v

View File

@ -3,6 +3,7 @@
package sql package sql
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -92,6 +93,8 @@ func MySQLColToFieldType(col *mysql.Column) (data.FieldType, error) {
fT = data.FieldTypeTime fT = data.FieldTypeTime
case types.Boolean: case types.Boolean:
fT = data.FieldTypeBool fT = data.FieldTypeBool
case types.JSON:
fT = data.FieldTypeJSON
default: default:
switch { switch {
case types.IsDecimal(col.Type): case types.IsDecimal(col.Type):
@ -159,8 +162,21 @@ func fieldValFromRowVal(fieldType data.FieldType, val interface{}) (interface{},
case data.FieldTypeBool, data.FieldTypeNullableBool: case data.FieldTypeBool, data.FieldTypeNullableBool:
return parseBoolFromInt8(val, nullable) return parseBoolFromInt8(val, nullable)
case data.FieldTypeJSON, data.FieldTypeNullableJSON:
switch v := val.(type) {
case types.JSONDocument:
raw := json.RawMessage(v.String())
if nullable {
return &raw, nil
}
return raw, nil
default:
return nil, fmt.Errorf("JSON field does not support val %v of type %T", val, val)
}
default: default:
return nil, fmt.Errorf("unsupported field type %s for val %v", fieldType, val) return nil, fmt.Errorf("unsupported field type %s for val %v of type %T", fieldType, val, val)
} }
} }

View File

@ -3,6 +3,7 @@
package sql package sql
import ( import (
"encoding/json"
"fmt" "fmt"
"io" "io"
"strings" "strings"
@ -90,7 +91,21 @@ func (ri *rowIter) Next(_ *mysql.Context) (mysql.Row, error) {
if field.NilAt(ri.row) { if field.NilAt(ri.row) {
continue continue
} }
row[colIndex], _ = field.ConcreteAt(ri.row) val, _ := field.ConcreteAt(ri.row)
// If the field is JSON, convert json.RawMessage to types.JSONDocument
if raw, ok := val.(json.RawMessage); ok {
doc, inRange, err := types.JSON.Convert(raw)
if err != nil {
return nil, fmt.Errorf("failed to convert json.RawMessage to JSONDocument: %w", err)
}
if !inRange {
return nil, fmt.Errorf("invalid JSON value detected at row %d, column %s: value required type coercion", ri.row, ri.ft.Frame.Fields[colIndex].Name)
}
val = doc
}
row[colIndex] = val
} }
ri.row++ ri.row++
@ -156,6 +171,8 @@ func convertDataType(fieldType data.FieldType) mysql.Type {
return types.Boolean return types.Boolean
case data.FieldTypeTime, data.FieldTypeNullableTime: case data.FieldTypeTime, data.FieldTypeNullableTime:
return types.Timestamp return types.Timestamp
case data.FieldTypeJSON, data.FieldTypeNullableJSON:
return types.JSON
default: default:
fmt.Printf("------- Unsupported field type: %v", fieldType) fmt.Printf("------- Unsupported field type: %v", fieldType)
return types.JSON return types.JSON

View File

@ -210,6 +210,12 @@ func allowedFunction(f *sqlparser.FuncExpr) (b bool) {
case "cast": case "cast":
return return
// JSON functions
case "json_extract", "json_unquote", "json_contains",
"json_object", "json_array", "json_set", "json_remove",
"json_length", "json_search", "json_type":
return
default: default:
return false return false
} }

View File

@ -67,6 +67,11 @@ func TestAllowQuery(t *testing.T) {
q: `SELECT __value__, SUBSTRING_INDEX(name, '.', -1) AS code FROM A`, q: `SELECT __value__, SUBSTRING_INDEX(name, '.', -1) AS code FROM A`,
err: nil, err: nil,
}, },
{
name: "json functions",
q: example_json_functions,
err: nil,
},
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -239,3 +244,15 @@ SELECT
FROM sample_data FROM sample_data
GROUP BY name, value, created_at GROUP BY name, value, created_at
LIMIT 10` LIMIT 10`
var example_json_functions = `SELECT
JSON_OBJECT('key1', 'value1', 'key2', 10) AS json_obj,
JSON_ARRAY(1, 'abc', NULL, TRUE) AS json_arr,
JSON_EXTRACT('{"id": 123, "name": "test"}', '$.id') AS json_ext,
JSON_UNQUOTE(JSON_EXTRACT('{"name": "test"}', '$.name')) AS json_unq,
JSON_CONTAINS('{"a": 1, "b": 2}', '{"a": 1}') AS json_contains,
JSON_SET('{"a": 1}', '$.b', 2) AS json_set,
JSON_REMOVE('{"a": 1, "b": 2}', '$.b') AS json_remove,
JSON_LENGTH('{"a": 1, "b": {"c": 3}}') AS json_len,
JSON_SEARCH('{"a": "xyz", "b": "abc"}', 'one', 'abc') AS json_search,
JSON_TYPE('{"a": 1}') AS json_type`