mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 17:12:29 +08:00
258 lines
8.0 KiB
Go
258 lines
8.0 KiB
Go
package responsewriter
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync/atomic"
|
|
|
|
"github.com/grafana/grafana-app-sdk/logging"
|
|
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
|
"k8s.io/apiserver/pkg/authentication/user"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/endpoints/responsewriter"
|
|
"k8s.io/component-base/tracing"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
var _ responsewriter.CloseNotifierFlusher = (*ResponseAdapter)(nil)
|
|
var _ http.ResponseWriter = (*ResponseAdapter)(nil)
|
|
var _ io.ReadCloser = (*ResponseAdapter)(nil)
|
|
|
|
// WrapHandler wraps an http.Handler to return a function that can be used as a [http.RoundTripper].
|
|
// This is used to directly connect the LoopbackConfig [http.RoundTripper]
|
|
// with the apiserver's [http.Handler], which avoids the need to start a listener
|
|
// for internal clients that use the LoopbackConfig.
|
|
// All other requests should not use this wrapper, and should be handled by the
|
|
// Grafana HTTP server to ensure that signedInUser middleware is applied.
|
|
func WrapHandler(handler http.Handler) func(req *http.Request) (*http.Response, error) {
|
|
// ignore the lint error because the response is passed directly to the client,
|
|
// so the client will be responsible for closing the response body.
|
|
//nolint:bodyclose
|
|
return func(req *http.Request) (*http.Response, error) {
|
|
ctx, cancel, err := createLimitedContext(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// The cancel happens in the goroutine we spawn, so as to not cancel it too early.
|
|
req = req.WithContext(ctx) // returns a shallow copy, so we can't do it as part of the adapter.
|
|
|
|
w := NewAdapter(req)
|
|
go func() {
|
|
defer cancel()
|
|
handler.ServeHTTP(w, req)
|
|
if err := w.CloseWriter(); err != nil {
|
|
klog.Errorf("error closing writer: %v", err)
|
|
}
|
|
}()
|
|
|
|
return w.Response()
|
|
}
|
|
}
|
|
|
|
// createLimitedContext creates a new context based on the the req's.
|
|
// It contains vital information such as a logger for the driver of the request, a user for auth, tracing, and deadlines. It propagates the parent's cancellation.
|
|
func createLimitedContext(req *http.Request) (context.Context, context.CancelFunc, error) {
|
|
refCtx := req.Context()
|
|
newCtx := context.Background()
|
|
|
|
if ns, ok := request.NamespaceFrom(refCtx); ok {
|
|
newCtx = request.WithNamespace(newCtx, ns)
|
|
}
|
|
if signal := request.ServerShutdownSignalFrom(refCtx); signal != nil {
|
|
newCtx = request.WithServerShutdownSignal(newCtx, signal)
|
|
}
|
|
|
|
requester, _ := identity.GetRequester(refCtx)
|
|
if requester != nil {
|
|
newCtx = identity.WithRequester(newCtx, requester)
|
|
}
|
|
|
|
usr, ok := request.UserFrom(refCtx)
|
|
if !ok && requester != nil {
|
|
// add in k8s user if not there yet
|
|
var ok bool
|
|
usr, ok = requester.(user.Info)
|
|
if !ok {
|
|
return nil, nil, fmt.Errorf("could not convert user to Kubernetes user")
|
|
}
|
|
}
|
|
if ok {
|
|
newCtx = request.WithUser(newCtx, usr)
|
|
}
|
|
|
|
// App SDK logger
|
|
appLogger := logging.FromContext(refCtx)
|
|
newCtx = logging.Context(newCtx, appLogger)
|
|
// Klog logger
|
|
klogger := klog.FromContext(refCtx)
|
|
if klogger.Enabled() {
|
|
newCtx = klog.NewContext(newCtx, klogger)
|
|
}
|
|
|
|
// The tracing package deals with both k8s trace and otel.
|
|
if span := tracing.SpanFromContext(refCtx); span != nil && *span != (tracing.Span{}) {
|
|
newCtx = tracing.ContextWithSpan(newCtx, span)
|
|
}
|
|
|
|
deadlineCancel := context.CancelFunc(func() {})
|
|
if deadline, ok := refCtx.Deadline(); ok {
|
|
newCtx, deadlineCancel = context.WithDeadline(newCtx, deadline)
|
|
}
|
|
|
|
newCtx, cancel := context.WithCancelCause(newCtx)
|
|
// We intentionally do not defer a cancel(nil) here. It wouldn't make sense to cancel until (*ResponseAdapter).Close() is called.
|
|
go func() { // Even context's own impls do goroutines for this type of pattern.
|
|
select {
|
|
case <-newCtx.Done():
|
|
// We don't have to do anything!
|
|
case <-refCtx.Done():
|
|
cancel(context.Cause(refCtx))
|
|
}
|
|
deadlineCancel()
|
|
}()
|
|
|
|
return newCtx, context.CancelFunc(func() {
|
|
cancel(nil)
|
|
deadlineCancel()
|
|
}), nil
|
|
}
|
|
|
|
// ResponseAdapter is an implementation of [http.ResponseWriter] that allows conversion to a [http.Response].
// The handler writes into one end of an in-memory pipe while the consumer of
// the resulting response reads the body from the other end.
type ResponseAdapter struct {
	// req is the request being served; its context bounds Response and CloseNotify.
	req *http.Request
	// res accumulates the status line and headers set by the handler.
	res http.Response
	// reader is the read end of the pipe, closed by Close.
	reader io.ReadCloser
	// writer is the write end of the pipe, closed by CloseWriter.
	writer io.WriteCloser
	// buffered wraps both pipe ends; Read, Write and FlushError go through it.
	buffered *bufio.ReadWriter
	// ready is closed by the first WriteHeader call to signal that the status
	// and headers are available.
	ready chan struct{}
	// wroteHeader is flipped from 0 to 1 atomically by the first WriteHeader call.
	wroteHeader int32
}
|
|
|
|
// NewAdapter returns an initialized [ResponseAdapter].
|
|
func NewAdapter(req *http.Request) *ResponseAdapter {
|
|
r, w := io.Pipe()
|
|
writer := bufio.NewWriter(w)
|
|
reader := bufio.NewReader(r)
|
|
buffered := bufio.NewReadWriter(reader, writer)
|
|
if req.Method == http.MethodDelete && req.Body == nil {
|
|
// The apiserver tries to read the body of DELETE requests,
|
|
// which causes a panic if the body is nil.
|
|
// https://github.com/kubernetes/apiserver/blob/v0.32.1/pkg/endpoints/handlers/delete.go#L88
|
|
req.Body = http.NoBody
|
|
}
|
|
return &ResponseAdapter{
|
|
req: req,
|
|
res: http.Response{
|
|
Proto: req.Proto,
|
|
ProtoMajor: req.ProtoMajor,
|
|
ProtoMinor: req.ProtoMinor,
|
|
Header: make(http.Header),
|
|
},
|
|
reader: r,
|
|
writer: w,
|
|
buffered: buffered,
|
|
ready: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Header implements [http.ResponseWriter].
|
|
// It returns the response headers to mutate within a handler.
|
|
func (ra *ResponseAdapter) Header() http.Header {
|
|
return ra.res.Header
|
|
}
|
|
|
|
// Write implements [http.ResponseWriter].
|
|
func (ra *ResponseAdapter) Write(buf []byte) (int, error) {
|
|
// via https://pkg.go.dev/net/http#ResponseWriter.Write
|
|
// If WriteHeader is not called explicitly, the first call to Write will trigger an implicit WriteHeader(http.StatusOK).
|
|
ra.WriteHeader(http.StatusOK)
|
|
return ra.buffered.Write(buf)
|
|
}
|
|
|
|
// Read implements [io.Reader].
|
|
func (ra *ResponseAdapter) Read(buf []byte) (int, error) {
|
|
return ra.buffered.Read(buf)
|
|
}
|
|
|
|
// WriteHeader implements [http.ResponseWriter].
|
|
func (ra *ResponseAdapter) WriteHeader(code int) {
|
|
if atomic.CompareAndSwapInt32(&ra.wroteHeader, 0, 1) {
|
|
ra.res.StatusCode = code
|
|
ra.res.Status = fmt.Sprintf("%03d %s", code, http.StatusText(code))
|
|
close(ra.ready)
|
|
}
|
|
}
|
|
|
|
// FlushError implements [http.Flusher].
|
|
func (ra *ResponseAdapter) Flush() {
|
|
// We discard io.ErrClosedPipe. This is because as we return the response as
|
|
// soon as we have the first write or the status set, the client side with
|
|
// the response could potentially call Close on the response body, which
|
|
// would cause the reader side of the io.Pipe to be closed. This would cause
|
|
// a subsequent call to Write or Flush/FlushError (that have data to write
|
|
// to the pipe) to fail with this error. This is expected and legit, and
|
|
// this error should be checked by the handler side by either validating the
|
|
// error in Write or the one in FlushError. This means it is a
|
|
// responsibility of the handler author(s) to handle this error. In other
|
|
// cases, we log the error, as it could be potentially not easy to check
|
|
// otherwise.
|
|
if err := ra.FlushError(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
|
klog.Error("Error flushing response buffer: ", "error", err)
|
|
}
|
|
}
|
|
|
|
// FlushError implements an alternative Flush that returns an error. This is
|
|
// internally used in net/http and in some standard library utilities.
|
|
func (ra *ResponseAdapter) FlushError() error {
|
|
if ra.buffered.Writer.Buffered() == 0 {
|
|
return nil
|
|
}
|
|
|
|
return ra.buffered.Flush()
|
|
}
|
|
|
|
// Response returns the [http.Response] generated by the [http.Handler].
|
|
func (ra *ResponseAdapter) Response() (*http.Response, error) {
|
|
ctx := ra.req.Context()
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
|
|
case <-ra.ready:
|
|
res := ra.res
|
|
res.Body = ra
|
|
|
|
return &res, nil
|
|
}
|
|
}
|
|
|
|
// Decorate implements [responsewriter.UserProvidedDecorator].
|
|
func (ra *ResponseAdapter) Unwrap() http.ResponseWriter {
|
|
return ra
|
|
}
|
|
|
|
// CloseNotify implements [http.CloseNotifier].
|
|
func (ra *ResponseAdapter) CloseNotify() <-chan bool {
|
|
ch := make(chan bool)
|
|
go func() {
|
|
<-ra.req.Context().Done()
|
|
ch <- true
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
// Close implements [io.Closer].
|
|
func (ra *ResponseAdapter) Close() error {
|
|
return ra.reader.Close()
|
|
}
|
|
|
|
// CloseWriter should be called after the http.Handler has returned.
|
|
func (ra *ResponseAdapter) CloseWriter() error {
|
|
ra.Flush()
|
|
return ra.writer.Close()
|
|
}
|