mirror of
https://github.com/ipfs/kubo.git
synced 2025-05-17 23:16:11 +08:00
feat: add tracing to the commands client
This commit is contained in:
@ -10,8 +10,15 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
"github.com/ipfs/go-ipfs-cmds/cli"
|
||||
cmdhttp "github.com/ipfs/go-ipfs-cmds/http"
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/ipfs/kubo/cmd/ipfs/util"
|
||||
oldcmds "github.com/ipfs/kubo/commands"
|
||||
"github.com/ipfs/kubo/core"
|
||||
@ -21,22 +28,19 @@ import (
|
||||
"github.com/ipfs/kubo/repo"
|
||||
"github.com/ipfs/kubo/repo/fsrepo"
|
||||
"github.com/ipfs/kubo/tracing"
|
||||
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
"github.com/ipfs/go-ipfs-cmds/cli"
|
||||
cmdhttp "github.com/ipfs/go-ipfs-cmds/http"
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
logging "github.com/ipfs/go-log"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// log is the command logger
|
||||
var log = logging.Logger("cmd/ipfs")
|
||||
var tracer trace.Tracer
|
||||
|
||||
// declared as a var for testing purposes
|
||||
var dnsResolver = madns.DefaultResolver
|
||||
@ -91,7 +95,6 @@ func newUUID(key string) logging.Metadata {
|
||||
func mainRet() (exitCode int) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
ctx := logging.ContextWithLoggable(context.Background(), newUUID("session"))
|
||||
var err error
|
||||
|
||||
tp, err := tracing.NewTracerProvider(ctx)
|
||||
if err != nil {
|
||||
@ -103,6 +106,7 @@ func mainRet() (exitCode int) {
|
||||
}
|
||||
}()
|
||||
otel.SetTracerProvider(tp)
|
||||
tracer = tp.Tracer("Kubo-cli")
|
||||
|
||||
stopFunc, err := profileIfEnabled()
|
||||
if err != nil {
|
||||
@ -219,7 +223,7 @@ func apiAddrOption(req *cmds.Request) (ma.Multiaddr, error) {
|
||||
}
|
||||
|
||||
func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) {
|
||||
exe := cmds.NewExecutor(req.Root)
|
||||
exe := tracingWrappedExecutor{cmds.NewExecutor(req.Root)}
|
||||
cctx := env.(*oldcmds.Context)
|
||||
|
||||
// Check if the command is disabled.
|
||||
@ -294,23 +298,44 @@ func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) {
|
||||
opts = append(opts, cmdhttp.ClientWithFallback(exe))
|
||||
}
|
||||
|
||||
var tpt http.RoundTripper
|
||||
switch network {
|
||||
case "tcp", "tcp4", "tcp6":
|
||||
tpt = http.DefaultTransport
|
||||
case "unix":
|
||||
path := host
|
||||
host = "unix"
|
||||
opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", path)
|
||||
},
|
||||
tpt = &http.Transport{
|
||||
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", path)
|
||||
},
|
||||
}))
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported API address: %s", apiAddr)
|
||||
}
|
||||
opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{
|
||||
Transport: otelhttp.NewTransport(tpt,
|
||||
otelhttp.WithPropagators(tracing.Propagator()),
|
||||
),
|
||||
}))
|
||||
|
||||
return cmdhttp.NewClient(host, opts...), nil
|
||||
return tracingWrappedExecutor{cmdhttp.NewClient(host, opts...)}, nil
|
||||
}
|
||||
|
||||
type tracingWrappedExecutor struct {
|
||||
exec cmds.Executor
|
||||
}
|
||||
|
||||
func (twe tracingWrappedExecutor) Execute(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
ctx, span := tracer.Start(req.Context, "cmds."+strings.Join(req.Path, "."), trace.WithAttributes(attribute.StringSlice("Arguments", req.Arguments)))
|
||||
defer span.End()
|
||||
req.Context = ctx
|
||||
|
||||
err := twe.exec.Execute(req, re, env)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func getRepoPath(req *cmds.Request) (string, error) {
|
||||
|
@ -9,15 +9,16 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
version "github.com/ipfs/kubo"
|
||||
oldcmds "github.com/ipfs/kubo/commands"
|
||||
"github.com/ipfs/kubo/core"
|
||||
corecommands "github.com/ipfs/kubo/core/commands"
|
||||
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
cmdsHttp "github.com/ipfs/go-ipfs-cmds/http"
|
||||
path "github.com/ipfs/go-path"
|
||||
version "github.com/ipfs/kubo"
|
||||
oldcmds "github.com/ipfs/kubo/commands"
|
||||
config "github.com/ipfs/kubo/config"
|
||||
"github.com/ipfs/kubo/core"
|
||||
corecommands "github.com/ipfs/kubo/core/commands"
|
||||
"github.com/ipfs/kubo/tracing"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -146,6 +147,7 @@ func commandsOption(cctx oldcmds.Context, command *cmds.Command, allowGet bool)
|
||||
patchCORSVars(cfg, l.Addr())
|
||||
|
||||
cmdHandler := cmdsHttp.NewHandler(&cctx, command, cfg)
|
||||
cmdHandler = otelhttp.NewHandler(cmdHandler, "corehttp.cmdsHandler", otelhttp.WithPropagators(tracing.Propagator()))
|
||||
mux.Handle(APIPath+"/", cmdHandler)
|
||||
return mux, nil
|
||||
}
|
||||
|
@ -48,6 +48,8 @@ func GatewayOption(paths ...string) ServeOption {
|
||||
}
|
||||
|
||||
gw := gateway.NewHandler(gwConfig, gwAPI)
|
||||
// TODO: Add otelhttp.WithPropagators(tracing.Propagator()) option to
|
||||
// propagate traces through the gateway once we test this feature.
|
||||
gw = otelhttp.NewHandler(gw, "Gateway.Request")
|
||||
|
||||
// By default, our HTTP handler is the gateway handler.
|
||||
|
@ -23,6 +23,7 @@ test_ls_cmd() {
|
||||
printf "HTTP/1.1 200 OK\r\n" >expected_output &&
|
||||
printf "Access-Control-Allow-Headers: X-Stream-Output, X-Chunked-Output, X-Content-Length\r\n" >>expected_output &&
|
||||
printf "Access-Control-Expose-Headers: X-Stream-Output, X-Chunked-Output, X-Content-Length\r\n" >>expected_output &&
|
||||
printf "Connection: close\r\n" >>expected_output &&
|
||||
printf "Content-Type: text/plain\r\n" >>expected_output &&
|
||||
printf "Server: kubo/%s\r\n" $(ipfs version -n) >>expected_output &&
|
||||
printf "Trailer: X-Stream-Error\r\n" >>expected_output &&
|
||||
@ -46,6 +47,7 @@ test_ls_cmd() {
|
||||
printf "HTTP/1.1 200 OK\r\n" >expected_output &&
|
||||
printf "Access-Control-Allow-Headers: X-Stream-Output, X-Chunked-Output, X-Content-Length\r\n" >>expected_output &&
|
||||
printf "Access-Control-Expose-Headers: X-Stream-Output, X-Chunked-Output, X-Content-Length\r\n" >>expected_output &&
|
||||
printf "Connection: close\r\n" >>expected_output &&
|
||||
printf "Content-Type: application/json\r\n" >>expected_output &&
|
||||
printf "Server: kubo/%s\r\n" $(ipfs version -n) >>expected_output &&
|
||||
printf "Trailer: X-Stream-Error\r\n" >>expected_output &&
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/exporters/zipkin"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
||||
@ -130,7 +131,7 @@ func NewTracerProvider(ctx context.Context) (shutdownTracerProvider, error) {
|
||||
r, err := resource.Merge(
|
||||
resource.Default(),
|
||||
resource.NewSchemaless(
|
||||
semconv.ServiceNameKey.String("go-ipfs"),
|
||||
semconv.ServiceNameKey.String("Kubo"),
|
||||
semconv.ServiceVersionKey.String(version.CurrentVersionNumber),
|
||||
),
|
||||
)
|
||||
@ -144,5 +145,9 @@ func NewTracerProvider(ctx context.Context) (shutdownTracerProvider, error) {
|
||||
|
||||
// Span starts a new span using the standard IPFS tracing conventions.
|
||||
func Span(ctx context.Context, componentName string, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) {
|
||||
return otel.Tracer("go-ipfs").Start(ctx, fmt.Sprintf("%s.%s", componentName, spanName), opts...)
|
||||
return otel.Tracer("Kubo").Start(ctx, fmt.Sprintf("%s.%s", componentName, spanName), opts...)
|
||||
}
|
||||
|
||||
func Propagator() propagation.TextMapPropagator {
|
||||
return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
|
||||
}
|
||||
|
Reference in New Issue
Block a user