Live: pipeline rule crud (file-based, still for MVP) (#39238)

This commit is contained in:
Alexander Emelin
2021-09-30 19:29:32 +03:00
committed by GitHub
parent 3ad5ee87a3
commit a696fc8b2b
9 changed files with 424 additions and 95 deletions

View File

@ -438,6 +438,9 @@ func (hs *HTTPServer) registerRoutes() {
// POST Live data to be processed according to channel rules. // POST Live data to be processed according to channel rules.
liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath) liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath)
liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin) liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin)
liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin)
liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin)
liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin)
liveRoute.Get("/pipeline-entities", routing.Wrap(hs.Live.HandlePipelineEntitiesListHTTP), reqOrgAdmin) liveRoute.Get("/pipeline-entities", routing.Wrap(hs.Live.HandlePipelineEntitiesListHTTP), reqOrgAdmin)
liveRoute.Get("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsListHTTP), reqOrgAdmin) liveRoute.Get("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsListHTTP), reqOrgAdmin)
} }

View File

@ -2,8 +2,10 @@ package live
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -883,95 +885,77 @@ func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response.
}) })
} }
type configInfo struct { // HandleChannelRulesPostHTTP ...
Type string `json:"type"` func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.Response {
Description string `json:"description"` body, err := ioutil.ReadAll(c.Req.Body)
Example interface{} `json:"example,omitempty"` if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var rule pipeline.ChannelRule
err = json.Unmarshal(body, &rule)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
result, err := g.channelRuleStorage.CreateChannelRule(c.Req.Context(), c.OrgId, rule)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"rule": result,
})
}
// HandleChannelRulesPutHTTP ...
func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *models.ReqContext) response.Response {
body, err := ioutil.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var rule pipeline.ChannelRule
err = json.Unmarshal(body, &rule)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
if rule.Pattern == "" {
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
}
rule, err = g.channelRuleStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, rule)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"rule": rule,
})
}
// HandleChannelRulesDeleteHTTP ...
func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) response.Response {
body, err := ioutil.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var rule pipeline.ChannelRule
err = json.Unmarshal(body, &rule)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
if rule.Pattern == "" {
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
}
err = g.channelRuleStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, rule.Pattern)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{})
} }
// HandlePipelineEntitiesListHTTP ... // HandlePipelineEntitiesListHTTP ...
func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response { func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response {
return response.JSON(http.StatusOK, util.DynMap{ return response.JSON(http.StatusOK, util.DynMap{
"subscribers": []configInfo{ "subscribers": pipeline.SubscribersRegistry,
{ "outputs": pipeline.OutputsRegistry,
Type: pipeline.SubscriberTypeBuiltin, "converters": pipeline.ConvertersRegistry,
Description: "apply builtin feature subscribe logic", "processors": pipeline.ProcessorsRegistry,
},
{
Type: pipeline.SubscriberTypeManagedStream,
Description: "apply managed stream subscribe logic",
},
{
Type: pipeline.SubscriberTypeMultiple,
Description: "apply multiple subscribers",
},
{
Type: pipeline.SubscriberTypeAuthorizeRole,
Description: "authorize user role",
},
},
"outputs": []configInfo{
{
Type: pipeline.OutputTypeManagedStream,
Description: "Only send schema when structure changes. Note this also requires a matching subscriber",
Example: pipeline.ManagedStreamOutputConfig{},
},
{
Type: pipeline.OutputTypeMultiple,
Description: "Send the output to multiple destinations",
Example: pipeline.MultipleOutputterConfig{},
},
{
Type: pipeline.OutputTypeConditional,
Description: "send to an output depending on frame values",
Example: pipeline.ConditionalOutputConfig{},
},
{
Type: pipeline.OutputTypeRedirect,
},
{
Type: pipeline.OutputTypeThreshold,
},
{
Type: pipeline.OutputTypeChangeLog,
},
{
Type: pipeline.OutputTypeRemoteWrite,
},
},
"converters": []configInfo{
{
Type: pipeline.ConverterTypeJsonAuto,
},
{
Type: pipeline.ConverterTypeJsonExact,
},
{
Type: pipeline.ConverterTypeInfluxAuto,
Description: "accept influx line protocol",
Example: pipeline.AutoInfluxConverterConfig{},
},
{
Type: pipeline.ConverterTypeJsonFrame,
},
},
"processors": []configInfo{
{
Type: pipeline.ProcessorTypeKeepFields,
Description: "list the fields that should stay",
Example: pipeline.KeepFieldsProcessorConfig{},
},
{
Type: pipeline.ProcessorTypeDropFields,
Description: "list the fields that should be removed",
Example: pipeline.DropFieldsProcessorConfig{},
},
{
Type: pipeline.ProcessorTypeMultiple,
Description: "apply multiple processors",
Example: pipeline.MultipleProcessorConfig{},
},
},
}) })
} }

