mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 06:52:37 +08:00
SQL: Migrate to use SDK contracts (#36635)
* convert SQLs to use sdk contracts * make draft * postgres * intermedia * get datasourceinfo filled at the beginning of the service * move the interval into package because of cyclict import and fix all postgres tests * fix mysql test * fix mssql * fix the test for pr https://github.com/grafana/grafana/issues/35839 * fix some issue about intervalv2 package * update sql test * wire migration for SQLs * add sqls to the background process * make it register instead of register and start * revert formatting * fix tests * fix linter * remove integration test * Postgres test fix Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
This commit is contained in:
@ -1,72 +1,124 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/util/errutil"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/tsdb/sqleng"
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg) *PostgresService {
|
||||
logger := log.New("tsdb.postgres")
|
||||
return &PostgresService{
|
||||
var logger = log.New("tsdb.postgres")
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager) (*Service, error) {
|
||||
s := &Service{
|
||||
Cfg: cfg,
|
||||
logger: logger,
|
||||
tlsManager: newTLSManager(logger, cfg.DataPath),
|
||||
}
|
||||
s.im = datasource.NewInstanceManager(s.newInstanceSettings())
|
||||
factory := coreplugin.New(backend.ServeOpts{
|
||||
QueryDataHandler: s,
|
||||
})
|
||||
|
||||
if err := manager.Register("postgres", factory); err != nil {
|
||||
logger.Error("Failed to register plugin", "error", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
type PostgresService struct {
|
||||
type Service struct {
|
||||
Cfg *setting.Cfg
|
||||
logger log.Logger
|
||||
tlsManager tlsSettingsProvider
|
||||
im instancemgmt.InstanceManager
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (s *PostgresService) NewExecutor(datasource *models.DataSource) (plugins.DataPlugin, error) {
|
||||
s.logger.Debug("Creating Postgres query endpoint")
|
||||
|
||||
cnnstr, err := s.generateConnectionString(datasource)
|
||||
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*sqleng.DataSourceHandler, error) {
|
||||
i, err := s.im.Get(pluginCtx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
instance := i.(*sqleng.DataSourceHandler)
|
||||
return instance, nil
|
||||
}
|
||||
|
||||
if s.Cfg.Env == setting.Dev {
|
||||
s.logger.Debug("getEngine", "connection", cnnstr)
|
||||
}
|
||||
|
||||
config := sqleng.DataPluginConfiguration{
|
||||
DriverName: "postgres",
|
||||
ConnectionString: cnnstr,
|
||||
Datasource: datasource,
|
||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||
}
|
||||
|
||||
queryResultTransformer := postgresQueryResultTransformer{
|
||||
log: s.logger,
|
||||
}
|
||||
|
||||
timescaledb := datasource.JsonData.Get("timescaledb").MustBool(false)
|
||||
|
||||
plugin, err := sqleng.NewDataPlugin(config, &queryResultTransformer, newPostgresMacroEngine(timescaledb),
|
||||
s.logger)
|
||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
dsInfo, err := s.getDSInfo(req.PluginContext)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed connecting to Postgres", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
return dsInfo.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
s.logger.Debug("Successfully connected to Postgres")
|
||||
return plugin, nil
|
||||
func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
|
||||
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||
logger.Debug("Creating Postgres query endpoint")
|
||||
jsonData := sqleng.JsonData{
|
||||
MaxOpenConns: 0,
|
||||
MaxIdleConns: 2,
|
||||
ConnMaxLifetime: 14400,
|
||||
Timescaledb: false,
|
||||
ConfigurationMethod: "file-path",
|
||||
}
|
||||
|
||||
err := json.Unmarshal(settings.JSONData, &jsonData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading settings: %w", err)
|
||||
}
|
||||
dsInfo := sqleng.DataSourceInfo{
|
||||
JsonData: jsonData,
|
||||
URL: settings.URL,
|
||||
User: settings.User,
|
||||
Database: settings.Database,
|
||||
ID: settings.ID,
|
||||
Updated: settings.Updated,
|
||||
UID: settings.UID,
|
||||
DecryptedSecureJSONData: settings.DecryptedSecureJSONData,
|
||||
}
|
||||
|
||||
cnnstr, err := s.generateConnectionString(dsInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.Cfg.Env == setting.Dev {
|
||||
logger.Debug("getEngine", "connection", cnnstr)
|
||||
}
|
||||
|
||||
config := sqleng.DataPluginConfiguration{
|
||||
DriverName: "postgres",
|
||||
ConnectionString: cnnstr,
|
||||
DSInfo: dsInfo,
|
||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||
}
|
||||
|
||||
queryResultTransformer := postgresQueryResultTransformer{
|
||||
log: logger,
|
||||
}
|
||||
|
||||
handler, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
||||
logger)
|
||||
if err != nil {
|
||||
logger.Error("Failed connecting to Postgres", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Debug("Successfully connected to Postgres")
|
||||
return handler, nil
|
||||
}
|
||||
}
|
||||
|
||||
// escape single quotes and backslashes in Postgres connection string parameters.
|
||||
@ -74,14 +126,14 @@ func escape(input string) string {
|
||||
return strings.ReplaceAll(strings.ReplaceAll(input, `\`, `\\`), "'", `\'`)
|
||||
}
|
||||
|
||||
func (s *PostgresService) generateConnectionString(datasource *models.DataSource) (string, error) {
|
||||
func (s *Service) generateConnectionString(dsInfo sqleng.DataSourceInfo) (string, error) {
|
||||
var host string
|
||||
var port int
|
||||
if strings.HasPrefix(datasource.Url, "/") {
|
||||
host = datasource.Url
|
||||
s.logger.Debug("Generating connection string with Unix socket specifier", "socket", host)
|
||||
if strings.HasPrefix(dsInfo.URL, "/") {
|
||||
host = dsInfo.URL
|
||||
logger.Debug("Generating connection string with Unix socket specifier", "socket", host)
|
||||
} else {
|
||||
sp := strings.SplitN(datasource.Url, ":", 2)
|
||||
sp := strings.SplitN(dsInfo.URL, ":", 2)
|
||||
host = sp[0]
|
||||
if len(sp) > 1 {
|
||||
var err error
|
||||
@ -90,19 +142,19 @@ func (s *PostgresService) generateConnectionString(datasource *models.DataSource
|
||||
return "", errutil.Wrapf(err, "invalid port in host specifier %q", sp[1])
|
||||
}
|
||||
|
||||
s.logger.Debug("Generating connection string with network host/port pair", "host", host, "port", port)
|
||||
logger.Debug("Generating connection string with network host/port pair", "host", host, "port", port)
|
||||
} else {
|
||||
s.logger.Debug("Generating connection string with network host", "host", host)
|
||||
logger.Debug("Generating connection string with network host", "host", host)
|
||||
}
|
||||
}
|
||||
|
||||
connStr := fmt.Sprintf("user='%s' password='%s' host='%s' dbname='%s'",
|
||||
escape(datasource.User), escape(datasource.DecryptedPassword()), escape(host), escape(datasource.Database))
|
||||
escape(dsInfo.User), escape(dsInfo.DecryptedSecureJSONData["password"]), escape(host), escape(dsInfo.Database))
|
||||
if port > 0 {
|
||||
connStr += fmt.Sprintf(" port=%d", port)
|
||||
}
|
||||
|
||||
tlsSettings, err := s.tlsManager.getTLSSettings(datasource)
|
||||
tlsSettings, err := s.tlsManager.getTLSSettings(dsInfo)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -111,19 +163,19 @@ func (s *PostgresService) generateConnectionString(datasource *models.DataSource
|
||||
|
||||
// Attach root certificate if provided
|
||||
if tlsSettings.RootCertFile != "" {
|
||||
s.logger.Debug("Setting server root certificate", "tlsRootCert", tlsSettings.RootCertFile)
|
||||
logger.Debug("Setting server root certificate", "tlsRootCert", tlsSettings.RootCertFile)
|
||||
connStr += fmt.Sprintf(" sslrootcert='%s'", escape(tlsSettings.RootCertFile))
|
||||
}
|
||||
|
||||
// Attach client certificate and key if both are provided
|
||||
if tlsSettings.CertFile != "" && tlsSettings.CertKeyFile != "" {
|
||||
s.logger.Debug("Setting TLS/SSL client auth", "tlsCert", tlsSettings.CertFile, "tlsKey", tlsSettings.CertKeyFile)
|
||||
logger.Debug("Setting TLS/SSL client auth", "tlsCert", tlsSettings.CertFile, "tlsKey", tlsSettings.CertKeyFile)
|
||||
connStr += fmt.Sprintf(" sslcert='%s' sslkey='%s'", escape(tlsSettings.CertFile), escape(tlsSettings.CertKeyFile))
|
||||
} else if tlsSettings.CertFile != "" || tlsSettings.CertKeyFile != "" {
|
||||
return "", fmt.Errorf("TLS/SSL client certificate and key must both be specified")
|
||||
}
|
||||
|
||||
s.logger.Debug("Generated Postgres connection string successfully")
|
||||
logger.Debug("Generated Postgres connection string successfully")
|
||||
return connStr, nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user