Introduce TSDB service (#31520)

* Introduce TSDB service

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>

Co-authored-by: Erik Sundell <erik.sundell87@gmail.com>
Co-authored-by: Will Browne <will.browne@grafana.com>
Co-authored-by: Torkel Ödegaard <torkel@grafana.org>
Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
Co-authored-by: Zoltán Bedi <zoltan.bedi@gmail.com>
This commit is contained in:
Arve Knudsen
2021-03-08 07:02:49 +01:00
committed by GitHub
parent c899bf3592
commit b79e61656a
203 changed files with 5270 additions and 4777 deletions

View File

@ -12,10 +12,11 @@ import (
"sync"
"time"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/components/null"
@ -28,16 +29,16 @@ import (
// MetaKeyExecutedQueryString is the key where the executed query should get stored
const MetaKeyExecutedQueryString = "executedQueryString"
// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// SQLMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// timeRange to be able to generate queries that use from and to.
type SqlMacroEngine interface {
Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error)
type SQLMacroEngine interface {
Interpolate(query plugins.DataSubQuery, timeRange plugins.DataTimeRange, sql string) (string, error)
}
// SqlQueryResultTransformer transforms a query result row to RowValues with proper types.
type SqlQueryResultTransformer interface {
// TransformQueryResult transforms a query result row to RowValues with proper types.
TransformQueryResult(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error)
TransformQueryResult(columnTypes []*sql.ColumnType, rows *core.Rows) (plugins.DataRowValues, error)
// TransformQueryError transforms a query error.
TransformQueryError(err error) error
}
@ -53,7 +54,7 @@ var engineCache = engineCacheType{
versions: make(map[int64]int),
}
var sqlIntervalCalculator = tsdb.NewIntervalCalculator(nil)
var sqlIntervalCalculator = interval.NewCalculator()
// NewXormEngine is an xorm.Engine factory, that can be stubbed by tests.
//nolint:gocritic
@ -63,8 +64,8 @@ var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engi
const timeEndColumnName = "timeend"
type sqlQueryEndpoint struct {
macroEngine SqlMacroEngine
type dataPlugin struct {
macroEngine SQLMacroEngine
queryResultTransformer SqlQueryResultTransformer
engine *xorm.Engine
timeColumnNames []string
@ -72,7 +73,7 @@ type sqlQueryEndpoint struct {
log log.Logger
}
type SqlQueryEndpointConfiguration struct {
type DataPluginConfiguration struct {
DriverName string
Datasource *models.DataSource
ConnectionString string
@ -80,8 +81,10 @@ type SqlQueryEndpointConfiguration struct {
MetricColumnTypes []string
}
var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, queryResultTransformer SqlQueryResultTransformer, macroEngine SqlMacroEngine, log log.Logger) (tsdb.TsdbQueryEndpoint, error) {
queryEndpoint := sqlQueryEndpoint{
// NewDataPlugin returns a new plugins.DataPlugin
func NewDataPlugin(config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
macroEngine SQLMacroEngine, log log.Logger) (plugins.DataPlugin, error) {
plugin := dataPlugin{
queryResultTransformer: queryResultTransformer,
macroEngine: macroEngine,
timeColumnNames: []string{"time"},
@ -89,11 +92,11 @@ var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, queryResul
}
if len(config.TimeColumnNames) > 0 {
queryEndpoint.timeColumnNames = config.TimeColumnNames
plugin.timeColumnNames = config.TimeColumnNames
}
if len(config.MetricColumnTypes) > 0 {
queryEndpoint.metricColumnTypes = config.MetricColumnTypes
plugin.metricColumnTypes = config.MetricColumnTypes
}
engineCache.Lock()
@ -101,8 +104,8 @@ var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, queryResul
if engine, present := engineCache.cache[config.Datasource.Id]; present {
if version := engineCache.versions[config.Datasource.Id]; version == config.Datasource.Version {
queryEndpoint.engine = engine
return &queryEndpoint, nil
plugin.engine = engine
return &plugin, nil
}
}
@ -120,50 +123,61 @@ var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, queryResul
engineCache.versions[config.Datasource.Id] = config.Datasource.Version
engineCache.cache[config.Datasource.Id] = engine
queryEndpoint.engine = engine
plugin.engine = engine
return &queryEndpoint, nil
return &plugin, nil
}
const rowLimit = 1000000
// Query is the main function for the SqlQueryEndpoint
func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
func (e *dataPlugin) DataQuery(ctx context.Context, dsInfo *models.DataSource,
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
var timeRange plugins.DataTimeRange
if queryContext.TimeRange != nil {
timeRange = *queryContext.TimeRange
}
ch := make(chan plugins.DataQueryResult, len(queryContext.Queries))
var wg sync.WaitGroup
for _, query := range tsdbQuery.Queries {
rawSQL := query.Model.Get("rawSql").MustString()
if rawSQL == "" {
// Execute each query in a goroutine and wait for them to finish afterwards
for _, query := range queryContext.Queries {
if query.Model.Get("rawSql").MustString() == "" {
continue
}
queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefId}
result.Results[query.RefId] = queryResult
// global substitutions
rawSQL, err := Interpolate(query, tsdbQuery.TimeRange, rawSQL)
if err != nil {
queryResult.Error = err
continue
}
// datasource specific substitutions
rawSQL, err = e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL)
if err != nil {
queryResult.Error = err
continue
}
queryResult.Meta.Set(MetaKeyExecutedQueryString, rawSQL)
wg.Add(1)
go func(rawSQL string, query *tsdb.Query, queryResult *tsdb.QueryResult) {
go func(query plugins.DataSubQuery) {
defer wg.Done()
queryResult := plugins.DataQueryResult{
Meta: simplejson.New(),
RefID: query.RefID,
}
rawSQL := query.Model.Get("rawSql").MustString()
if rawSQL == "" {
panic("Query model property rawSql should not be empty at this point")
}
// global substitutions
rawSQL, err := Interpolate(query, timeRange, rawSQL)
if err != nil {
queryResult.Error = err
ch <- queryResult
return
}
// datasource specific substitutions
rawSQL, err = e.macroEngine.Interpolate(query, timeRange, rawSQL)
if err != nil {
queryResult.Error = err
ch <- queryResult
return
}
queryResult.Meta.Set(MetaKeyExecutedQueryString, rawSQL)
session := e.engine.NewSession()
defer session.Close()
db := session.DB()
@ -183,28 +197,40 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
switch format {
case "time_series":
err := e.transformToTimeSeries(query, rows, queryResult, tsdbQuery)
err := e.transformToTimeSeries(query, rows, &queryResult, queryContext)
if err != nil {
queryResult.Error = err
return
}
case "table":
err := e.transformToTable(query, rows, queryResult, tsdbQuery)
err := e.transformToTable(query, rows, &queryResult, queryContext)
if err != nil {
queryResult.Error = err
return
}
}
}(rawSQL, query, queryResult)
ch <- queryResult
}(query)
}
wg.Wait()
// Read results from channels
close(ch)
result := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}
for queryResult := range ch {
result.Results[queryResult.RefID] = queryResult
}
return result, nil
}
// Interpolate provides global macros/substitutions for all sql datasources.
var Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
minInterval, err := tsdb.GetIntervalFrom(query.DataSource, query.Model, time.Second*60)
var Interpolate = func(query plugins.DataSubQuery, timeRange plugins.DataTimeRange, sql string) (string, error) {
minInterval, err := interval.GetIntervalFrom(query.DataSource, query.Model, time.Second*60)
if err != nil {
return sql, nil
}
@ -218,21 +244,22 @@ var Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string)
return sql, nil
}
func (e *sqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
func (e *dataPlugin) transformToTable(query plugins.DataSubQuery, rows *core.Rows,
result *plugins.DataQueryResult, queryContext plugins.DataQuery) error {
columnNames, err := rows.Columns()
columnCount := len(columnNames)
if err != nil {
return err
}
columnCount := len(columnNames)
rowCount := 0
timeIndex := -1
timeEndIndex := -1
table := &tsdb.Table{
Columns: make([]tsdb.TableColumn, columnCount),
Rows: make([]tsdb.RowValues, 0),
table := plugins.DataTable{
Columns: make([]plugins.DataTableColumn, columnCount),
Rows: make([]plugins.DataRowValues, 0),
}
for i, name := range columnNames {
@ -279,7 +306,7 @@ func (e *sqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
return nil
}
func newProcessCfg(query *tsdb.Query, tsdbQuery *tsdb.TsdbQuery, rows *core.Rows) (*processCfg, error) {
func newProcessCfg(query plugins.DataSubQuery, queryContext plugins.DataQuery, rows *core.Rows) (*processCfg, error) {
columnNames, err := rows.Columns()
if err != nil {
return nil, err
@ -301,15 +328,15 @@ func newProcessCfg(query *tsdb.Query, tsdbQuery *tsdb.TsdbQuery, rows *core.Rows
metricPrefix: false,
fillMissing: fillMissing,
seriesByQueryOrder: list.New(),
pointsBySeries: make(map[string]*tsdb.TimeSeries),
tsdbQuery: tsdbQuery,
pointsBySeries: make(map[string]*plugins.DataTimeSeries),
queryContext: queryContext,
}
return cfg, nil
}
func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult,
tsdbQuery *tsdb.TsdbQuery) error {
cfg, err := newProcessCfg(query, tsdbQuery, rows)
func (e *dataPlugin) transformToTimeSeries(query plugins.DataSubQuery, rows *core.Rows,
result *plugins.DataQueryResult, queryContext plugins.DataQuery) error {
cfg, err := newProcessCfg(query, queryContext, rows)
if err != nil {
return err
}
@ -369,15 +396,15 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.R
for elem := cfg.seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
key := elem.Value.(string)
result.Series = append(result.Series, cfg.pointsBySeries[key])
if !cfg.fillMissing {
result.Series = append(result.Series, *cfg.pointsBySeries[key])
continue
}
series := cfg.pointsBySeries[key]
// fill in values from last fetched value till interval end
intervalStart := series.Points[len(series.Points)-1][1].Float64
intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
intervalEnd := float64(queryContext.TimeRange.MustGetTo().UnixNano() / 1e6)
if cfg.fillPrevious {
if len(series.Points) > 0 {
@ -390,9 +417,11 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.R
// align interval start
intervalStart = math.Floor(intervalStart/cfg.fillInterval) * cfg.fillInterval
for i := intervalStart + cfg.fillInterval; i < intervalEnd; i += cfg.fillInterval {
series.Points = append(series.Points, tsdb.TimePoint{cfg.fillValue, null.FloatFrom(i)})
series.Points = append(series.Points, plugins.DataTimePoint{cfg.fillValue, null.FloatFrom(i)})
cfg.rowCount++
}
result.Series = append(result.Series, *series)
}
result.Meta.Set("rowCount", cfg.rowCount)
@ -409,15 +438,15 @@ type processCfg struct {
metricPrefix bool
metricPrefixValue string
fillMissing bool
pointsBySeries map[string]*tsdb.TimeSeries
pointsBySeries map[string]*plugins.DataTimeSeries
seriesByQueryOrder *list.List
fillValue null.Float
tsdbQuery *tsdb.TsdbQuery
queryContext plugins.DataQuery
fillInterval float64
fillPrevious bool
}
func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error {
func (e *dataPlugin) processRow(cfg *processCfg) error {
var timestamp float64
var value null.Float
var metric string
@ -447,17 +476,18 @@ func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error {
}
if cfg.metricIndex >= 0 {
if columnValue, ok := values[cfg.metricIndex].(string); ok {
if cfg.metricPrefix {
cfg.metricPrefixValue = columnValue
} else {
metric = columnValue
}
} else {
columnValue, ok := values[cfg.metricIndex].(string)
if !ok {
return fmt.Errorf("column metric must be of type %s. metric column name: %s type: %s but datatype is %T",
strings.Join(e.metricColumnTypes, ", "), cfg.columnNames[cfg.metricIndex],
cfg.columnTypes[cfg.metricIndex].DatabaseTypeName(), values[cfg.metricIndex])
}
if cfg.metricPrefix {
cfg.metricPrefixValue = columnValue
} else {
metric = columnValue
}
}
for i, col := range cfg.columnNames {
@ -475,17 +505,17 @@ func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error {
metric = cfg.metricPrefixValue + " " + col
}
series, exist := cfg.pointsBySeries[metric]
if !exist {
series = &tsdb.TimeSeries{Name: metric}
series, exists := cfg.pointsBySeries[metric]
if !exists {
series = &plugins.DataTimeSeries{Name: metric}
cfg.pointsBySeries[metric] = series
cfg.seriesByQueryOrder.PushBack(metric)
}
if cfg.fillMissing {
var intervalStart float64
if !exist {
intervalStart = float64(cfg.tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
if !exists {
intervalStart = float64(cfg.queryContext.TimeRange.MustGetFrom().UnixNano() / 1e6)
} else {
intervalStart = series.Points[len(series.Points)-1][1].Float64 + cfg.fillInterval
}
@ -502,13 +532,15 @@ func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error {
intervalStart = math.Floor(intervalStart/cfg.fillInterval) * cfg.fillInterval
for i := intervalStart; i < timestamp; i += cfg.fillInterval {
series.Points = append(series.Points, tsdb.TimePoint{cfg.fillValue, null.FloatFrom(i)})
series.Points = append(series.Points, plugins.DataTimePoint{cfg.fillValue, null.FloatFrom(i)})
cfg.rowCount++
}
}
series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
series.Points = append(series.Points, plugins.DataTimePoint{value, null.FloatFrom(timestamp)})
cfg.pointsBySeries[metric] = series
// TODO: Make non-global
if setting.Env == setting.Dev {
e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
}
@ -519,7 +551,7 @@ func (e *sqlQueryEndpoint) processRow(cfg *processCfg) error {
// ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
// to make native datetime types and epoch dates work in annotation and table queries.
func ConvertSqlTimeColumnToEpochMs(values tsdb.RowValues, timeIndex int) {
func ConvertSqlTimeColumnToEpochMs(values plugins.DataRowValues, timeIndex int) {
if timeIndex >= 0 {
switch value := values[timeIndex].(type) {
case time.Time:
@ -529,40 +561,40 @@ func ConvertSqlTimeColumnToEpochMs(values tsdb.RowValues, timeIndex int) {
values[timeIndex] = float64(value.UnixNano()) / float64(time.Millisecond)
}
case int64:
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(value)))
case *int64:
if value != nil {
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(*value)))
}
case uint64:
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(value)))
case *uint64:
if value != nil {
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(*value)))
}
case int32:
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(value)))
case *int32:
if value != nil {
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(*value)))
}
case uint32:
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(value)))
case *uint32:
if value != nil {
values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
values[timeIndex] = int64(epochPrecisionToMS(float64(*value)))
}
case float64:
values[timeIndex] = tsdb.EpochPrecisionToMs(value)
values[timeIndex] = epochPrecisionToMS(value)
case *float64:
if value != nil {
values[timeIndex] = tsdb.EpochPrecisionToMs(*value)
values[timeIndex] = epochPrecisionToMS(*value)
}
case float32:
values[timeIndex] = tsdb.EpochPrecisionToMs(float64(value))
values[timeIndex] = epochPrecisionToMS(float64(value))
case *float32:
if value != nil {
values[timeIndex] = tsdb.EpochPrecisionToMs(float64(*value))
values[timeIndex] = epochPrecisionToMS(float64(*value))
}
}
}
@ -678,7 +710,7 @@ func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (n
return value, nil
}
func SetupFillmode(query *tsdb.Query, interval time.Duration, fillmode string) error {
func SetupFillmode(query plugins.DataSubQuery, interval time.Duration, fillmode string) error {
query.Model.Set("fill", true)
query.Model.Set("fillInterval", interval.Seconds())
switch fillmode {
@ -698,13 +730,13 @@ func SetupFillmode(query *tsdb.Query, interval time.Duration, fillmode string) e
return nil
}
type SqlMacroEngineBase struct{}
type SQLMacroEngineBase struct{}
func NewSqlMacroEngineBase() *SqlMacroEngineBase {
return &SqlMacroEngineBase{}
func NewSQLMacroEngineBase() *SQLMacroEngineBase {
return &SQLMacroEngineBase{}
}
func (m *SqlMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
func (m *SQLMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
@ -720,3 +752,18 @@ func (m *SqlMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str
return result + str[lastIndex:]
}
// epochPrecisionToMS converts an epoch timestamp to millisecond precision,
// if needed. Only two magnitudes are recognized: values formatted in the
// e+09 range are treated as seconds, and values in the e+18 range as
// nanoseconds; anything else is returned unchanged.
func epochPrecisionToMS(value float64) float64 {
	// Render the value in scientific notation so the exponent identifies
	// the precision (e.g. 1.6e+09 for a seconds-based epoch).
	formatted := strconv.FormatFloat(value, 'e', -1, 64)
	switch {
	case strings.HasSuffix(formatted, "e+09"):
		// Seconds -> milliseconds.
		return value * float64(1e3)
	case strings.HasSuffix(formatted, "e+18"):
		// Nanoseconds -> milliseconds.
		return value / float64(time.Millisecond)
	default:
		return value
	}
}