From a696fc8b2bf80df536ed591bc51b01ec0c86c7a7 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Thu, 30 Sep 2021 19:29:32 +0300 Subject: [PATCH] Live: pipeline rule crud (file-based, still for MVP) (#39238) --- pkg/api/api.go | 3 + pkg/services/live/live.go | 152 ++++++++---------- pkg/services/live/pipeline/config.go | 60 +++++++ .../live/pipeline/converter_json_auto.go | 2 +- pkg/services/live/pipeline/pattern/pattern.go | 25 +++ .../live/pipeline/pattern/pattern_test.go | 37 +++++ pkg/services/live/pipeline/registry.go | 91 +++++++++++ .../pipeline/rule_cache_segmented_test.go | 17 +- pkg/services/live/pipeline/storage_file.go | 132 ++++++++++++++- 9 files changed, 424 insertions(+), 95 deletions(-) create mode 100644 pkg/services/live/pipeline/pattern/pattern.go create mode 100644 pkg/services/live/pipeline/pattern/pattern_test.go create mode 100644 pkg/services/live/pipeline/registry.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 917512b9174..26329adf021 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -438,6 +438,9 @@ func (hs *HTTPServer) registerRoutes() { // POST Live data to be processed according to channel rules. liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath) liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin) + liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin) + liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin) + liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin) liveRoute.Get("/pipeline-entities", routing.Wrap(hs.Live.HandlePipelineEntitiesListHTTP), reqOrgAdmin) liveRoute.Get("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsListHTTP), reqOrgAdmin) } diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index f1e46000777..db84e87a372 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -2,8 +2,10 @@ package live import ( "context" + "encoding/json" "errors" "fmt" + "io/ioutil" "net/http" "net/url" "os" @@ -883,95 +885,77 @@ func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response. }) } -type configInfo struct { - Type string `json:"type"` - Description string `json:"description"` - Example interface{} `json:"example,omitempty"` +// HandleChannelRulesPostHTTP ... +func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var rule pipeline.ChannelRule + err = json.Unmarshal(body, &rule) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) + } + result, err := g.channelRuleStorage.CreateChannelRule(c.Req.Context(), c.OrgId, rule) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err) + } + return response.JSON(http.StatusOK, util.DynMap{ + "rule": result, + }) +} + +// HandleChannelRulesPutHTTP ... +func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var rule pipeline.ChannelRule + err = json.Unmarshal(body, &rule) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) + } + if rule.Pattern == "" { + return response.Error(http.StatusBadRequest, "Rule pattern required", nil) + } + rule, err = g.channelRuleStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, rule) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err) + } + return response.JSON(http.StatusOK, util.DynMap{ + "rule": rule, + }) +} + +// HandleChannelRulesDeleteHTTP ... +func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var rule pipeline.ChannelRule + err = json.Unmarshal(body, &rule) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) + } + if rule.Pattern == "" { + return response.Error(http.StatusBadRequest, "Rule pattern required", nil) + } + err = g.channelRuleStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, rule.Pattern) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err) + } + return response.JSON(http.StatusOK, util.DynMap{}) } // HandlePipelineEntitiesListHTTP ... func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response { return response.JSON(http.StatusOK, util.DynMap{ - "subscribers": []configInfo{ - { - Type: pipeline.SubscriberTypeBuiltin, - Description: "apply builtin feature subscribe logic", - }, - { - Type: pipeline.SubscriberTypeManagedStream, - Description: "apply managed stream subscribe logic", - }, - { - Type: pipeline.SubscriberTypeMultiple, - Description: "apply multiple subscribers", - }, - { - Type: pipeline.SubscriberTypeAuthorizeRole, - Description: "authorize user role", - }, - }, - "outputs": []configInfo{ - { - Type: pipeline.OutputTypeManagedStream, - Description: "Only send schema when structure changes. Note this also requires a matching subscriber", - Example: pipeline.ManagedStreamOutputConfig{}, - }, - { - Type: pipeline.OutputTypeMultiple, - Description: "Send the output to multiple destinations", - Example: pipeline.MultipleOutputterConfig{}, - }, - { - Type: pipeline.OutputTypeConditional, - Description: "send to an output depending on frame values", - Example: pipeline.ConditionalOutputConfig{}, - }, - { - Type: pipeline.OutputTypeRedirect, - }, - { - Type: pipeline.OutputTypeThreshold, - }, - { - Type: pipeline.OutputTypeChangeLog, - }, - { - Type: pipeline.OutputTypeRemoteWrite, - }, - }, - "converters": []configInfo{ - { - Type: pipeline.ConverterTypeJsonAuto, - }, - { - Type: pipeline.ConverterTypeJsonExact, - }, - { - Type: pipeline.ConverterTypeInfluxAuto, - Description: "accept influx line protocol", - Example: pipeline.AutoInfluxConverterConfig{}, - }, - { - Type: pipeline.ConverterTypeJsonFrame, - }, - }, - "processors": []configInfo{ - { - Type: pipeline.ProcessorTypeKeepFields, - Description: "list the fields that should stay", - Example: pipeline.KeepFieldsProcessorConfig{}, - }, - { - Type: pipeline.ProcessorTypeDropFields, - Description: "list the fields that should be removed", - Example: pipeline.DropFieldsProcessorConfig{}, - }, - { - Type: pipeline.ProcessorTypeMultiple, - Description: "apply multiple processors", - Example: pipeline.MultipleProcessorConfig{}, - }, - }, + "subscribers": pipeline.SubscribersRegistry, + "outputs": pipeline.OutputsRegistry, + "converters": pipeline.ConvertersRegistry, + "processors": pipeline.ProcessorsRegistry, }) } diff --git a/pkg/services/live/pipeline/config.go b/pkg/services/live/pipeline/config.go index 949f24ca26d..ccec43b698a 100644 --- a/pkg/services/live/pipeline/config.go +++ b/pkg/services/live/pipeline/config.go @@ -4,7 +4,10 @@ import ( "context" "fmt" + "github.com/grafana/grafana/pkg/services/live/pipeline/tree" + "github.com/grafana/grafana/pkg/services/live/managedstream" + "github.com/grafana/grafana/pkg/services/live/pipeline/pattern" "github.com/centrifugal/centrifuge" ) @@ -79,6 +82,43 @@ type ChannelRule struct { Settings ChannelRuleSettings `json:"settings"` } +func (r ChannelRule) Valid() (bool, string) { + ok, reason := pattern.Valid(r.Pattern) + if !ok { + return false, fmt.Sprintf("invalid pattern: %s", reason) + } + if r.Settings.Converter != nil { + if !typeRegistered(r.Settings.Converter.Type, ConvertersRegistry) { + return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type) + } + } + if r.Settings.Subscriber != nil { + if !typeRegistered(r.Settings.Subscriber.Type, SubscribersRegistry) { + return false, fmt.Sprintf("unknown subscriber type: %s", r.Settings.Subscriber.Type) + } + } + if r.Settings.Processor != nil { + if !typeRegistered(r.Settings.Processor.Type, ProcessorsRegistry) { + return false, fmt.Sprintf("unknown processor type: %s", r.Settings.Processor.Type) + } + } + if r.Settings.Outputter != nil { + if !typeRegistered(r.Settings.Outputter.Type, OutputsRegistry) { + return false, fmt.Sprintf("unknown output type: %s", r.Settings.Outputter.Type) + } + } + return true, "" +} + +func typeRegistered(entityType string, registry []EntityInfo) bool { + for _, info := range registry { + if info.Type == entityType { + return true + } + } + return false +} + type RemoteWriteBackend struct { OrgId int64 `json:"-"` UID string `json:"uid"` @@ -93,6 +133,23 @@ type ChannelRules struct { Rules []ChannelRule `json:"rules"` } +func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string) { + t := tree.New() + defer func() { + if r := recover(); r != nil { + reason = fmt.Sprintf("%v", r) + ok = false + } + }() + for _, rule := range rules { + if rule.OrgId == orgID || (rule.OrgId == 0 && orgID == 1) { + t.AddRoute("/"+rule.Pattern, struct{}{}) + } + } + ok = true + return ok, reason +} + type MultipleConditionCheckerConfig struct { Type ConditionType `json:"type"` Conditions []ConditionCheckerConfig `json:"conditions"` @@ -113,6 +170,9 @@ type ConditionCheckerConfig struct { type RuleStorage interface { ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) + CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) + UpdateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) + DeleteChannelRule(_ context.Context, orgID int64, pattern string) error } type StorageRuleBuilder struct { diff --git a/pkg/services/live/pipeline/converter_json_auto.go b/pkg/services/live/pipeline/converter_json_auto.go index 110631505ef..7b599eea33d 100644 --- a/pkg/services/live/pipeline/converter_json_auto.go +++ b/pkg/services/live/pipeline/converter_json_auto.go @@ -6,7 +6,7 @@ import ( ) type AutoJsonConverterConfig struct { - FieldTips map[string]Field `json:"fieldTips"` + FieldTips map[string]Field `json:"fieldTips,omitempty"` } type AutoJsonConverter struct { diff --git a/pkg/services/live/pipeline/pattern/pattern.go b/pkg/services/live/pipeline/pattern/pattern.go new file mode 100644 index 00000000000..9c4ced3e148 --- /dev/null +++ b/pkg/services/live/pipeline/pattern/pattern.go @@ -0,0 +1,25 @@ +package pattern + +import ( + "fmt" + "regexp" + "strings" +) + +var patternReString = `^[A-z0-9_\-/=.:*]*$` +var patternRe = regexp.MustCompile(patternReString) + +var maxPatternLength = 160 + +func Valid(pattern string) (bool, string) { + if strings.HasPrefix(pattern, "/") { + return false, "pattern can't start with /" + } + if !patternRe.MatchString(pattern) { + return false, fmt.Sprintf("pattern format error, must match %s", patternReString) + } + if len(pattern) > maxPatternLength { + return false, fmt.Sprintf("pattern max length exceeded (%d)", maxPatternLength) + } + return true, "" +} diff --git a/pkg/services/live/pipeline/pattern/pattern_test.go b/pkg/services/live/pipeline/pattern/pattern_test.go new file mode 100644 index 00000000000..92566c63935 --- /dev/null +++ b/pkg/services/live/pipeline/pattern/pattern_test.go @@ -0,0 +1,37 @@ +package pattern + +import "testing" + +func TestValid(t *testing.T) { + type args struct { + pattern string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "valid", + args: args{ + pattern: "xxx", + }, + want: true, + }, + { + name: "invalid", + args: args{ + pattern: "/xxx", + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := Valid(tt.args.pattern) + if got != tt.want { + t.Errorf("Valid() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/services/live/pipeline/registry.go b/pkg/services/live/pipeline/registry.go new file mode 100644 index 00000000000..18c0ab17474 --- /dev/null +++ b/pkg/services/live/pipeline/registry.go @@ -0,0 +1,91 @@ +package pipeline + +type EntityInfo struct { + Type string `json:"type"` + Description string `json:"description"` + Example interface{} `json:"example,omitempty"` +} + +var SubscribersRegistry = []EntityInfo{ + { + Type: SubscriberTypeBuiltin, + Description: "apply builtin feature subscribe logic", + }, + { + Type: SubscriberTypeManagedStream, + Description: "apply managed stream subscribe logic", + }, + { + Type: SubscriberTypeMultiple, + Description: "apply multiple subscribers", + }, + { + Type: SubscriberTypeAuthorizeRole, + Description: "authorize user role", + }, +} + +var OutputsRegistry = []EntityInfo{ + { + Type: OutputTypeManagedStream, + Description: "Only send schema when structure changes. Note this also requires a matching subscriber", + Example: ManagedStreamOutputConfig{}, + }, + { + Type: OutputTypeMultiple, + Description: "Send the output to multiple destinations", + Example: MultipleOutputterConfig{}, + }, + { + Type: OutputTypeConditional, + Description: "send to an output depending on frame values", + Example: ConditionalOutputConfig{}, + }, + { + Type: OutputTypeRedirect, + }, + { + Type: OutputTypeThreshold, + }, + { + Type: OutputTypeChangeLog, + }, + { + Type: OutputTypeRemoteWrite, + }, +} + +var ConvertersRegistry = []EntityInfo{ + { + Type: ConverterTypeJsonAuto, + }, + { + Type: ConverterTypeJsonExact, + }, + { + Type: ConverterTypeInfluxAuto, + Description: "accept influx line protocol", + Example: AutoInfluxConverterConfig{}, + }, + { + Type: ConverterTypeJsonFrame, + }, +} + +var ProcessorsRegistry = []EntityInfo{ + { + Type: ProcessorTypeKeepFields, + Description: "list the fields that should stay", + Example: KeepFieldsProcessorConfig{}, + }, + { + Type: ProcessorTypeDropFields, + Description: "list the fields that should be removed", + Example: DropFieldsProcessorConfig{}, + }, + { + Type: ProcessorTypeMultiple, + Description: "apply multiple processors", + Example: MultipleProcessorConfig{}, + }, +} diff --git a/pkg/services/live/pipeline/rule_cache_segmented_test.go b/pkg/services/live/pipeline/rule_cache_segmented_test.go index 9b227ab2912..23462274c79 100644 --- a/pkg/services/live/pipeline/rule_cache_segmented_test.go +++ b/pkg/services/live/pipeline/rule_cache_segmented_test.go @@ -21,8 +21,14 @@ func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule Pattern: "stream/telegraf/:metric", }, { - OrgId: 1, - Pattern: "stream/telegraf/:metric/:extra", + OrgId: 1, + Pattern: "stream/telegraf/:metric/:extra", + Outputter: NewRedirectOutput(RedirectOutputConfig{}), + }, + { + OrgId: 1, + Pattern: "stream/boom:er", + Converter: NewExactJsonConverter(ExactJsonConverterConfig{}), }, }, nil } @@ -42,7 +48,12 @@ func TestStorage_Get(t *testing.T) { rule, ok, err = s.Get(1, "stream/telegraf/mem/rss") require.NoError(t, err) require.True(t, ok) - require.Nil(t, rule.Converter) + require.Equal(t, OutputTypeRedirect, rule.Outputter.Type()) + + rule, ok, err = s.Get(1, "stream/booms") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, ConverterTypeJsonExact, rule.Converter.Type()) } func BenchmarkRuleGet(b *testing.B) { diff --git a/pkg/services/live/pipeline/storage_file.go b/pkg/services/live/pipeline/storage_file.go index 384f298921d..02431dbb0c9 100644 --- a/pkg/services/live/pipeline/storage_file.go +++ b/pkg/services/live/pipeline/storage_file.go @@ -3,8 +3,10 @@ package pipeline import ( "context" "encoding/json" + "errors" "fmt" "io/ioutil" + "os" "path/filepath" ) @@ -33,14 +35,9 @@ func (f *FileStorage) ListRemoteWriteBackends(_ context.Context, orgID int64) ([ } func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) { - ruleBytes, err := ioutil.ReadFile(filepath.Join(f.DataPath, "pipeline", "live-channel-rules.json")) + channelRules, err := f.readRules() if err != nil { - return nil, fmt.Errorf("can't read ./data/live-channel-rules.json file: %w", err) - } - var channelRules ChannelRules - err = json.Unmarshal(ruleBytes, &channelRules) - if err != nil { - return nil, fmt.Errorf("can't unmarshal live-channel-rules.json data: %w", err) + return nil, fmt.Errorf("can't read channel rules: %w", err) } var rules []ChannelRule for _, r := range channelRules.Rules { @@ -50,3 +47,124 @@ func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]Channe } return rules, nil } + +func (f *FileStorage) CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) { + channelRules, err := f.readRules() + if err != nil { + return rule, fmt.Errorf("can't read channel rules: %w", err) + } + ok, reason := rule.Valid() + if !ok { + return rule, fmt.Errorf("invalid channel rule: %s", reason) + } + for _, existingRule := range channelRules.Rules { + if patternMatch(orgID, rule.Pattern, existingRule) { + return rule, fmt.Errorf("pattern already exists in org: %s", rule.Pattern) + } + } + channelRules.Rules = append(channelRules.Rules, rule) + err = f.saveChannelRules(orgID, channelRules) + return rule, err +} + +func patternMatch(orgID int64, pattern string, existingRule ChannelRule) bool { + return pattern == existingRule.Pattern && (existingRule.OrgId == orgID || (existingRule.OrgId == 0 && orgID == 1)) +} + +func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) { + channelRules, err := f.readRules() + if err != nil { + return rule, fmt.Errorf("can't read channel rules: %w", err) + } + + ok, reason := rule.Valid() + if !ok { + return rule, fmt.Errorf("invalid channel rule: %s", reason) + } + + index := -1 + + for i, existingRule := range channelRules.Rules { + if patternMatch(orgID, rule.Pattern, existingRule) { + index = i + break + } + } + if index > -1 { + channelRules.Rules[index] = rule + } else { + return f.CreateChannelRule(ctx, orgID, rule) + } + + err = f.saveChannelRules(orgID, channelRules) + return rule, err +} + +func removeChannelRuleByIndex(s []ChannelRule, index int) []ChannelRule { + return append(s[:index], s[index+1:]...) +} + +func (f *FileStorage) ruleFilePath() string { + return filepath.Join(f.DataPath, "pipeline", "live-channel-rules.json") +} + +func (f *FileStorage) readRules() (ChannelRules, error) { + ruleFile := f.ruleFilePath() + // Safe to ignore gosec warning G304. + // nolint:gosec + ruleBytes, err := ioutil.ReadFile(ruleFile) + if err != nil { + return ChannelRules{}, fmt.Errorf("can't read ./data/live-channel-rules.json file: %w", err) + } + var channelRules ChannelRules + err = json.Unmarshal(ruleBytes, &channelRules) + if err != nil { + return ChannelRules{}, fmt.Errorf("can't unmarshal live-channel-rules.json data: %w", err) + } + return channelRules, nil +} + +func (f *FileStorage) saveChannelRules(orgID int64, rules ChannelRules) error { + ok, reason := checkRulesValid(orgID, rules.Rules) + if !ok { + return errors.New(reason) + } + ruleFile := f.ruleFilePath() + // Safe to ignore gosec warning G304. + // nolint:gosec + file, err := os.OpenFile(ruleFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return fmt.Errorf("can't open channel rule file: %w", err) + } + defer func() { _ = file.Close() }() + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + err = enc.Encode(rules) + if err != nil { + return fmt.Errorf("can't save rules to file: %w", err) + } + return nil +} + +func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, pattern string) error { + channelRules, err := f.readRules() + if err != nil { + return fmt.Errorf("can't read channel rules: %w", err) + } + + index := -1 + for i, existingRule := range channelRules.Rules { + if patternMatch(orgID, pattern, existingRule) { + index = i + break + } + } + + if index > -1 { + channelRules.Rules = removeChannelRuleByIndex(channelRules.Rules, index) + } else { + return fmt.Errorf("rule not found") + } + + return f.saveChannelRules(orgID, channelRules) +}