Controllers: Make available as a target (#110357)

* Controllers: Add to build process
* Allow setting through env variables
This commit is contained in:
Stephanie Hingtgen
2025-08-30 04:27:50 -06:00
committed by GitHub
parent 0d782bdedb
commit 232d68fb8c
10 changed files with 325 additions and 299 deletions

2
.gitignore vendored
View File

@ -94,6 +94,8 @@ example-apiserver/
/devenv/docker/blocks/auth/openldap/certs/
conf/custom.ini
conf/operator.ini
conf/storage.ini
/conf/provisioning/**/*.yaml
!/conf/provisioning/**/sample.yaml

View File

@ -1,28 +0,0 @@
.PHONY: build clean test
BINARY_NAME=job-controller
BUILD_DIR=bin
LDFLAGS=-w -s
build:
@echo "Building $(BINARY_NAME)..."
@mkdir -p $(BUILD_DIR)
go build -ldflags="$(LDFLAGS)" -o $(BUILD_DIR)/$(BINARY_NAME) .
clean:
@echo "Cleaning..."
@rm -rf $(BUILD_DIR)
run:
@echo "Running $(BINARY_NAME)..."
./$(BUILD_DIR)/$(BINARY_NAME)
install:
@echo "Installing $(BINARY_NAME)..."
go install .
help:
@echo "Available targets:"
@echo " build - Build the binary"
@echo " clean - Clean build artifacts"
@echo " run - Run the binary"
@echo " install - Install the binary"
@echo " help - Show this help"

View File

@ -1,234 +0,0 @@
package main
import (
"context"
"crypto/x509"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/grafana/authlib/authn"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/urfave/cli/v2"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
authrt "github.com/grafana/grafana/apps/provisioning/pkg/auth"
"github.com/grafana/grafana/apps/provisioning/pkg/controller"
client "github.com/grafana/grafana/apps/provisioning/pkg/generated/clientset/versioned"
informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
)
var (
token = flag.String("token", "", "Token to use for authentication")
tokenExchangeURL = flag.String("token-exchange-url", "", "Token exchange URL")
provisioningServerURL = flag.String("provisioning-server-url", "", "Provisioning server URL")
tlsInsecure = flag.Bool("tls-insecure", true, "Skip TLS certificate verification")
tlsCertFile = flag.String("tls-cert-file", "", "Path to TLS certificate file")
tlsKeyFile = flag.String("tls-key-file", "", "Path to TLS private key file")
tlsCAFile = flag.String("tls-ca-file", "", "Path to TLS CA certificate file")
)
func main() {
app := &cli.App{
Name: "job-controller",
Usage: "Watch provisioning jobs and manage job history cleanup",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "token",
Usage: "Token to use for authentication",
Value: "",
Destination: token,
},
&cli.StringFlag{
Name: "token-exchange-url",
Usage: "Token exchange URL",
Value: "",
Destination: tokenExchangeURL,
},
&cli.StringFlag{
Name: "provisioning-server-url",
Usage: "Provisioning server URL",
Value: "",
Destination: provisioningServerURL,
},
&cli.BoolFlag{
Name: "tls-insecure",
Usage: "Skip TLS certificate verification",
Value: true,
Destination: tlsInsecure,
},
&cli.StringFlag{
Name: "tls-cert-file",
Usage: "Path to TLS certificate file",
Value: "",
Destination: tlsCertFile,
},
&cli.StringFlag{
Name: "tls-key-file",
Usage: "Path to TLS private key file",
Value: "",
Destination: tlsKeyFile,
},
&cli.StringFlag{
Name: "tls-ca-file",
Usage: "Path to TLS CA certificate file",
Value: "",
Destination: tlsCAFile,
},
&cli.DurationFlag{
Name: "history-expiration",
Usage: "Duration after which HistoricJobs are deleted; 0 disables cleanup. When the Provisioning API is configured to use Loki for job history, leave this at 0.",
Value: 0,
},
},
Action: runJobController,
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func runJobController(c *cli.Context) error {
// TODO: Wire notifications into a ConcurrentJobDriver when a client-backed Store and Workers are available.
// For now, just log notifications to verify events end-to-end.
logger := logging.NewSLogLogger(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
})).With("logger", "provisioning-job-controller")
logger.Info("Starting provisioning job controller")
tokenExchangeClient, err := authn.NewTokenExchangeClient(authn.TokenExchangeConfig{
TokenExchangeURL: *tokenExchangeURL,
Token: *token,
})
if err != nil {
return fmt.Errorf("failed to create token exchange client: %w", err)
}
tlsConfig, err := buildTLSConfig()
if err != nil {
return fmt.Errorf("failed to build TLS configuration: %w", err)
}
config := &rest.Config{
APIPath: "/apis",
Host: *provisioningServerURL,
WrapTransport: transport.WrapperFunc(func(rt http.RoundTripper) http.RoundTripper {
return authrt.NewRoundTripper(tokenExchangeClient, rt)
}),
TLSClientConfig: tlsConfig,
}
provisioningClient, err := client.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create provisioning client: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("Received shutdown signal, stopping controllers")
cancel()
}()
// Jobs informer and controller (resync ~60s like in register.go)
jobInformerFactory := informer.NewSharedInformerFactoryWithOptions(
provisioningClient,
60*time.Second,
)
jobInformer := jobInformerFactory.Provisioning().V0alpha1().Jobs()
jobController, err := controller.NewJobController(jobInformer)
if err != nil {
return fmt.Errorf("failed to create job controller: %w", err)
}
logger.Info("jobs controller started")
notifications := jobController.InsertNotifications()
go func() {
for {
select {
case <-ctx.Done():
return
case <-notifications:
logger.Info("job create notification received")
}
}
}()
// Optionally enable history cleanup if a positive expiration is provided
historyExpiration := c.Duration("history-expiration")
var startHistoryInformers func()
if historyExpiration > 0 {
// History jobs informer and controller (separate factory with resync == expiration)
historyInformerFactory := informer.NewSharedInformerFactoryWithOptions(
provisioningClient,
historyExpiration,
)
historyJobInformer := historyInformerFactory.Provisioning().V0alpha1().HistoricJobs()
_, err = controller.NewHistoryJobController(
provisioningClient.ProvisioningV0alpha1(),
historyJobInformer,
historyExpiration,
)
if err != nil {
return fmt.Errorf("failed to create history job controller: %w", err)
}
logger.Info("history cleanup enabled", "expiration", historyExpiration.String())
startHistoryInformers = func() { historyInformerFactory.Start(ctx.Done()) }
} else {
startHistoryInformers = func() {}
}
// Start informers
go jobInformerFactory.Start(ctx.Done())
go startHistoryInformers()
// Optionally wait for job cache sync; history cleanup can rely on resync events
if !cache.WaitForCacheSync(ctx.Done(), jobInformer.Informer().HasSynced) {
return fmt.Errorf("failed to sync job informer cache")
}
<-ctx.Done()
return nil
}
func buildTLSConfig() (rest.TLSClientConfig, error) {
tlsConfig := rest.TLSClientConfig{
Insecure: *tlsInsecure,
}
// If client certificate and key are provided
if *tlsCertFile != "" && *tlsKeyFile != "" {
tlsConfig.CertFile = *tlsCertFile
tlsConfig.KeyFile = *tlsKeyFile
}
// If CA certificate is provided
if *tlsCAFile != "" {
caCert, err := os.ReadFile(*tlsCAFile)
if err != nil {
return tlsConfig, fmt.Errorf("failed to read CA certificate file: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return tlsConfig, fmt.Errorf("failed to parse CA certificate")
}
tlsConfig.CAData = caCert
}
return tlsConfig, nil
}

View File

@ -3,7 +3,7 @@
> [!WARNING]
> This controller has current limitations:
>
> - This binary does not start the ConcurrentJobDriver yet. Notifications are logged but not consumed by workers here.
> - Does not start the ConcurrentJobDriver yet. Notifications are logged but not consumed by workers here.
> - Job processing (claim/renew/update/complete) isn't implemented yet as it requires refactoring of some components.
### Behavior
@ -43,55 +43,62 @@ This binary currently wires informers and emits job-create notifications. In the
### How to run
1. Build from this folder:
1. Build grafana:
- `make build`
2. Ensure the following services are running locally: provisioning API server, secrets service API server, repository controller, unified storage, and auth.
3. Create a operator.ini file:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = true
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
# Uncomment to enable history cleanup via Loki. First ensure the Provisioning API is configured with Loki for job history (see `createJobHistoryConfigFromSettings` in `pkg/registry/apis/provisioning/register.go`).
# history_expiration = 24h
```
3. Start the controller:
- Using Loki for job history:
- Ensure the Provisioning API is configured with Loki for job history (see `createJobHistoryConfigFromSettings` in `pkg/registry/apis/provisioning/register.go`).
- Run without history cleanup:
- `./bin/job-controller --token-exchange-url=http://localhost:6481/sign/access-token --token=ProvisioningAdminToken --provisioning-server-url=https://localhost:6446`
- Without Loki (local/dev or when Loki is unavailable):
- Run without cleanup:
- `./bin/job-controller --token-exchange-url=http://localhost:6481/sign/access-token --token=ProvisioningAdminToken --provisioning-server-url=https://localhost:6446`
- Or enable local HistoricJobs cleanup with a retention window:
- `./bin/job-controller --token-exchange-url=http://localhost:6481/sign/access-token --token=ProvisioningAdminToken --provisioning-server-url=https://localhost:6446 --history-expiration=30s`
- `GF_DEFAULT_TARGET=operator GF_OPERATOR_NAME=provisioning-jobs ./bin/darwin-arm64/grafana server target --config=conf/operator.ini`
#### TLS Configuration Examples
- **Production with proper TLS verification**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = false
tls_ca_file = /path/to/ca-cert.pem
```bash
./bin/job-controller \
--token-exchange-url=http://localhost:6481/sign/access-token \
--token=ProvisioningAdminToken \
--provisioning-server-url=https://provisioning.example.com:6446 \
--tls-insecure=false \
--tls-ca-file=/path/to/ca-cert.pem
```
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
- **Mutual TLS authentication**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = false
tls_ca_file = /path/to/ca-cert.pem
tls_cert_file = /path/to/client-cert.pem
tls_key_file = /path/to/client-key.pem
```bash
./bin/job-controller \
--token-exchange-url=http://localhost:6481/sign/access-token \
--token=ProvisioningAdminToken \
--provisioning-server-url=https://provisioning.example.com:6446 \
--tls-insecure=false \
--tls-ca-file=/path/to/ca-cert.pem \
--tls-cert-file=/path/to/client-cert.pem \
--tls-key-file=/path/to/client-key.pem
```
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
- **Development with self-signed certificates (insecure)**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = true
```bash
./bin/job-controller \
--token-exchange-url=http://localhost:6481/sign/access-token \
--token=ProvisioningAdminToken \
--provisioning-server-url=https://localhost:6446 \
--tls-insecure=true
```
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
### Expected behavior

View File

@ -0,0 +1,210 @@
package operators
import (
"context"
"crypto/x509"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/grafana/authlib/authn"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/urfave/cli/v2"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"github.com/grafana/grafana/pkg/server"
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/setting"
authrt "github.com/grafana/grafana/apps/provisioning/pkg/auth"
"github.com/grafana/grafana/apps/provisioning/pkg/controller"
client "github.com/grafana/grafana/apps/provisioning/pkg/generated/clientset/versioned"
informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
)
func init() {
server.RegisterOperator(server.Operator{
Name: "provisioning-jobs",
Description: "Watch provisioning jobs and manage job history cleanup",
RunFunc: runJobController,
})
}
type controllerConfig struct {
provisioningClient *client.Clientset
historyExpiration time.Duration
}
func runJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cfg) error {
logger := logging.NewSLogLogger(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
})).With("logger", "provisioning-job-controller")
logger.Info("Starting provisioning job controller")
controllerCfg, err := setupFromConfig(cfg)
if err != nil {
return fmt.Errorf("failed to setup operator: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("Received shutdown signal, stopping controllers")
cancel()
}()
// Jobs informer and controller (resync ~60s like in register.go)
jobInformerFactory := informer.NewSharedInformerFactoryWithOptions(
controllerCfg.provisioningClient,
60*time.Second,
)
jobInformer := jobInformerFactory.Provisioning().V0alpha1().Jobs()
jobController, err := controller.NewJobController(jobInformer)
if err != nil {
return fmt.Errorf("failed to create job controller: %w", err)
}
logger.Info("jobs controller started")
notifications := jobController.InsertNotifications()
go func() {
for {
select {
case <-ctx.Done():
return
case <-notifications:
logger.Info("job create notification received")
}
}
}()
var startHistoryInformers func()
if controllerCfg.historyExpiration > 0 {
// History jobs informer and controller (separate factory with resync == expiration)
historyInformerFactory := informer.NewSharedInformerFactoryWithOptions(
controllerCfg.provisioningClient,
controllerCfg.historyExpiration,
)
historyJobInformer := historyInformerFactory.Provisioning().V0alpha1().HistoricJobs()
_, err = controller.NewHistoryJobController(
controllerCfg.provisioningClient.ProvisioningV0alpha1(),
historyJobInformer,
controllerCfg.historyExpiration,
)
if err != nil {
return fmt.Errorf("failed to create history job controller: %w", err)
}
logger.Info("history cleanup enabled", "expiration", controllerCfg.historyExpiration.String())
startHistoryInformers = func() { historyInformerFactory.Start(ctx.Done()) }
} else {
startHistoryInformers = func() {}
}
// Start informers
go jobInformerFactory.Start(ctx.Done())
go startHistoryInformers()
// Optionally wait for job cache sync; history cleanup can rely on resync events
if !cache.WaitForCacheSync(ctx.Done(), jobInformer.Informer().HasSynced) {
return fmt.Errorf("failed to sync job informer cache")
}
<-ctx.Done()
return nil
}
func setupFromConfig(cfg *setting.Cfg) (controllerCfg *controllerConfig, err error) {
if cfg == nil {
return nil, fmt.Errorf("no configuration available")
}
gRPCAuth := cfg.SectionWithEnvOverrides("grpc_client_authentication")
token := gRPCAuth.Key("token").String()
if token == "" {
return nil, fmt.Errorf("token is required in [grpc_client_authentication] section")
}
tokenExchangeURL := gRPCAuth.Key("token_exchange_url").String()
if tokenExchangeURL == "" {
return nil, fmt.Errorf("token_exchange_url is required in [grpc_client_authentication] section")
}
operatorSec := cfg.SectionWithEnvOverrides("operator")
provisioningServerURL := operatorSec.Key("provisioning_server_url").String()
if provisioningServerURL == "" {
return nil, fmt.Errorf("provisioning_server_url is required in [operator] section")
}
tlsInsecure := operatorSec.Key("tls_insecure").MustBool(false)
tlsCertFile := operatorSec.Key("tls_cert_file").String()
tlsKeyFile := operatorSec.Key("tls_key_file").String()
tlsCAFile := operatorSec.Key("tls_ca_file").String()
tokenExchangeClient, err := authn.NewTokenExchangeClient(authn.TokenExchangeConfig{
TokenExchangeURL: tokenExchangeURL,
Token: token,
})
if err != nil {
return nil, fmt.Errorf("failed to create token exchange client: %w", err)
}
tlsConfig, err := buildTLSConfig(tlsInsecure, tlsCertFile, tlsKeyFile, tlsCAFile)
if err != nil {
return nil, fmt.Errorf("failed to build TLS configuration: %w", err)
}
config := &rest.Config{
APIPath: "/apis",
Host: provisioningServerURL,
WrapTransport: transport.WrapperFunc(func(rt http.RoundTripper) http.RoundTripper {
return authrt.NewRoundTripper(tokenExchangeClient, rt)
}),
TLSClientConfig: tlsConfig,
}
provisioningClient, err := client.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create provisioning client: %w", err)
}
return &controllerConfig{
provisioningClient: provisioningClient,
historyExpiration: operatorSec.Key("history_expiration").MustDuration(0),
}, nil
}
func buildTLSConfig(insecure bool, certFile, keyFile, caFile string) (rest.TLSClientConfig, error) {
tlsConfig := rest.TLSClientConfig{
Insecure: insecure,
}
if certFile != "" && keyFile != "" {
tlsConfig.CertFile = certFile
tlsConfig.KeyFile = keyFile
}
if caFile != "" {
// caFile is set in operator.ini file
// nolint:gosec
caCert, err := os.ReadFile(caFile)
if err != nil {
return tlsConfig, fmt.Errorf("failed to read CA certificate file: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return tlsConfig, fmt.Errorf("failed to parse CA certificate")
}
tlsConfig.CAData = caCert
}
return tlsConfig, nil
}

View File

@ -21,7 +21,7 @@ import (
func TargetCommand(version, commit, buildBranch, buildstamp string) *cli.Command {
return &cli.Command{
Name: "target",
Usage: "target specific grafana dskit services",
Usage: "target specific grafana services",
Flags: commonFlags,
Action: func(context *cli.Context) error {
return RunTargetServer(standalone.BuildInfo{

View File

@ -7,6 +7,7 @@ import (
"github.com/fatih/color"
"github.com/urfave/cli/v2"
_ "github.com/grafana/grafana/apps/provisioning/pkg/operators"
gcli "github.com/grafana/grafana/pkg/cmd/grafana-cli/commands"
"github.com/grafana/grafana/pkg/cmd/grafana-server/commands"
"github.com/grafana/grafana/pkg/server"

View File

@ -13,6 +13,7 @@ const (
ZanzanaServer string = "zanzana-server"
InstrumentationServer string = "instrumentation-server"
FrontendServer string = "frontend-server"
OperatorServer string = "operator"
)
var dependencyMap = map[string][]string{
@ -25,4 +26,5 @@ var dependencyMap = map[string][]string{
Core: {},
All: {Core},
FrontendServer: {},
OperatorServer: {InstrumentationServer},
}

View File

@ -14,6 +14,7 @@ import (
"github.com/grafana/dskit/ring"
ringclient "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
"github.com/grafana/dskit/services"
@ -21,6 +22,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/frontend"
@ -196,11 +198,40 @@ func (s *ModuleServer) Run() error {
return frontend.ProvideFrontendService(s.cfg, s.features, s.promGatherer, s.registerer, s.license)
})
m.RegisterModule(modules.OperatorServer, s.initOperatorServer)
m.RegisterModule(modules.All, nil)
return m.Run(s.context)
}
func (s *ModuleServer) initOperatorServer() (services.Service, error) {
operatorName := os.Getenv("GF_OPERATOR_NAME")
if operatorName == "" {
s.log.Debug("GF_OPERATOR_NAME environment variable empty or unset, can't start operator")
return nil, nil
}
for _, op := range GetRegisteredOperators() {
if op.Name == operatorName {
return services.NewBasicService(
nil,
func(ctx context.Context) error {
context := cli.NewContext(&cli.App{}, nil, nil)
return op.RunFunc(standalone.BuildInfo{
Version: s.version,
Commit: s.commit,
BuildBranch: s.buildBranch,
}, context, s.cfg)
},
nil,
).WithName("operator"), nil
}
}
return nil, fmt.Errorf("unknown operator: %s. available operators: %v", operatorName, GetRegisteredOperatorNames())
}
// Shutdown initiates Grafana graceful shutdown. This shuts down all
// running background services. Since Run blocks Shutdown supposed to
// be run from a separate goroutine.

35
pkg/server/operator.go Normal file
View File

@ -0,0 +1,35 @@
package server
import (
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/setting"
"github.com/urfave/cli/v2"
)
// Operator represents an app operator that is available in the Grafana binary
type Operator struct {
Name string
Description string
RunFunc func(standalone.BuildInfo, *cli.Context, *setting.Cfg) error
}
var operatorsRegistry []Operator
// RegisterOperator registers an app operator that is baked into the Grafana binary
func RegisterOperator(operator Operator) {
operatorsRegistry = append(operatorsRegistry, operator)
}
// GetRegisteredOperators returns all registered operators
func GetRegisteredOperators() []Operator {
return operatorsRegistry
}
// GetRegisteredOperatorNames returns the names of all registered operators
func GetRegisteredOperatorNames() []string {
names := make([]string, len(operatorsRegistry))
for i, op := range operatorsRegistry {
names[i] = op.Name
}
return names
}