diff --git a/CHANGELOG.md b/CHANGELOG.md index 89730be2b..0f5f557ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,10 +20,14 @@ Main (unreleased) ### Features - (_Experimental_) Additions to experimental `database_observability.mysql` component: - - `explain_plans` collector now changes schema before returning the connection to the pool (@cristiangreco) + - `explain_plans` + - collector now changes schema before returning the connection to the pool (@cristiangreco) + - collector now passes queries more permissively, expressly to allow queries beginning in `with` (@rgeyer) - (_Experimental_) Additions to experimental `database_observability.postgres` component: - - `explain_plans` added the explain plan collector (@rgeyer) + - `explain_plans` + - added the explain plan collector (@rgeyer) + - collector now passes queries more permissively, expressly to allow queries beginning in `with` (@rgeyer) - add `user` field to wait events within `query_samples` collector (@gaantunes) - rework the query samples collector to buffer per-query execution state across scrapes and emit finalized entries (@gaantunes) diff --git a/internal/component/database_observability/explain_plan_output.go b/internal/component/database_observability/explain_plan_output.go index 627635fbe..68c4e38c0 100644 --- a/internal/component/database_observability/explain_plan_output.go +++ b/internal/component/database_observability/explain_plan_output.go @@ -35,6 +35,44 @@ const ( ExplainPlanJoinAlgorithmNestedLoop ExplainPlanJoinAlgorithm = "nested_loop" ) +// ExplainReservedWordDenyList contains SQL reserved words that indicate write operations +// to the database. These are primarily DML (Data Manipulation Language) and DDL +// (Data Definition Language) commands that modify database state. +// This was extracted from the MySQL and PostgreSQL documentation by Claude Sonnet 4 on Oct 28, 2025 +// and audited by @rgeyer and others in the dbo11y team. +var ExplainReservedWordDenyList = map[string]bool{ + // Data Manipulation Language (DML) - Write operations + "INSERT": true, "UPDATE": true, "DELETE": true, "REPLACE": true, "MERGE": true, "UPSERT": true, + "FOR UPDATE": true, + + // Data Definition Language (DDL) - Schema modifications + "CREATE": true, "ALTER": true, "DROP": true, "RENAME": true, "TRUNCATE": true, + + // Transaction control that can commit writes + "COMMIT": true, "ROLLBACK": true, "SAVEPOINT": true, + + // Database/Schema management + "USE": true, "DATABASE": true, "SCHEMA": true, + + // Index operations + "REINDEX": true, "ANALYZE": true, "OPTIMIZE": true, + "REINDEX TABLE": true, "ANALYZE TABLE": true, "OPTIMIZE TABLE": true, + // User/Permission management + "GRANT": true, "REVOKE": true, "CREATE USER": true, "DROP USER": true, "ALTER USER": true, + + // MySQL specific write operations + "LOAD": true, "DELAYED": true, "IGNORE": true, "ON DUPLICATE KEY": true, + "LOW_PRIORITY": true, "HIGH_PRIORITY": true, "QUICK": true, + "LOCK IN SHARE MODE": true, + + // PostgreSQL specific write operations + "COPY": true, "VACUUM": true, "CLUSTER": true, "LISTEN": true, "NOTIFY": true, "DISCARD": true, + "PREPARE": true, "EXECUTE": true, "DEALLOCATE": true, "RESET": true, "SET": true, + + // dbo11 specific operations we'd like to exclude + "EXPLAIN": true, +} + type ExplainPlanOutput struct { Metadata ExplainPlanMetadataInfo `json:"metadata"` Plan ExplainPlanNode `json:"plan"` diff --git a/internal/component/database_observability/lexer.go b/internal/component/database_observability/lexer.go new file mode 100644 index 000000000..9d7c5ab3f --- /dev/null +++ b/internal/component/database_observability/lexer.go @@ -0,0 +1,63 @@ +package database_observability + +import ( + "fmt" + "strings" + + "github.com/DataDog/go-sqllexer" +) + +// ExtractTableNames extracts the table names from a SQL query +func ExtractTableNames(sql string) ([]string, error) { + normalizer := sqllexer.NewNormalizer( + sqllexer.WithCollectTables(true), + ) + _, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres)) + if err != nil { + return nil, fmt.Errorf("failed to normalize SQL: %w", err) + } + + // Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not + return metadata.Tables, nil +} + +// RedactSql obfuscates a SQL query by replacing literals with ? placeholders +func RedactSql(sql string) string { + obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql) + return obfuscatedSql +} + +// ContainsReservedKeywords checks if the SQL query contains any reserved keywords +// that indicate write operations, excluding those in string literals or comments +func ContainsReservedKeywords(query string, reservedWords map[string]bool, dbms sqllexer.DBMSType) bool { + // Use the lexer to tokenize the query + lexer := sqllexer.New(query, sqllexer.WithDBMS(dbms)) + + // Scan all tokens + for { + token := lexer.Scan() + if token.Type == sqllexer.EOF { + break + } + if token.Type == sqllexer.ERROR { + // If lexing fails, fall back to simple string search for safety + uppercaseQuery := strings.ToUpper(query) + for word := range reservedWords { + if strings.Contains(uppercaseQuery, word) { + return true + } + } + return false + } + + // Check commands, keywords, and identifiers (since some reserved words might be classified as identifiers) + // but exclude string literals, comments, and other non-SQL-keyword tokens + if token.Type == sqllexer.COMMAND || token.Type == sqllexer.KEYWORD || token.Type == sqllexer.IDENT { + if _, ok := reservedWords[strings.ToUpper(token.Value)]; ok { + return true + } + } + } + + return false +} diff --git a/internal/component/database_observability/postgres/collector/lexer_test.go b/internal/component/database_observability/lexer_test.go similarity index 66% rename from internal/component/database_observability/postgres/collector/lexer_test.go rename to internal/component/database_observability/lexer_test.go index bb824055a..3b62b5328 100644 --- a/internal/component/database_observability/postgres/collector/lexer_test.go +++ b/internal/component/database_observability/lexer_test.go @@ -1,7 +1,9 @@ -package collector +package database_observability import ( "testing" + + "github.com/DataDog/go-sqllexer" ) func TestPgSqlParser_Redact(t *testing.T) { @@ -174,7 +176,7 @@ func TestPgSqlParser_Redact(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := redact(tt.sql) + got := RedactSql(tt.sql) if got != tt.want { t.Errorf("\nRedact()\nGOT:\n%s\nWANT:\n%s", got, tt.want) } @@ -276,7 +278,7 @@ func TestPgSqlParser_ExtractTableNames(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := extractTableNames(tt.sql) + got, err := ExtractTableNames(tt.sql) if (err != nil) != tt.wantErr { t.Errorf("ExtractTableNames() error = %v, wantErr %v", err, tt.wantErr) return @@ -309,3 +311,177 @@ func TestPgSqlParser_ExtractTableNames(t *testing.T) { }) } } + +func TestContainsReservedKeywords(t *testing.T) { + reservedWords := map[string]bool{"INSERT": true, "UPDATE": true, "DELETE": true, "CREATE": true, "DROP": true} + + tests := []struct { + name string + query string + expected bool + }{ + { + name: "actual INSERT statement", + query: "INSERT INTO users (name) VALUES ('John')", + expected: true, + }, + { + name: "SELECT with INSERT in string literal", + query: "SELECT 'INSERT INTO table' FROM users", + expected: false, + }, + { + name: "SELECT with insert in column name", + query: "SELECT insert_date FROM users", + expected: false, + }, + { + name: "SELECT with INSERT in comment", + query: "SELECT * FROM users -- INSERT comment", + expected: false, + }, + { + name: "SELECT with INSERT in block comment", + query: "SELECT * FROM users /* INSERT block comment */", + expected: false, + }, + { + name: "CREATE TABLE statement", + query: "CREATE TABLE users (id INT, name VARCHAR(50))", + expected: true, + }, + { + name: "SELECT with CREATE in quoted identifier", + query: `SELECT "create_date" FROM users`, + expected: false, + }, + { + name: "UPDATE statement", + query: "UPDATE users SET name = 'John' WHERE id = 1", + expected: true, + }, + { + name: "SELECT with update in string", + query: "SELECT * FROM users WHERE status = 'update_pending'", + expected: false, + }, + { + name: "DELETE statement", + query: "DELETE FROM users WHERE id = 1", + expected: true, + }, + { + name: "SELECT with delete in table name", + query: "SELECT * FROM delete_log", + expected: false, + }, + { + name: "plain SELECT statement", + query: "SELECT * FROM users WHERE name = 'John'", + expected: false, + }, + { + name: "complex SELECT with joins", + query: "SELECT u.name, p.title FROM users u JOIN posts p ON u.id = p.user_id", + expected: false, + }, + { + name: "SELECT with reserved word in WHERE clause string", + query: "SELECT * FROM users WHERE description LIKE '%CREATE%'", + expected: false, + }, + { + name: "DROP statement", + query: "DROP TABLE users", + expected: true, + }, + { + name: "SELECT with drop in column alias", + query: "SELECT name AS drop_reason FROM users", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ContainsReservedKeywords(tt.query, reservedWords, sqllexer.DBMSMySQL) + if result != tt.expected { + t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query) + } + }) + } +} + +func TestContainsReservedKeywords_WithActualDenyList(t *testing.T) { + tests := []struct { + name string + query string + expected bool + }{ + { + name: "EXPLAIN statement should be detected", + query: "EXPLAIN SELECT * FROM users", + expected: true, + }, + { + name: "SELECT with explain in string should not be detected", + query: "SELECT 'explain this' FROM users", + expected: false, + }, + { + name: "PREPARE statement should be detected", + query: "PREPARE stmt AS SELECT * FROM users WHERE id = $1", + expected: true, + }, + { + name: "SELECT with prepare in column name should not be detected", + query: "SELECT prepare_date FROM users", + expected: false, + }, + { + name: "SET statement should be detected", + query: "SET search_path TO public", + expected: true, + }, + { + name: "SELECT with set in string should not be detected", + query: "SELECT 'set this value' FROM users", + expected: false, + }, + { + name: "SELECT with reserved word in quoted identifier", + query: `SELECT "insert" FROM users`, + expected: false, + }, + { + name: "SELECT with reserved word in table alias", + query: "SELECT * FROM users AS insert_table", + expected: false, + }, + { + name: "SELECT with legacy LOCK IN SHARE MODE", + query: "SELECT name FROM users LOCK IN SHARE MODE", + expected: false, + }, + { + name: "SELECT with FOR UPDATE", + query: "SELECT name FROM users FOR UPDATE", + expected: true, + }, + } + + for _, tt := range tests { + t.Run("MySQL: "+tt.name, func(t *testing.T) { + result := ContainsReservedKeywords(tt.query, ExplainReservedWordDenyList, sqllexer.DBMSMySQL) + if result != tt.expected { + t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query) + } + }) + t.Run("PostgreSQL: "+tt.name, func(t *testing.T) { + result := ContainsReservedKeywords(tt.query, ExplainReservedWordDenyList, sqllexer.DBMSPostgres) + if result != tt.expected { + t.Errorf("Expected %v, got %v for query: %s", tt.expected, result, tt.query) + } + }) + } +} diff --git a/internal/component/database_observability/mysql/collector/explain_plans.go b/internal/component/database_observability/mysql/collector/explain_plans.go index b6cd80ab1..dd1f18b1d 100644 --- a/internal/component/database_observability/mysql/collector/explain_plans.go +++ b/internal/component/database_observability/mysql/collector/explain_plans.go @@ -14,6 +14,7 @@ import ( "time" "unicode/utf8" + "github.com/DataDog/go-sqllexer" "github.com/go-kit/log" "go.uber.org/atomic" @@ -568,7 +569,10 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error { continue } - if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") { + containsReservedWord := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSMySQL) + + if containsReservedWord { + level.Debug(logger).Log("msg", "skipping query containing reserved word") continue } diff --git a/internal/component/database_observability/mysql/collector/explain_plans_test.go b/internal/component/database_observability/mysql/collector/explain_plans_test.go index f5fb1ab0a..d127f930b 100644 --- a/internal/component/database_observability/mysql/collector/explain_plans_test.go +++ b/internal/component/database_observability/mysql/collector/explain_plans_test.go @@ -1666,6 +1666,43 @@ func TestExplainPlans(t *testing.T) { ) }) + t.Run("passes queries beginning in with", func(t *testing.T) { + lokiClient.Clear() + logBuffer.Reset() + mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "schema_name", + "digest", + "query_sample_text", + "last_seen", + }).AddRow( + "some_schema", + "some_digest", + "with cte as (select * from some_table where id = 1) select * from cte", + lastSeen, + )) + + mock.ExpectExec("USE `some_schema`").WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0)) + + mock.ExpectQuery(selectExplainPlanPrefix + "with cte as (select * from some_table where id = 1) select * from cte").WillReturnRows(sqlmock.NewRows([]string{ + "json", + }).AddRow( + []byte(`{"query_block": {"select_id": 1}}`), + )) + + err = c.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + require.NotContains(t, logBuffer.String(), "error") + + require.Eventually( + t, + func() bool { return len(lokiClient.Received()) == 1 }, + 5*time.Second, + 10*time.Millisecond, + "did not receive the explain plan output log message within the timeout", + ) + }) + err = mock.ExpectationsWereMet() require.NoError(t, err) }) diff --git a/internal/component/database_observability/postgres/collector/explain_plan.go b/internal/component/database_observability/postgres/collector/explain_plan.go index 461ae0d6a..7b9172d9a 100644 --- a/internal/component/database_observability/postgres/collector/explain_plan.go +++ b/internal/component/database_observability/postgres/collector/explain_plan.go @@ -13,6 +13,7 @@ import ( "time" "unicode/utf8" + "github.com/DataDog/go-sqllexer" "github.com/blang/semver/v4" "github.com/go-kit/log" "go.uber.org/atomic" @@ -125,7 +126,7 @@ func (p *PlanNode) ToExplainPlanOutputNode() (database_observability.ExplainPlan } if !strings.EqualFold(p.Filter, "") { - redacted := redact(p.Filter) + redacted := database_observability.RedactSql(p.Filter) output.Details.Condition = &redacted } @@ -410,7 +411,10 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error { continue } - if !strings.HasPrefix(strings.ToLower(qi.queryText), "select") { + containsReservedWord := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSPostgres) + + if containsReservedWord { + level.Debug(logger).Log("msg", "skipping query containing reserved word") continue } @@ -440,7 +444,7 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error { continue } - redactedByteExplainPlanJSON := redact(string(byteExplainPlanJSON)) + redactedByteExplainPlanJSON := database_observability.RedactSql(string(byteExplainPlanJSON)) level.Debug(logger).Log("msg", "db native explain plan", "db_native_explain_plan", base64.StdEncoding.EncodeToString([]byte(redactedByteExplainPlanJSON))) @@ -492,8 +496,9 @@ func (c *ExplainPlan) fetchExplainPlanJSON(ctx context.Context, qi queryInfo) ([ defer conn.Close() preparedStatementName := strings.ReplaceAll(fmt.Sprintf("explain_plan_%s", qi.queryId), "-", "_") - logger := log.With(c.logger, "query_id", qi.queryId, "datname", qi.datname, "preparedStatementName", preparedStatementName) - if _, err := conn.ExecContext(ctx, fmt.Sprintf("PREPARE %s AS %s", preparedStatementName, qi.queryText)); err != nil { + preparedStatementText := fmt.Sprintf("PREPARE %s AS %s", preparedStatementName, qi.queryText) + logger := log.With(c.logger, "query_id", qi.queryId, "datname", qi.datname, "preparedStatementName", preparedStatementName, "preparedStatementText", preparedStatementText) + if _, err := conn.ExecContext(ctx, preparedStatementText); err != nil { return nil, fmt.Errorf("failed to prepare explain plan: %w", err) } diff --git a/internal/component/database_observability/postgres/collector/explain_plan_test.go b/internal/component/database_observability/postgres/collector/explain_plan_test.go index 0a40352b7..083773cf3 100644 --- a/internal/component/database_observability/postgres/collector/explain_plan_test.go +++ b/internal/component/database_observability/postgres/collector/explain_plan_test.go @@ -2814,6 +2814,10 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) { }) t.Run("passes queries beginning in select", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + lokiClient.Clear() logBuffer.Reset() dbConnFactory := &mockDbConnectionFactory{ @@ -2876,6 +2880,73 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) { require.NotContains(t, logBuffer.String(), "error") }) + t.Run("passes queries beginning in with", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient.Clear() + logBuffer.Reset() + dbConnFactory := &mockDbConnectionFactory{ + db: db, + Mock: &mock, + InstantiationCount: 0, + } + explainPlan = &ExplainPlan{ + dbConnection: db, + dbDSN: "postgres://user:pass@host:1234/database", + dbConnectionFactory: dbConnFactory.NewDBConnection, + dbVersion: post17ver, + queryCache: map[string]*queryInfo{ + "testdb123456": { + datname: "testdb", + queryId: "123456", + queryText: "with cte as (select * from some_table where id = $1) select * from cte", + calls: int64(10), + callsReset: time.Now(), + }, + }, + queryDenylist: map[string]*queryInfo{}, + finishedQueryCache: map[string]*queryInfo{}, + excludeSchemas: []string{}, + perScrapeRatio: 1.0, + logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + currentBatchSize: 1, + entryHandler: lokiClient, + } + + archive, err := txtar.ParseFile("./testdata/explain_plan/complex_aggregation_with_case.txtar") + require.NoError(t, err) + require.Equal(t, 1, len(archive.Files)) + jsonFile := archive.Files[0] + require.Equal(t, "complex_aggregation_with_case.json", jsonFile.Name) + jsonData := jsonFile.Data + + mock.ExpectExec("PREPARE explain_plan_123456 AS with cte as (select * from some_table where id = $1) select * from cte").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("SET search_path TO testdb, public").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("SET plan_cache_mode = force_generic_plan").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("EXPLAIN (FORMAT JSON) EXECUTE explain_plan_123456(null)").WillReturnRows(sqlmock.NewRows([]string{"json"}).AddRow(jsonData)) + mock.ExpectExec("DEALLOCATE explain_plan_123456").WillReturnResult(sqlmock.NewResult(0, 1)) + + err = explainPlan.fetchExplainPlans(t.Context()) + require.NoError(t, err) + + assert.NoError(t, mock.ExpectationsWereMet()) + assert.Equal(t, 1, dbConnFactory.InstantiationCount) + + require.Eventually( + t, + func() bool { + return len(lokiClient.Received()) == 1 + }, + 5*time.Second, + 10*time.Millisecond, + "did not receive the explain plan output log message within the timeout", + ) + + require.NotContains(t, logBuffer.String(), "error") + }) + err = mock.ExpectationsWereMet() require.NoError(t, err) }) diff --git a/internal/component/database_observability/postgres/collector/lexer.go b/internal/component/database_observability/postgres/collector/lexer.go deleted file mode 100644 index ea153ebee..000000000 --- a/internal/component/database_observability/postgres/collector/lexer.go +++ /dev/null @@ -1,27 +0,0 @@ -package collector - -import ( - "fmt" - - "github.com/DataDog/go-sqllexer" -) - -// extractTableNames extracts the table names from a SQL query -func extractTableNames(sql string) ([]string, error) { - normalizer := sqllexer.NewNormalizer( - sqllexer.WithCollectTables(true), - ) - _, metadata, err := normalizer.Normalize(sql, sqllexer.WithDBMS(sqllexer.DBMSPostgres)) - if err != nil { - return nil, fmt.Errorf("failed to normalize SQL: %w", err) - } - - // Return all table names, including those that end with "..." for truncated queries, as we can't know if the table name was truncated or not - return metadata.Tables, nil -} - -// redact obfuscates a SQL query by replacing literals with ? placeholders -func redact(sql string) string { - obfuscatedSql := sqllexer.NewObfuscator().Obfuscate(sql) - return obfuscatedSql -} diff --git a/internal/component/database_observability/postgres/collector/query_details.go b/internal/component/database_observability/postgres/collector/query_details.go index 8a9e9d541..4cf411667 100644 --- a/internal/component/database_observability/postgres/collector/query_details.go +++ b/internal/component/database_observability/postgres/collector/query_details.go @@ -166,7 +166,7 @@ func (c QueryDetails) fetchAndAssociate(ctx context.Context) error { func (c QueryDetails) tryTokenizeTableNames(sqlText string) ([]string, error) { sqlText = strings.TrimSuffix(sqlText, "...") - tables, err := extractTableNames(sqlText) + tables, err := database_observability.ExtractTableNames(sqlText) if err != nil { return nil, fmt.Errorf("failed to extract table names: %w", err) }