mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 18:02:18 +08:00
postgres: socks proxy: use plugin-sdk (#82376)
This commit is contained in:
@ -13,7 +13,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
@ -21,7 +20,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/sqleng"
|
"github.com/grafana/grafana/pkg/tsdb/sqleng"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/sqleng/proxyutil"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func ProvideService(cfg *setting.Cfg) *Service {
|
func ProvideService(cfg *setting.Cfg) *Service {
|
||||||
@ -57,23 +55,28 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
return dsInfo.QueryData(ctx, req)
|
return dsInfo.QueryData(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPostgres(cfg *setting.Cfg, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger, settings backend.DataSourceInstanceSettings) (*sql.DB, *sqleng.DataSourceHandler, error) {
|
func newPostgres(ctx context.Context, cfg *setting.Cfg, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger, settings backend.DataSourceInstanceSettings) (*sql.DB, *sqleng.DataSourceHandler, error) {
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
connector, err := pq.NewConnector(cnnstr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("postgres connector creation failed", "error", err)
|
logger.Error("postgres connector creation failed", "error", err)
|
||||||
return nil, nil, fmt.Errorf("postgres connector creation failed")
|
return nil, nil, fmt.Errorf("postgres connector creation failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
// use the proxy-dialer if the secure socks proxy is enabled
|
proxyClient, err := settings.ProxyClient(ctx)
|
||||||
proxyOpts := proxyutil.GetSQLProxyOptions(cfg.SecureSocksDSProxy, dsInfo, settings.Name, settings.Type)
|
if err != nil {
|
||||||
if sdkproxy.New(proxyOpts).SecureSocksProxyEnabled() {
|
logger.Error("postgres proxy creation failed", "error", err)
|
||||||
dialer, err := newPostgresProxyDialer(proxyOpts)
|
return nil, nil, fmt.Errorf("postgres proxy creation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if proxyClient.SecureSocksProxyEnabled() {
|
||||||
|
dialer, err := proxyClient.NewSecureSocksProxyContextDialer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("postgres proxy creation failed", "error", err)
|
logger.Error("postgres proxy creation failed", "error", err)
|
||||||
return nil, nil, fmt.Errorf("postgres proxy creation failed")
|
return nil, nil, fmt.Errorf("postgres proxy creation failed")
|
||||||
}
|
}
|
||||||
|
postgresDialer := newPostgresProxyDialer(dialer)
|
||||||
// update the postgres dialer with the proxy dialer
|
// update the postgres dialer with the proxy dialer
|
||||||
connector.Dialer(dialer)
|
connector.Dialer(postgresDialer)
|
||||||
}
|
}
|
||||||
|
|
||||||
config := sqleng.DataPluginConfiguration{
|
config := sqleng.DataPluginConfiguration{
|
||||||
@ -103,7 +106,7 @@ func newPostgres(cfg *setting.Cfg, dsInfo sqleng.DataSourceInfo, cnnstr string,
|
|||||||
|
|
||||||
func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
||||||
logger := s.logger
|
logger := s.logger
|
||||||
return func(_ context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
logger.Debug("Creating Postgres query endpoint")
|
logger.Debug("Creating Postgres query endpoint")
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
MaxOpenConns: cfg.SqlDatasourceMaxOpenConnsDefault,
|
MaxOpenConns: cfg.SqlDatasourceMaxOpenConnsDefault,
|
||||||
@ -140,7 +143,7 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, handler, err := newPostgres(cfg, dsInfo, cnnstr, logger, settings)
|
_, handler, err := newPostgres(ctx, cfg, dsInfo, cnnstr, logger, settings)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed connecting to Postgres", "err", err)
|
logger.Error("Failed connecting to Postgres", "err", err)
|
||||||
|
@ -164,7 +164,7 @@ func TestIntegrationPostgresSnapshots(t *testing.T) {
|
|||||||
|
|
||||||
cnnstr := getCnnStr()
|
cnnstr := getCnnStr()
|
||||||
|
|
||||||
db, handler, err := newPostgres(cfg, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
db, handler, err := newPostgres(context.Background(), cfg, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
||||||
|
|
||||||
t.Cleanup((func() {
|
t.Cleanup((func() {
|
||||||
_, err := db.Exec("DROP TABLE tbl")
|
_, err := db.Exec("DROP TABLE tbl")
|
||||||
|
@ -212,7 +212,7 @@ func TestIntegrationPostgres(t *testing.T) {
|
|||||||
|
|
||||||
cnnstr := postgresTestDBConnString()
|
cnnstr := postgresTestDBConnString()
|
||||||
|
|
||||||
db, exe, err := newPostgres(cfg, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
db, exe, err := newPostgres(context.Background(), cfg, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -1268,7 +1268,7 @@ func TestIntegrationPostgres(t *testing.T) {
|
|||||||
dsInfo := sqleng.DataSourceInfo{}
|
dsInfo := sqleng.DataSourceInfo{}
|
||||||
conf := setting.NewCfg()
|
conf := setting.NewCfg()
|
||||||
conf.DataProxyRowLimit = 1
|
conf.DataProxyRowLimit = 1
|
||||||
_, handler, err := newPostgres(conf, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
_, handler, err := newPostgres(context.Background(), conf, dsInfo, cnnstr, logger, backend.DataSourceInstanceSettings{})
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -5,21 +5,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
sdkproxy "github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"golang.org/x/net/proxy"
|
"golang.org/x/net/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newPostgresProxyDriver updates the dialer for a postgres connector with a dialer that proxies connections through the secure socks proxy
|
// we wrap the proxy.Dialer to become dialer that the postgres module accepts
|
||||||
// and returns a new postgres driver to register
|
func newPostgresProxyDialer(dialer proxy.Dialer) pq.Dialer {
|
||||||
func newPostgresProxyDialer(opts *sdkproxy.Options) (pq.Dialer, error) {
|
return &postgresProxyDialer{d: dialer}
|
||||||
dialer, err := sdkproxy.New(opts).NewSecureSocksProxyContextDialer()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// update the postgres dialer with the proxy dialer
|
|
||||||
return &postgresProxyDialer{d: dialer}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ pq.Dialer = (&postgresProxyDialer{})
|
var _ pq.Dialer = (&postgresProxyDialer{})
|
||||||
|
@ -3,40 +3,37 @@ package postgres
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/sqleng"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/sqleng/proxyutil"
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/net/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type testDialer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *testDialer) Dial(network, addr string) (c net.Conn, err error) {
|
||||||
|
return nil, fmt.Errorf("test-dialer is not functional")
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ proxy.Dialer = (&testDialer{})
|
||||||
|
|
||||||
func TestPostgresProxyDriver(t *testing.T) {
|
func TestPostgresProxyDriver(t *testing.T) {
|
||||||
settings := proxyutil.SetupTestSecureSocksProxySettings(t)
|
|
||||||
proxySettings := setting.SecureSocksDSProxySettings{
|
|
||||||
Enabled: true,
|
|
||||||
ClientCert: settings.ClientCert,
|
|
||||||
ClientKey: settings.ClientKey,
|
|
||||||
RootCA: settings.RootCA,
|
|
||||||
ProxyAddress: settings.ProxyAddress,
|
|
||||||
ServerName: settings.ServerName,
|
|
||||||
}
|
|
||||||
opts := proxyutil.GetSQLProxyOptions(proxySettings, sqleng.DataSourceInfo{UID: "1", JsonData: sqleng.JsonData{SecureDSProxy: true}}, "pg", "postgres")
|
|
||||||
dbURL := "localhost:5432"
|
dbURL := "localhost:5432"
|
||||||
cnnstr := fmt.Sprintf("postgres://auser:password@%s/db?sslmode=disable", dbURL)
|
cnnstr := fmt.Sprintf("postgres://auser:password@%s/db?sslmode=disable", dbURL)
|
||||||
|
|
||||||
t.Run("Connector should use dialer context that routes through the socks proxy to db", func(t *testing.T) {
|
t.Run("Connector should use dialer context that routes through the socks proxy to db", func(t *testing.T) {
|
||||||
connector, err := pq.NewConnector(cnnstr)
|
connector, err := pq.NewConnector(cnnstr)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
dialer, err := newPostgresProxyDialer(opts)
|
dialer := newPostgresProxyDialer(&testDialer{})
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
connector.Dialer(dialer)
|
connector.Dialer(dialer)
|
||||||
|
|
||||||
db := sql.OpenDB(connector)
|
db := sql.OpenDB(connector)
|
||||||
err = db.Ping()
|
err = db.Ping()
|
||||||
|
|
||||||
require.Contains(t, err.Error(), fmt.Sprintf("socks connect %s %s->%s", "tcp", settings.ProxyAddress, dbURL))
|
require.Contains(t, err.Error(), "test-dialer is not functional")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user