mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 06:52:42 +08:00
postgres: refactor code that is called by tests (#81279)
* postgres: refactor code that is called by tests too * removed debug log
This commit is contained in:
@ -57,6 +57,50 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
return dsInfo.QueryData(ctx, req)
|
return dsInfo.QueryData(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newPostgres(cfg *setting.Cfg, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger) (*sql.DB, *sqleng.DataSourceHandler, error) {
|
||||||
|
connector, err := pq.NewConnector(cnnstr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("postgres connector creation failed", "error", err)
|
||||||
|
return nil, nil, fmt.Errorf("postgres connector creation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the proxy-dialer if the secure socks proxy is enabled
|
||||||
|
proxyOpts := proxyutil.GetSQLProxyOptions(cfg.SecureSocksDSProxy, dsInfo)
|
||||||
|
if sdkproxy.New(proxyOpts).SecureSocksProxyEnabled() {
|
||||||
|
dialer, err := newPostgresProxyDialer(proxyOpts)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("postgres proxy creation failed", "error", err)
|
||||||
|
return nil, nil, fmt.Errorf("postgres proxy creation failed")
|
||||||
|
}
|
||||||
|
// update the postgres dialer with the proxy dialer
|
||||||
|
connector.Dialer(dialer)
|
||||||
|
}
|
||||||
|
|
||||||
|
config := sqleng.DataPluginConfiguration{
|
||||||
|
DSInfo: dsInfo,
|
||||||
|
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||||
|
RowLimit: cfg.DataProxyRowLimit,
|
||||||
|
}
|
||||||
|
|
||||||
|
queryResultTransformer := postgresQueryResultTransformer{}
|
||||||
|
|
||||||
|
db := sql.OpenDB(connector)
|
||||||
|
|
||||||
|
db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
|
||||||
|
db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
|
||||||
|
db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)
|
||||||
|
|
||||||
|
handler, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
||||||
|
logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Failed connecting to Postgres", "err", err)
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Debug("Successfully connected to Postgres")
|
||||||
|
return db, handler, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
||||||
logger := s.logger
|
logger := s.logger
|
||||||
return func(_ context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
return func(_ context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
@ -96,44 +140,8 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Env == setting.Dev {
|
_, handler, err := newPostgres(cfg, dsInfo, cnnstr, logger)
|
||||||
logger.Debug("GetEngine", "connection", cnnstr)
|
|
||||||
}
|
|
||||||
|
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("postgres connector creation failed", "error", err)
|
|
||||||
return nil, fmt.Errorf("postgres connector creation failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// use the proxy-dialer if the secure socks proxy is enabled
|
|
||||||
proxyOpts := proxyutil.GetSQLProxyOptions(cfg.SecureSocksDSProxy, dsInfo)
|
|
||||||
if sdkproxy.New(proxyOpts).SecureSocksProxyEnabled() {
|
|
||||||
dialer, err := newPostgresProxyDialer(proxyOpts)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("postgres proxy creation failed", "error", err)
|
|
||||||
return nil, fmt.Errorf("postgres proxy creation failed")
|
|
||||||
}
|
|
||||||
// update the postgres dialer with the proxy dialer
|
|
||||||
connector.Dialer(dialer)
|
|
||||||
}
|
|
||||||
|
|
||||||
config := sqleng.DataPluginConfiguration{
|
|
||||||
DSInfo: dsInfo,
|
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
|
||||||
RowLimit: cfg.DataProxyRowLimit,
|
|
||||||
}
|
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{}
|
|
||||||
|
|
||||||
db := sql.OpenDB(connector)
|
|
||||||
|
|
||||||
db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
|
|
||||||
db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
|
|
||||||
db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)
|
|
||||||
|
|
||||||
handler, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
|
||||||
logger)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed connecting to Postgres", "err", err)
|
logger.Error("Failed connecting to Postgres", "err", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2,7 +2,6 @@ package postgres
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@ -53,7 +52,7 @@ func TestIntegrationPostgresSnapshots(t *testing.T) {
|
|||||||
t.Skip()
|
t.Skip()
|
||||||
}
|
}
|
||||||
|
|
||||||
openDB := func() *sql.DB {
|
getCnnStr := func() string {
|
||||||
host := os.Getenv("POSTGRES_HOST")
|
host := os.Getenv("POSTGRES_HOST")
|
||||||
if host == "" {
|
if host == "" {
|
||||||
host = "localhost"
|
host = "localhost"
|
||||||
@ -63,12 +62,8 @@ func TestIntegrationPostgresSnapshots(t *testing.T) {
|
|||||||
port = "5432"
|
port = "5432"
|
||||||
}
|
}
|
||||||
|
|
||||||
connStr := fmt.Sprintf("user=grafanatest password=grafanatest host=%s port=%s dbname=grafanadstest sslmode=disable",
|
return fmt.Sprintf("user=grafanatest password=grafanatest host=%s port=%s dbname=grafanadstest sslmode=disable",
|
||||||
host, port)
|
host, port)
|
||||||
|
|
||||||
db, err := sql.Open("postgres", connStr)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return db
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlQueryCommentRe := regexp.MustCompile(`^-- (.+)\n`)
|
sqlQueryCommentRe := regexp.MustCompile(`^-- (.+)\n`)
|
||||||
@ -146,17 +141,9 @@ func TestIntegrationPostgresSnapshots(t *testing.T) {
|
|||||||
return sql, nil
|
return sql, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
db := openDB()
|
|
||||||
|
|
||||||
t.Cleanup((func() {
|
|
||||||
_, err := db.Exec("DROP TABLE tbl")
|
|
||||||
require.NoError(t, err)
|
|
||||||
err = db.Close()
|
|
||||||
require.NoError(t, err)
|
|
||||||
}))
|
|
||||||
|
|
||||||
cfg := setting.NewCfg()
|
cfg := setting.NewCfg()
|
||||||
cfg.DataPath = t.TempDir()
|
cfg.DataPath = t.TempDir()
|
||||||
|
cfg.DataProxyRowLimit = 10000
|
||||||
|
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
MaxOpenConns: 0,
|
MaxOpenConns: 0,
|
||||||
@ -171,17 +158,18 @@ func TestIntegrationPostgresSnapshots(t *testing.T) {
|
|||||||
DecryptedSecureJSONData: map[string]string{},
|
DecryptedSecureJSONData: map[string]string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
config := sqleng.DataPluginConfiguration{
|
|
||||||
DSInfo: dsInfo,
|
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
|
||||||
RowLimit: 1000000,
|
|
||||||
}
|
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{}
|
|
||||||
|
|
||||||
logger := log.New()
|
logger := log.New()
|
||||||
handler, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
|
||||||
logger)
|
cnnstr := getCnnStr()
|
||||||
|
|
||||||
|
db, handler, err := newPostgres(cfg, dsInfo, cnnstr, logger)
|
||||||
|
|
||||||
|
t.Cleanup((func() {
|
||||||
|
_, err := db.Exec("DROP TABLE tbl")
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = db.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -194,6 +194,7 @@ func TestIntegrationPostgres(t *testing.T) {
|
|||||||
|
|
||||||
cfg := setting.NewCfg()
|
cfg := setting.NewCfg()
|
||||||
cfg.DataPath = t.TempDir()
|
cfg.DataPath = t.TempDir()
|
||||||
|
cfg.DataProxyRowLimit = 10000
|
||||||
|
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
MaxOpenConns: 0,
|
MaxOpenConns: 0,
|
||||||
@ -208,20 +209,11 @@ func TestIntegrationPostgres(t *testing.T) {
|
|||||||
DecryptedSecureJSONData: map[string]string{},
|
DecryptedSecureJSONData: map[string]string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
config := sqleng.DataPluginConfiguration{
|
|
||||||
DSInfo: dsInfo,
|
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
|
||||||
RowLimit: 1000000,
|
|
||||||
}
|
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{}
|
|
||||||
|
|
||||||
logger := backend.NewLoggerWith("logger", "postgres.test")
|
logger := backend.NewLoggerWith("logger", "postgres.test")
|
||||||
|
|
||||||
db := InitPostgresTestDB(t, jsonData)
|
cnnstr := postgresTestDBConnString()
|
||||||
|
|
||||||
exe, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
db, exe, err := newPostgres(cfg, dsInfo, cnnstr, logger)
|
||||||
logger)
|
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -1275,15 +1267,10 @@ func TestIntegrationPostgres(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("When row limit set to 1", func(t *testing.T) {
|
t.Run("When row limit set to 1", func(t *testing.T) {
|
||||||
dsInfo := sqleng.DataSourceInfo{}
|
dsInfo := sqleng.DataSourceInfo{}
|
||||||
config := sqleng.DataPluginConfiguration{
|
conf := setting.NewCfg()
|
||||||
DSInfo: dsInfo,
|
conf.DataProxyRowLimit = 1
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
_, handler, err := newPostgres(conf, dsInfo, cnnstr, logger)
|
||||||
RowLimit: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{}
|
|
||||||
|
|
||||||
handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newPostgresMacroEngine(false), logger)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
Reference in New Issue
Block a user