mirror of
https://github.com/fluxcd/flux2.git
synced 2025-11-01 18:26:25 +08:00
Merge pull request #3945 from fluxcd/lenient-logs-cmd
Make `flux logs` more lenient
This commit is contained in:
@ -20,6 +20,7 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -76,7 +77,7 @@ type logsFlags struct {
|
||||
sinceSeconds time.Duration
|
||||
}
|
||||
|
||||
var logsArgs = &logsFlags{
|
||||
var logsArgs = logsFlags{
|
||||
tail: -1,
|
||||
}
|
||||
|
||||
@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
|
||||
return fmt.Errorf("no argument required")
|
||||
}
|
||||
|
||||
pods, err := getPods(ctx, clientset, fluxSelector)
|
||||
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
|
||||
return podLogs(ctx, requests)
|
||||
}
|
||||
|
||||
func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
|
||||
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
|
||||
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
|
||||
// state). If no Pod is found, an error is returned.
|
||||
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
|
||||
var ret []corev1.Pod
|
||||
|
||||
opts := metav1.ListOptions{
|
||||
LabelSelector: label,
|
||||
}
|
||||
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
|
||||
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
|
||||
opts := metav1.ListOptions{
|
||||
LabelSelector: createLabelStringFromMap(label),
|
||||
}
|
||||
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
|
||||
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
@ -196,11 +200,16 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
|
||||
}
|
||||
}
|
||||
|
||||
if len(ret) == 0 {
|
||||
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
|
||||
reader, writer := io.Pipe()
|
||||
errReader, errWriter := io.Pipe()
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(requests))
|
||||
|
||||
@ -208,7 +217,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
|
||||
go func(req rest.ResponseWrapper) {
|
||||
defer wg.Done()
|
||||
if err := logRequest(ctx, req, writer); err != nil {
|
||||
writer.CloseWithError(err)
|
||||
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
|
||||
return
|
||||
}
|
||||
}(request)
|
||||
@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
|
||||
go func() {
|
||||
wg.Wait()
|
||||
writer.Close()
|
||||
errWriter.Close()
|
||||
}()
|
||||
|
||||
_, err := io.Copy(os.Stdout, reader)
|
||||
return err
|
||||
stdoutErrCh := asyncCopy(os.Stdout, reader)
|
||||
stderrErrCh := asyncCopy(os.Stderr, errReader)
|
||||
|
||||
return errors.Join(<-stdoutErrCh, <-stderrErrCh)
|
||||
}
|
||||
|
||||
// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
|
||||
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
|
||||
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
|
||||
// This function lets you copy from multiple sources into multiple destinations in parallel.
|
||||
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
|
||||
errCh := make(chan error)
|
||||
go func(errCh chan error) {
|
||||
_, err := io.Copy(dst, src)
|
||||
errCh <- err
|
||||
}(errCh)
|
||||
|
||||
return errCh
|
||||
}
|
||||
|
||||
func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
|
||||
var retErr error
|
||||
for _, req := range requests {
|
||||
if err := logRequest(ctx, req, os.Stdout); err != nil {
|
||||
return err
|
||||
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
|
||||
retErr = fmt.Errorf("failed to collect logs from all Flux pods")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return retErr
|
||||
}
|
||||
|
||||
func createLabelStringFromMap(m map[string]string) string {
|
||||
|
||||
88
cmd/flux/logs_e2e_test.go
Normal file
88
cmd/flux/logs_e2e_test.go
Normal file
@ -0,0 +1,88 @@
|
||||
//go:build e2e
|
||||
// +build e2e
|
||||
|
||||
/*
|
||||
Copyright 2021 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLogsNoArgs(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsWrongNamespace(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --flux-namespace=default",
|
||||
assert: assertError(`no Flux pods found in namespace "default"`),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsAllNamespaces(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --all-namespaces",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSince(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=2m",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceInvalid(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=XXX",
|
||||
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceTime(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since-time=2021-08-06T14:26:25.546Z",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceTimeInvalid(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since-time=XXX",
|
||||
assert: assertError("XXX is not a valid (RFC3339) time"),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceOnlyOneAllowed(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
|
||||
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
@ -30,73 +30,17 @@ import (
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestLogsNoArgs(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsAllNamespaces(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --all-namespaces",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSince(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=2m",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceInvalid(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=XXX",
|
||||
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceTime(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since-time=2021-08-06T14:26:25.546Z",
|
||||
assert: assertSuccess(),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceTimeInvalid(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since-time=XXX",
|
||||
assert: assertError("XXX is not a valid (RFC3339) time"),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogsSinceOnlyOneAllowed(t *testing.T) {
|
||||
cmd := cmdTestCase{
|
||||
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
|
||||
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
|
||||
}
|
||||
cmd.runTestCmd(t)
|
||||
}
|
||||
|
||||
func TestLogRequest(t *testing.T) {
|
||||
mapper := &testResponseMapper{}
|
||||
tests := []struct {
|
||||
name string
|
||||
namespace string
|
||||
flags *logsFlags
|
||||
flags logsFlags
|
||||
assertFile string
|
||||
}{
|
||||
{
|
||||
name: "all logs",
|
||||
flags: &logsFlags{
|
||||
flags: logsFlags{
|
||||
tail: -1,
|
||||
allNamespaces: true,
|
||||
},
|
||||
@ -105,14 +49,14 @@ func TestLogRequest(t *testing.T) {
|
||||
{
|
||||
name: "filter by namespace",
|
||||
namespace: "default",
|
||||
flags: &logsFlags{
|
||||
flags: logsFlags{
|
||||
tail: -1,
|
||||
},
|
||||
assertFile: "testdata/logs/namespace.txt",
|
||||
},
|
||||
{
|
||||
name: "filter by kind and namespace",
|
||||
flags: &logsFlags{
|
||||
flags: logsFlags{
|
||||
tail: -1,
|
||||
kind: "Kustomization",
|
||||
},
|
||||
@ -120,7 +64,7 @@ func TestLogRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "filter by loglevel",
|
||||
flags: &logsFlags{
|
||||
flags: logsFlags{
|
||||
tail: -1,
|
||||
logLevel: "error",
|
||||
allNamespaces: true,
|
||||
@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) {
|
||||
{
|
||||
name: "filter by namespace, name, loglevel and kind",
|
||||
namespace: "flux-system",
|
||||
flags: &logsFlags{
|
||||
flags: logsFlags{
|
||||
tail: -1,
|
||||
logLevel: "error",
|
||||
kind: "Kustomization",
|
||||
@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) {
|
||||
|
||||
// reset flags to default
|
||||
*kubeconfigArgs.Namespace = rootArgs.defaults.Namespace
|
||||
logsArgs = &logsFlags{
|
||||
logsArgs = logsFlags{
|
||||
tail: -1,
|
||||
}
|
||||
})
|
||||
@ -392,6 +392,10 @@ func resetCmdArgs() {
|
||||
alertProviderArgs = alertProviderFlags{}
|
||||
bootstrapArgs = NewBootstrapFlags()
|
||||
bServerArgs = bServerFlags{}
|
||||
logsArgs = logsFlags{
|
||||
tail: -1,
|
||||
fluxNamespace: rootArgs.defaults.Namespace,
|
||||
}
|
||||
buildKsArgs = buildKsFlags{}
|
||||
checkArgs = checkFlags{}
|
||||
createArgs = createFlags{}
|
||||
|
||||
Reference in New Issue
Block a user