make tests for notifications provider agnostic

Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
This commit is contained in:
Somtochi Onyekwere
2023-06-27 10:34:07 +01:00
committed by Sunny
parent 7c1b897919
commit e63ddb99de
18 changed files with 515 additions and 287 deletions

View File

@ -20,34 +20,37 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"strings"
"cloud.google.com/go/pubsub"
tfjson "github.com/hashicorp/terraform-json"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/test-infra/tftestenv"
tfjson "github.com/hashicorp/terraform-json"
)
const (
gkeDevOpsKnownHosts = "[source.developers.google.com]:2022 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBB5Iy4/cq/gt/fPqe3uyMy4jwv1Alc94yVPxmnwNhBzJqEV5gRPiRk5u4/JJMbbu9QUVAguBABxL7sBZa5PH/xY="
gcpSourceRepoKnownHosts = "[source.developers.google.com]:2022 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBB5Iy4/cq/gt/fPqe3uyMy4jwv1Alc94yVPxmnwNhBzJqEV5gRPiRk5u4/JJMbbu9QUVAguBABxL7sBZa5PH/xY="
)
// createKubeConfigGKE constructs kubeconfig for a GKE cluster from the
// terraform state output at the given kubeconfig path.
func createKubeConfigGKE(ctx context.Context, state map[string]*tfjson.StateOutput, kcPath string) error {
kubeconfigYaml, ok := state["kubeconfig"].Value.(string)
kubeconfigYaml, ok := state["gke_kubeconfig"].Value.(string)
if !ok || kubeconfigYaml == "" {
return fmt.Errorf("failed to obtain kubeconfig from tf output")
}
return tftestenv.CreateKubeconfigGKE(ctx, kubeconfigYaml, kcPath)
}
// registryLoginGCR logs into the container/artifact registries using the
// provider's CLI tools and returns a list of test repositories.
// registryLoginGCR logs into the Artifact registries using the gcloud
// and returns a list of test repositories.
func registryLoginGCR(ctx context.Context, output map[string]*tfjson.StateOutput) (string, error) {
// NOTE: ACR registry accept dynamic repository creation by just pushing a
// new image with a new repository name.
project := output["gcp_project"].Value.(string)
project := output["gcp_project_id"].Value.(string)
region := output["gcp_region"].Value.(string)
repositoryID := output["artifact_registry_id"].Value.(string)
artifactRegistryURL, artifactRepoURL := tftestenv.GetGoogleArtifactRegistryAndRepository(project, region, repositoryID)
@ -61,37 +64,57 @@ func registryLoginGCR(ctx context.Context, output map[string]*tfjson.StateOutput
func getTestConfigGKE(ctx context.Context, outputs map[string]*tfjson.StateOutput) (*testConfig, error) {
sharedSopsId := outputs["sops_id"].Value.(string)
privateKeyFile, ok := os.LookupEnv("GCP_SOURCEREPO_SSH")
privateKeyFile, ok := os.LookupEnv(envVarGitRepoSSHPath)
if !ok {
return nil, fmt.Errorf("GCP_SOURCEREPO_SSH env variable isn't set")
return nil, fmt.Errorf("%s env variable isn't set", envVarGitRepoSSHPath)
}
privateKeyData, err := os.ReadFile(privateKeyFile)
if err != nil {
return nil, fmt.Errorf("error getting gcp source repositories private key, '%s': %w", privateKeyFile, err)
}
pubKeyFile, ok := os.LookupEnv("GCP_SOURCEREPO_SSH_PUB")
pubKeyFile, ok := os.LookupEnv(envVarGitRepoSSHPubPath)
if !ok {
return nil, fmt.Errorf("GCP_SOURCEREPO_SSH_PUB env variable isn't set")
return nil, fmt.Errorf("%s env variable isn't set", envVarGitRepoSSHPubPath)
}
pubKeyData, err := os.ReadFile(pubKeyFile)
if err != nil {
return nil, fmt.Errorf("error getting ssh pubkey '%s', %w", pubKeyFile, err)
}
c := make(chan []byte, 10)
projectID := outputs["gcp_project_id"].Value.(string)
topicID := outputs["pubsub_topic"].Value.(string)
fn, err := setupPubSubReceiver(ctx, c, projectID, topicID)
if err != nil {
return nil, err
}
var notificationCfg = notificationConfig{
providerType: "googlepubsub",
providerChannel: topicID,
notificationChan: c,
closeChan: fn,
secret: map[string]string{
"address": projectID,
},
}
config := &testConfig{
defaultGitTransport: git.SSH,
gitUsername: "git",
gitPrivateKey: string(privateKeyData),
gitPublicKey: string(pubKeyData),
knownHosts: gkeDevOpsKnownHosts,
fleetInfraRepository: repoConfig{
ssh: outputs["fleet_infra_url"].Value.(string),
knownHosts: gcpSourceRepoKnownHosts,
fleetInfraRepository: gitUrl{
ssh: outputs["fleet_infra_repository"].Value.(string),
},
applicationRepository: repoConfig{
ssh: outputs["application_url"].Value.(string),
applicationRepository: gitUrl{
ssh: outputs["application_repository"].Value.(string),
},
sopsArgs: fmt.Sprintf("--gcp-kms %s", sharedSopsId),
notificationCfg: notificationCfg,
sopsArgs: fmt.Sprintf("--gcp-kms %s", sharedSopsId),
}
opts, err := authOpts(config.fleetInfraRepository.ssh, map[string][]byte{
@ -127,3 +150,29 @@ func getTestConfigGKE(ctx context.Context, outputs map[string]*tfjson.StateOutpu
return config, nil
}
func setupPubSubReceiver(ctx context.Context, c chan []byte, projectID string, topicID string) (func(), error) {
newCtx, cancel := context.WithCancel(ctx)
pubsubClient, err := pubsub.NewClient(newCtx, projectID)
if err != nil {
cancel()
return nil, fmt.Errorf("error creating pubsub client: %s", err)
}
sub := pubsubClient.Subscription(topicID)
go func() {
err = sub.Receive(ctx, func(ctx context.Context, message *pubsub.Message) {
c <- message.Data
message.Ack()
})
if err != nil && status.Code(err) != codes.Canceled {
log.Printf("error receiving message in subscription: %s\n", err)
return
}
}()
return func() {
cancel()
pubsubClient.Close()
}, nil
}