View File

@ -4,7 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
"github.com/grafana/grafana/pkg/services/live/managedstream" "github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pipeline/pattern"
"github.com/centrifugal/centrifuge" "github.com/centrifugal/centrifuge"
) )
@ -79,6 +82,43 @@ type ChannelRule struct {
Settings ChannelRuleSettings `json:"settings"` Settings ChannelRuleSettings `json:"settings"`
} }
func (r ChannelRule) Valid() (bool, string) {
ok, reason := pattern.Valid(r.Pattern)
if !ok {
return false, fmt.Sprintf("invalid pattern: %s", reason)
}
if r.Settings.Converter != nil {
if !typeRegistered(r.Settings.Converter.Type, ConvertersRegistry) {
return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type)
}
}
if r.Settings.Subscriber != nil {
if !typeRegistered(r.Settings.Subscriber.Type, SubscribersRegistry) {
return false, fmt.Sprintf("unknown subscriber type: %s", r.Settings.Subscriber.Type)
}
}
if r.Settings.Processor != nil {
if !typeRegistered(r.Settings.Processor.Type, ProcessorsRegistry) {
return false, fmt.Sprintf("unknown processor type: %s", r.Settings.Processor.Type)
}
}
if r.Settings.Outputter != nil {
if !typeRegistered(r.Settings.Outputter.Type, OutputsRegistry) {
return false, fmt.Sprintf("unknown output type: %s", r.Settings.Outputter.Type)
}
}
return true, ""
}
func typeRegistered(entityType string, registry []EntityInfo) bool {
for _, info := range registry {
if info.Type == entityType {
return true
}
}
return false
}
type RemoteWriteBackend struct { type RemoteWriteBackend struct {
OrgId int64 `json:"-"` OrgId int64 `json:"-"`
UID string `json:"uid"` UID string `json:"uid"`
@ -93,6 +133,23 @@ type ChannelRules struct {
Rules []ChannelRule `json:"rules"` Rules []ChannelRule `json:"rules"`
} }
func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string) {
t := tree.New()
defer func() {
if r := recover(); r != nil {
reason = fmt.Sprintf("%v", r)
ok = false
}
}()
for _, rule := range rules {
if rule.OrgId == orgID || (rule.OrgId == 0 && orgID == 1) {
t.AddRoute("/"+rule.Pattern, struct{}{})
}
}
ok = true
return ok, reason
}
type MultipleConditionCheckerConfig struct { type MultipleConditionCheckerConfig struct {
Type ConditionType `json:"type"` Type ConditionType `json:"type"`
Conditions []ConditionCheckerConfig `json:"conditions"` Conditions []ConditionCheckerConfig `json:"conditions"`
@ -113,6 +170,9 @@ type ConditionCheckerConfig struct {
type RuleStorage interface { type RuleStorage interface {
ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error) ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error)
ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error)
UpdateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error)
DeleteChannelRule(_ context.Context, orgID int64, pattern string) error
} }
type StorageRuleBuilder struct { type StorageRuleBuilder struct {

View File

@ -6,7 +6,7 @@ import (
) )
type AutoJsonConverterConfig struct { type AutoJsonConverterConfig struct {
FieldTips map[string]Field `json:"fieldTips"` FieldTips map[string]Field `json:"fieldTips,omitempty"`
} }
type AutoJsonConverter struct { type AutoJsonConverter struct {

View File

@ -0,0 +1,25 @@
package pattern
import (
"fmt"
"regexp"
"strings"
)
var patternReString = `^[A-z0-9_\-/=.:*]*$`
var patternRe = regexp.MustCompile(patternReString)
var maxPatternLength = 160
func Valid(pattern string) (bool, string) {
if strings.HasPrefix(pattern, "/") {
return false, "pattern can't start with /"
}
if !patternRe.MatchString(pattern) {
return false, fmt.Sprintf("pattern format error, must match %s", patternReString)
}
if len(pattern) > maxPatternLength {
return false, fmt.Sprintf("pattern max length exceeded (%d)", maxPatternLength)
}
return true, ""
}

View File

@ -0,0 +1,37 @@
package pattern
import "testing"
func TestValid(t *testing.T) {
type args struct {
pattern string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "valid",
args: args{
pattern: "xxx",
},
want: true,
},
{
name: "invalid",
args: args{
pattern: "/xxx",
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := Valid(tt.args.pattern)
if got != tt.want {
t.Errorf("Valid() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,91 @@
package pipeline
type EntityInfo struct {
Type string `json:"type"`
Description string `json:"description"`
Example interface{} `json:"example,omitempty"`
}
var SubscribersRegistry = []EntityInfo{
{
Type: SubscriberTypeBuiltin,
Description: "apply builtin feature subscribe logic",
},
{
Type: SubscriberTypeManagedStream,
Description: "apply managed stream subscribe logic",
},
{
Type: SubscriberTypeMultiple,
Description: "apply multiple subscribers",
},
{
Type: SubscriberTypeAuthorizeRole,
Description: "authorize user role",
},
}
var OutputsRegistry = []EntityInfo{
{
Type: OutputTypeManagedStream,
Description: "Only send schema when structure changes. Note this also requires a matching subscriber",
Example: ManagedStreamOutputConfig{},
},
{
Type: OutputTypeMultiple,
Description: "Send the output to multiple destinations",
Example: MultipleOutputterConfig{},
},
{
Type: OutputTypeConditional,
Description: "send to an output depending on frame values",
Example: ConditionalOutputConfig{},
},
{
Type: OutputTypeRedirect,
},
{
Type: OutputTypeThreshold,
},
{
Type: OutputTypeChangeLog,
},
{
Type: OutputTypeRemoteWrite,
},
}
var ConvertersRegistry = []EntityInfo{
{
Type: ConverterTypeJsonAuto,
},
{
Type: ConverterTypeJsonExact,
},
{
Type: ConverterTypeInfluxAuto,
Description: "accept influx line protocol",
Example: AutoInfluxConverterConfig{},
},
{
Type: ConverterTypeJsonFrame,
},
}
var ProcessorsRegistry = []EntityInfo{
{
Type: ProcessorTypeKeepFields,
Description: "list the fields that should stay",
Example: KeepFieldsProcessorConfig{},
},
{
Type: ProcessorTypeDropFields,
Description: "list the fields that should be removed",
Example: DropFieldsProcessorConfig{},
},
{
Type: ProcessorTypeMultiple,
Description: "apply multiple processors",
Example: MultipleProcessorConfig{},
},
}

View File

@ -21,8 +21,14 @@ func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule
Pattern: "stream/telegraf/:metric", Pattern: "stream/telegraf/:metric",
}, },
{ {
OrgId: 1, OrgId: 1,
Pattern: "stream/telegraf/:metric/:extra", Pattern: "stream/telegraf/:metric/:extra",
Outputter: NewRedirectOutput(RedirectOutputConfig{}),
},
{
OrgId: 1,
Pattern: "stream/boom:er",
Converter: NewExactJsonConverter(ExactJsonConverterConfig{}),
}, },
}, nil }, nil
} }
@ -42,7 +48,12 @@ func TestStorage_Get(t *testing.T) {
rule, ok, err = s.Get(1, "stream/telegraf/mem/rss") rule, ok, err = s.Get(1, "stream/telegraf/mem/rss")
require.NoError(t, err) require.NoError(t, err)
require.True(t, ok) require.True(t, ok)
require.Nil(t, rule.Converter) require.Equal(t, OutputTypeRedirect, rule.Outputter.Type())
rule, ok, err = s.Get(1, "stream/booms")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, ConverterTypeJsonExact, rule.Converter.Type())
} }
func BenchmarkRuleGet(b *testing.B) { func BenchmarkRuleGet(b *testing.B) {

View File

@ -3,8 +3,10 @@ package pipeline
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"path/filepath" "path/filepath"
) )
@ -33,14 +35,9 @@ func (f *FileStorage) ListRemoteWriteBackends(_ context.Context, orgID int64) ([
} }
func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) { func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) {
ruleBytes, err := ioutil.ReadFile(filepath.Join(f.DataPath, "pipeline", "live-channel-rules.json")) channelRules, err := f.readRules()
if err != nil { if err != nil {
return nil, fmt.Errorf("can't read ./data/live-channel-rules.json file: %w", err) return nil, fmt.Errorf("can't read channel rules: %w", err)
}
var channelRules ChannelRules
err = json.Unmarshal(ruleBytes, &channelRules)
if err != nil {
return nil, fmt.Errorf("can't unmarshal live-channel-rules.json data: %w", err)
} }
var rules []ChannelRule var rules []ChannelRule
for _, r := range channelRules.Rules { for _, r := range channelRules.Rules {
@ -50,3 +47,124 @@ func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]Channe
} }
return rules, nil return rules, nil
} }
func (f *FileStorage) CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) {
channelRules, err := f.readRules()
if err != nil {
return rule, fmt.Errorf("can't read channel rules: %w", err)
}
ok, reason := rule.Valid()
if !ok {
return rule, fmt.Errorf("invalid channel rule: %s", reason)
}
for _, existingRule := range channelRules.Rules {
if patternMatch(orgID, rule.Pattern, existingRule) {
return rule, fmt.Errorf("pattern already exists in org: %s", rule.Pattern)
}
}
channelRules.Rules = append(channelRules.Rules, rule)
err = f.saveChannelRules(orgID, channelRules)
return rule, err
}
func patternMatch(orgID int64, pattern string, existingRule ChannelRule) bool {
return pattern == existingRule.Pattern && (existingRule.OrgId == orgID || (existingRule.OrgId == 0 && orgID == 1))
}
func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) {
channelRules, err := f.readRules()
if err != nil {
return rule, fmt.Errorf("can't read channel rules: %w", err)
}
ok, reason := rule.Valid()
if !ok {
return rule, fmt.Errorf("invalid channel rule: %s", reason)
}
index := -1
for i, existingRule := range channelRules.Rules {
if patternMatch(orgID, rule.Pattern, existingRule) {
index = i
break
}
}
if index > -1 {
channelRules.Rules[index] = rule
} else {
return f.CreateChannelRule(ctx, orgID, rule)
}
err = f.saveChannelRules(orgID, channelRules)
return rule, err
}
func removeChannelRuleByIndex(s []ChannelRule, index int) []ChannelRule {
return append(s[:index], s[index+1:]...)
}
func (f *FileStorage) ruleFilePath() string {
return filepath.Join(f.DataPath, "pipeline", "live-channel-rules.json")
}
func (f *FileStorage) readRules() (ChannelRules, error) {
ruleFile := f.ruleFilePath()
// Safe to ignore gosec warning G304.
// nolint:gosec
ruleBytes, err := ioutil.ReadFile(ruleFile)
if err != nil {
return ChannelRules{}, fmt.Errorf("can't read ./data/live-channel-rules.json file: %w", err)
}
var channelRules ChannelRules
err = json.Unmarshal(ruleBytes, &channelRules)
if err != nil {
return ChannelRules{}, fmt.Errorf("can't unmarshal live-channel-rules.json data: %w", err)
}
return channelRules, nil
}
func (f *FileStorage) saveChannelRules(orgID int64, rules ChannelRules) error {
ok, reason := checkRulesValid(orgID, rules.Rules)
if !ok {
return errors.New(reason)
}
ruleFile := f.ruleFilePath()
// Safe to ignore gosec warning G304.
// nolint:gosec
file, err := os.OpenFile(ruleFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return fmt.Errorf("can't open channel rule file: %w", err)
}
defer func() { _ = file.Close() }()
enc := json.NewEncoder(file)
enc.SetIndent("", " ")
err = enc.Encode(rules)
if err != nil {
return fmt.Errorf("can't save rules to file: %w", err)
}
return nil
}
func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, pattern string) error {
channelRules, err := f.readRules()
if err != nil {
return fmt.Errorf("can't read channel rules: %w", err)
}
index := -1
for i, existingRule := range channelRules.Rules {
if patternMatch(orgID, pattern, existingRule) {
index = i
break
}
}
if index > -1 {
channelRules.Rules = removeChannelRuleByIndex(channelRules.Rules, index)
} else {
return fmt.Errorf("rule not found")
}
return f.saveChannelRules(orgID, channelRules)
}