mirror of
https://github.com/containers/podman.git
synced 2025-10-19 04:03:23 +08:00
Send HTTP Hijack headers after successful attach
Our previous flow was to perform a hijack before passing a connection into Libpod, and then Libpod would attach to the container's attach socket and begin forwarding traffic. A problem emerges: we write the attach header as soon as the attach complete. As soon as we write the header, the client assumes that all is ready, and sends a Start request. This Start may be processed *before* we successfully finish attaching, causing us to lose output. The solution is to handle hijacking inside Libpod. Unfortunately, this requires a downright extensive refactor of the Attach and HTTP Exec StartAndAttach code. I think the result is an improvement in some places (a lot more errors will be handled with a proper HTTP error code, before the hijack occurs) but other parts, like the relocation of printing container logs, are just *bad*. Still, we need this fixed now to get CI back into good shape... Fixes #7195 Signed-off-by: Matthew Heon <matthew.heon@pm.me>
This commit is contained in:
@ -1,18 +1,14 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containers/podman/v2/libpod/define"
|
"github.com/containers/podman/v2/libpod/define"
|
||||||
"github.com/containers/podman/v2/libpod/events"
|
"github.com/containers/podman/v2/libpod/events"
|
||||||
"github.com/containers/podman/v2/libpod/logs"
|
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -267,15 +263,10 @@ func (c *Container) Attach(streams *define.AttachStreams, keys string, resize <-
|
|||||||
// over the socket; if this is not set, but streamLogs is, only the logs will be
|
// over the socket; if this is not set, but streamLogs is, only the logs will be
|
||||||
// sent.
|
// sent.
|
||||||
// At least one of streamAttach and streamLogs must be set.
|
// At least one of streamAttach and streamLogs must be set.
|
||||||
func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool) (deferredErr error) {
|
func (c *Container) HTTPAttach(r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool, hijackDone chan<- bool) error {
|
||||||
isTerminal := false
|
// Ensure we don't leak a goroutine if we exit before hijack completes.
|
||||||
if c.config.Spec.Process != nil {
|
|
||||||
isTerminal = c.config.Spec.Process.Terminal
|
|
||||||
}
|
|
||||||
// Ensure our contract of writing errors to and closing the HTTP conn is
|
|
||||||
// honored.
|
|
||||||
defer func() {
|
defer func() {
|
||||||
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
|
close(hijackDone)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if !c.batched {
|
if !c.batched {
|
||||||
@ -299,74 +290,8 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
|
|||||||
|
|
||||||
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
|
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
|
||||||
|
|
||||||
logSize := 0
|
|
||||||
if streamLogs {
|
|
||||||
// Get all logs for the container
|
|
||||||
logChan := make(chan *logs.LogLine)
|
|
||||||
logOpts := new(logs.LogOptions)
|
|
||||||
logOpts.Tail = -1
|
|
||||||
logOpts.WaitGroup = new(sync.WaitGroup)
|
|
||||||
errChan := make(chan error)
|
|
||||||
go func() {
|
|
||||||
var err error
|
|
||||||
// In non-terminal mode we need to prepend with the
|
|
||||||
// stream header.
|
|
||||||
logrus.Debugf("Writing logs for container %s to HTTP attach", c.ID())
|
|
||||||
for logLine := range logChan {
|
|
||||||
if !isTerminal {
|
|
||||||
device := logLine.Device
|
|
||||||
var header []byte
|
|
||||||
headerLen := uint32(len(logLine.Msg))
|
|
||||||
logSize += len(logLine.Msg)
|
|
||||||
switch strings.ToLower(device) {
|
|
||||||
case "stdin":
|
|
||||||
header = makeHTTPAttachHeader(0, headerLen)
|
|
||||||
case "stdout":
|
|
||||||
header = makeHTTPAttachHeader(1, headerLen)
|
|
||||||
case "stderr":
|
|
||||||
header = makeHTTPAttachHeader(2, headerLen)
|
|
||||||
default:
|
|
||||||
logrus.Errorf("Unknown device for log line: %s", device)
|
|
||||||
header = makeHTTPAttachHeader(1, headerLen)
|
|
||||||
}
|
|
||||||
_, err = httpBuf.Write(header)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err = httpBuf.Write([]byte(logLine.Msg))
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
_, err = httpBuf.Write([]byte("\n"))
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
err = httpBuf.Flush()
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
errChan <- err
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
logOpts.WaitGroup.Wait()
|
|
||||||
close(logChan)
|
|
||||||
}()
|
|
||||||
if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
|
|
||||||
if err := <-errChan; err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !streamAttach {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c.newContainerEvent(events.Attach)
|
c.newContainerEvent(events.Attach)
|
||||||
return c.ociRuntime.HTTPAttach(c, httpCon, httpBuf, streams, detachKeys, cancel)
|
return c.ociRuntime.HTTPAttach(c, r, w, streams, detachKeys, cancel, hijackDone, streamAttach, streamLogs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AttachResize resizes the container's terminal, which is displayed by Attach
|
// AttachResize resizes the container's terminal, which is displayed by Attach
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -373,17 +372,12 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
|
// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
|
||||||
func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) {
|
func (c *Container) ExecHTTPStartAndAttach(sessionID string, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool) error {
|
||||||
// TODO: How do we combine streams with the default streams set in the exec session?
|
// TODO: How do we combine streams with the default streams set in the exec session?
|
||||||
|
|
||||||
// The flow here is somewhat strange, because we need to determine if
|
// Ensure that we don't leak a goroutine here
|
||||||
// there's a terminal ASAP (for error handling).
|
|
||||||
// Until we know, assume it's true (don't add standard stream headers).
|
|
||||||
// Add a defer to ensure our invariant (HTTP session is closed) is
|
|
||||||
// maintained.
|
|
||||||
isTerminal := true
|
|
||||||
defer func() {
|
defer func() {
|
||||||
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
|
close(hijackDone)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if !c.batched {
|
if !c.batched {
|
||||||
@ -399,8 +393,6 @@ func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, h
|
|||||||
if !ok {
|
if !ok {
|
||||||
return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID)
|
return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID)
|
||||||
}
|
}
|
||||||
// We can now finally get the real value of isTerminal.
|
|
||||||
isTerminal = session.Config.Terminal
|
|
||||||
|
|
||||||
// Verify that we are in a good state to continue
|
// Verify that we are in a good state to continue
|
||||||
if !c.ensureState(define.ContainerStateRunning) {
|
if !c.ensureState(define.ContainerStateRunning) {
|
||||||
@ -432,7 +424,13 @@ func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, h
|
|||||||
streams.Stderr = session.Config.AttachStderr
|
streams.Stderr = session.Config.AttachStderr
|
||||||
}
|
}
|
||||||
|
|
||||||
pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, httpCon, httpBuf, streams, cancel)
|
holdConnOpen := make(chan bool)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
close(holdConnOpen)
|
||||||
|
}()
|
||||||
|
|
||||||
|
pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, r, w, streams, cancel, hijackDone, holdConnOpen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
session.State = define.ExecStateStopped
|
session.State = define.ExecStateStopped
|
||||||
session.ExitCode = define.TranslateExecErrorToExitCode(define.ExecErrorCodeGeneric, err)
|
session.ExitCode = define.TranslateExecErrorToExitCode(define.ExecErrorCodeGeneric, err)
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"net/http"
|
||||||
"net"
|
|
||||||
|
|
||||||
"github.com/containers/podman/v2/libpod/define"
|
"github.com/containers/podman/v2/libpod/define"
|
||||||
"k8s.io/client-go/tools/remotecommand"
|
"k8s.io/client-go/tools/remotecommand"
|
||||||
@ -63,7 +62,7 @@ type OCIRuntime interface {
|
|||||||
// used instead. Detach keys of "" will disable detaching via keyboard.
|
// used instead. Detach keys of "" will disable detaching via keyboard.
|
||||||
// The streams parameter will determine which streams to forward to the
|
// The streams parameter will determine which streams to forward to the
|
||||||
// client.
|
// client.
|
||||||
HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error
|
HTTPAttach(ctr *Container, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool, streamAttach, streamLogs bool) error
|
||||||
// AttachResize resizes the terminal in use by the given container.
|
// AttachResize resizes the terminal in use by the given container.
|
||||||
AttachResize(ctr *Container, newSize remotecommand.TerminalSize) error
|
AttachResize(ctr *Container, newSize remotecommand.TerminalSize) error
|
||||||
|
|
||||||
@ -80,7 +79,7 @@ type OCIRuntime interface {
|
|||||||
// Maintains the same invariants as ExecContainer (returns on session
|
// Maintains the same invariants as ExecContainer (returns on session
|
||||||
// start, with a goroutine running in the background to handle attach).
|
// start, with a goroutine running in the background to handle attach).
|
||||||
// The HTTP attach itself maintains the same invariants as HTTPAttach.
|
// The HTTP attach itself maintains the same invariants as HTTPAttach.
|
||||||
ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error)
|
ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (int, chan error, error)
|
||||||
// ExecContainerDetached executes a command in a running container, but
|
// ExecContainerDetached executes a command in a running container, but
|
||||||
// does not attach to it. Returns the PID of the exec session and an
|
// does not attach to it. Returns the PID of the exec session and an
|
||||||
// error (if starting the exec session failed)
|
// error (if starting the exec session failed)
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -80,7 +80,7 @@ func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options
|
|||||||
|
|
||||||
// ExecContainerHTTP executes a new command in an existing container and
|
// ExecContainerHTTP executes a new command in an existing container and
|
||||||
// forwards its standard streams over an attach
|
// forwards its standard streams over an attach
|
||||||
func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) {
|
func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, req *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (int, chan error, error) {
|
||||||
if streams != nil {
|
if streams != nil {
|
||||||
if !streams.Stdin && !streams.Stdout && !streams.Stderr {
|
if !streams.Stdin && !streams.Stdout && !streams.Stderr {
|
||||||
return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide at least one stream to attach to")
|
return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide at least one stream to attach to")
|
||||||
@ -129,7 +129,7 @@ func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, o
|
|||||||
attachChan := make(chan error)
|
attachChan := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
// attachToExec is responsible for closing pipes
|
// attachToExec is responsible for closing pipes
|
||||||
attachChan <- attachExecHTTP(ctr, sessionID, httpBuf, streams, pipes, detachKeys, options.Terminal, cancel)
|
attachChan <- attachExecHTTP(ctr, sessionID, req, w, streams, pipes, detachKeys, options.Terminal, cancel, hijackDone, holdConnOpen)
|
||||||
close(attachChan)
|
close(attachChan)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -496,7 +496,7 @@ func (r *ConmonOCIRuntime) startExec(c *Container, sessionID string, options *Ex
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Attach to a container over HTTP
|
// Attach to a container over HTTP
|
||||||
func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool) error {
|
func attachExecHTTP(c *Container, sessionID string, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (deferredErr error) {
|
||||||
if pipes == nil || pipes.startPipe == nil || pipes.attachPipe == nil {
|
if pipes == nil || pipes.startPipe == nil || pipes.attachPipe == nil {
|
||||||
return errors.Wrapf(define.ErrInvalidArg, "must provide a start and attach pipe to finish an exec attach")
|
return errors.Wrapf(define.ErrInvalidArg, "must provide a start and attach pipe to finish an exec attach")
|
||||||
}
|
}
|
||||||
@ -549,6 +549,37 @@ func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, s
|
|||||||
attachStdin = streams.Stdin
|
attachStdin = streams.Stdin
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform hijack
|
||||||
|
hijacker, ok := w.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("unable to hijack connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
httpCon, httpBuf, err := hijacker.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "error hijacking connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
hijackDone <- true
|
||||||
|
|
||||||
|
// Write a header to let the client know what happened
|
||||||
|
writeHijackHeader(r, httpBuf)
|
||||||
|
|
||||||
|
// Force a flush after the header is written.
|
||||||
|
if err := httpBuf.Flush(); err != nil {
|
||||||
|
return errors.Wrapf(err, "error flushing HTTP hijack header")
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// We need to hold the connection open until the complete exec
|
||||||
|
// function has finished. This channel will be closed in a defer
|
||||||
|
// in that function, so we can wait for it here.
|
||||||
|
// Can't be a defer, because this would block the function from
|
||||||
|
// returning.
|
||||||
|
<-holdConnOpen
|
||||||
|
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
|
||||||
|
}()
|
||||||
|
|
||||||
// Next, STDIN. Avoid entirely if attachStdin unset.
|
// Next, STDIN. Avoid entirely if attachStdin unset.
|
||||||
if attachStdin {
|
if attachStdin {
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -5,16 +5,19 @@ package libpod
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
@ -22,6 +25,7 @@ import (
|
|||||||
"github.com/containers/common/pkg/config"
|
"github.com/containers/common/pkg/config"
|
||||||
conmonConfig "github.com/containers/conmon/runner/config"
|
conmonConfig "github.com/containers/conmon/runner/config"
|
||||||
"github.com/containers/podman/v2/libpod/define"
|
"github.com/containers/podman/v2/libpod/define"
|
||||||
|
"github.com/containers/podman/v2/libpod/logs"
|
||||||
"github.com/containers/podman/v2/pkg/cgroups"
|
"github.com/containers/podman/v2/pkg/cgroups"
|
||||||
"github.com/containers/podman/v2/pkg/errorhandling"
|
"github.com/containers/podman/v2/pkg/errorhandling"
|
||||||
"github.com/containers/podman/v2/pkg/lookup"
|
"github.com/containers/podman/v2/pkg/lookup"
|
||||||
@ -503,7 +507,9 @@ func (r *ConmonOCIRuntime) UnpauseContainer(ctr *Container) error {
|
|||||||
// this function returns.
|
// this function returns.
|
||||||
// If this is a container with a terminal, we will stream raw. If it is not, we
|
// If this is a container with a terminal, we will stream raw. If it is not, we
|
||||||
// will stream with an 8-byte header to multiplex STDOUT and STDERR.
|
// will stream with an 8-byte header to multiplex STDOUT and STDERR.
|
||||||
func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) {
|
// Returns any errors that occurred, and whether the connection was successfully
|
||||||
|
// hijacked before that error occurred.
|
||||||
|
func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, req *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool, streamAttach, streamLogs bool) (deferredErr error) {
|
||||||
isTerminal := false
|
isTerminal := false
|
||||||
if ctr.config.Spec.Process != nil {
|
if ctr.config.Spec.Process != nil {
|
||||||
isTerminal = ctr.config.Spec.Process.Terminal
|
isTerminal = ctr.config.Spec.Process.Terminal
|
||||||
@ -521,17 +527,21 @@ func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf
|
|||||||
}
|
}
|
||||||
socketPath := buildSocketPath(attachSock)
|
socketPath := buildSocketPath(attachSock)
|
||||||
|
|
||||||
conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: socketPath, Net: "unixpacket"})
|
var conn *net.UnixConn
|
||||||
if err != nil {
|
if streamAttach {
|
||||||
return errors.Wrapf(err, "failed to connect to container's attach socket: %v", socketPath)
|
newConn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: socketPath, Net: "unixpacket"})
|
||||||
}
|
if err != nil {
|
||||||
defer func() {
|
return errors.Wrapf(err, "failed to connect to container's attach socket: %v", socketPath)
|
||||||
if err := conn.Close(); err != nil {
|
|
||||||
logrus.Errorf("unable to close container %s attach socket: %q", ctr.ID(), err)
|
|
||||||
}
|
}
|
||||||
}()
|
conn = newConn
|
||||||
|
defer func() {
|
||||||
|
if err := conn.Close(); err != nil {
|
||||||
|
logrus.Errorf("unable to close container %s attach socket: %q", ctr.ID(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
logrus.Debugf("Successfully connected to container %s attach socket %s", ctr.ID(), socketPath)
|
logrus.Debugf("Successfully connected to container %s attach socket %s", ctr.ID(), socketPath)
|
||||||
|
}
|
||||||
|
|
||||||
detachString := ctr.runtime.config.Engine.DetachKeys
|
detachString := ctr.runtime.config.Engine.DetachKeys
|
||||||
if detachKeys != nil {
|
if detachKeys != nil {
|
||||||
@ -554,6 +564,111 @@ func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf
|
|||||||
attachStdin = streams.Stdin
|
attachStdin = streams.Stdin
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logrus.Debugf("Going to hijack container %s attach connection", ctr.ID())
|
||||||
|
|
||||||
|
// Alright, let's hijack.
|
||||||
|
hijacker, ok := w.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("unable to hijack connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
httpCon, httpBuf, err := hijacker.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "error hijacking connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
hijackDone <- true
|
||||||
|
|
||||||
|
writeHijackHeader(req, httpBuf)
|
||||||
|
|
||||||
|
// Force a flush after the header is written.
|
||||||
|
if err := httpBuf.Flush(); err != nil {
|
||||||
|
return errors.Wrapf(err, "error flushing HTTP hijack header")
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
hijackWriteErrorAndClose(deferredErr, ctr.ID(), isTerminal, httpCon, httpBuf)
|
||||||
|
}()
|
||||||
|
|
||||||
|
logrus.Debugf("Hijack for container %s attach session done, ready to stream", ctr.ID())
|
||||||
|
|
||||||
|
// TODO: This is gross. Really, really gross.
|
||||||
|
// I want to say we should read all the logs into an array before
|
||||||
|
// calling this, in container_api.go, but that could take a lot of
|
||||||
|
// memory...
|
||||||
|
// On the whole, we need to figure out a better way of doing this,
|
||||||
|
// though.
|
||||||
|
logSize := 0
|
||||||
|
if streamLogs {
|
||||||
|
logrus.Debugf("Will stream logs for container %s attach session", ctr.ID())
|
||||||
|
|
||||||
|
// Get all logs for the container
|
||||||
|
logChan := make(chan *logs.LogLine)
|
||||||
|
logOpts := new(logs.LogOptions)
|
||||||
|
logOpts.Tail = -1
|
||||||
|
logOpts.WaitGroup = new(sync.WaitGroup)
|
||||||
|
errChan := make(chan error)
|
||||||
|
go func() {
|
||||||
|
var err error
|
||||||
|
// In non-terminal mode we need to prepend with the
|
||||||
|
// stream header.
|
||||||
|
logrus.Debugf("Writing logs for container %s to HTTP attach", ctr.ID())
|
||||||
|
for logLine := range logChan {
|
||||||
|
if !isTerminal {
|
||||||
|
device := logLine.Device
|
||||||
|
var header []byte
|
||||||
|
headerLen := uint32(len(logLine.Msg))
|
||||||
|
logSize += len(logLine.Msg)
|
||||||
|
switch strings.ToLower(device) {
|
||||||
|
case "stdin":
|
||||||
|
header = makeHTTPAttachHeader(0, headerLen)
|
||||||
|
case "stdout":
|
||||||
|
header = makeHTTPAttachHeader(1, headerLen)
|
||||||
|
case "stderr":
|
||||||
|
header = makeHTTPAttachHeader(2, headerLen)
|
||||||
|
default:
|
||||||
|
logrus.Errorf("Unknown device for log line: %s", device)
|
||||||
|
header = makeHTTPAttachHeader(1, headerLen)
|
||||||
|
}
|
||||||
|
_, err = httpBuf.Write(header)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err = httpBuf.Write([]byte(logLine.Msg))
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
_, err = httpBuf.Write([]byte("\n"))
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
err = httpBuf.Flush()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errChan <- err
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
logOpts.WaitGroup.Wait()
|
||||||
|
close(logChan)
|
||||||
|
}()
|
||||||
|
if err := ctr.ReadLog(context.Background(), logOpts, logChan); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logrus.Debugf("Done reading logs for container %s, %d bytes", ctr.ID(), logSize)
|
||||||
|
if err := <-errChan; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !streamAttach {
|
||||||
|
logrus.Debugf("Done streaming logs for container %s attach, exiting as attach streaming not requested", ctr.ID())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Debugf("Forwarding attach output for container %s", ctr.ID())
|
||||||
|
|
||||||
// Handle STDOUT/STDERR
|
// Handle STDOUT/STDERR
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net/http"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -111,7 +110,7 @@ func (r *MissingRuntime) UnpauseContainer(ctr *Container) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// HTTPAttach is not available as the runtime is missing
|
// HTTPAttach is not available as the runtime is missing
|
||||||
func (r *MissingRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error {
|
func (r *MissingRuntime) HTTPAttach(ctr *Container, req *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool, streamAttach, streamLogs bool) error {
|
||||||
return r.printError()
|
return r.printError()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,7 +125,7 @@ func (r *MissingRuntime) ExecContainer(ctr *Container, sessionID string, options
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExecContainerHTTP is not available as the runtime is missing
|
// ExecContainerHTTP is not available as the runtime is missing
|
||||||
func (r *MissingRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) {
|
func (r *MissingRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, req *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (int, chan error, error) {
|
||||||
return -1, nil, r.printError()
|
return -1, nil, r.printError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -257,6 +258,27 @@ func makeHTTPAttachHeader(stream byte, length uint32) []byte {
|
|||||||
return header
|
return header
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeHijackHeader writes a header appropriate for the type of HTTP Hijack
|
||||||
|
// that occurred in a hijacked HTTP connection used for attach.
|
||||||
|
func writeHijackHeader(r *http.Request, conn io.Writer) {
|
||||||
|
// AttachHeader is the literal header sent for upgraded/hijacked connections for
|
||||||
|
// attach, sourced from Docker at:
|
||||||
|
// https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go
|
||||||
|
// Using literally to ensure compatibility with existing clients.
|
||||||
|
c := r.Header.Get("Connection")
|
||||||
|
proto := r.Header.Get("Upgrade")
|
||||||
|
if len(proto) == 0 || !strings.EqualFold(c, "Upgrade") {
|
||||||
|
// OK - can't upgrade if not requested or protocol is not specified
|
||||||
|
fmt.Fprintf(conn,
|
||||||
|
"HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
|
||||||
|
} else {
|
||||||
|
// Upraded
|
||||||
|
fmt.Fprintf(conn,
|
||||||
|
"HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: %s\r\n\r\n",
|
||||||
|
proto)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Convert OCICNI port bindings into Inspect-formatted port bindings.
|
// Convert OCICNI port bindings into Inspect-formatted port bindings.
|
||||||
func makeInspectPortBindings(bindings []ocicni.PortMapping) map[string][]define.InspectHostPort {
|
func makeInspectPortBindings(bindings []ocicni.PortMapping) map[string][]define.InspectHostPort {
|
||||||
portBindings := make(map[string][]define.InspectHostPort)
|
portBindings := make(map[string][]define.InspectHostPort)
|
||||||
|
@ -1,12 +1,7 @@
|
|||||||
package compat
|
package compat
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/containers/podman/v2/libpod"
|
"github.com/containers/podman/v2/libpod"
|
||||||
"github.com/containers/podman/v2/libpod/define"
|
"github.com/containers/podman/v2/libpod/define"
|
||||||
@ -97,75 +92,30 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
connection, buffer, err := AttachConnection(w, r)
|
idleTracker := r.Context().Value("idletracker").(*idletracker.IdleTracker)
|
||||||
if err != nil {
|
hijackChan := make(chan bool, 1)
|
||||||
utils.InternalServerError(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logrus.Debugf("Hijack for attach of container %s successful", ctr.ID())
|
|
||||||
|
|
||||||
// Perform HTTP attach.
|
// Perform HTTP attach.
|
||||||
// HTTPAttach will handle everything about the connection from here on
|
// HTTPAttach will handle everything about the connection from here on
|
||||||
// (including closing it and writing errors to it).
|
// (including closing it and writing errors to it).
|
||||||
if err := ctr.HTTPAttach(connection, buffer, streams, detachKeys, nil, query.Stream, query.Logs); err != nil {
|
if err := ctr.HTTPAttach(r, w, streams, detachKeys, nil, query.Stream, query.Logs, hijackChan); err != nil {
|
||||||
|
hijackComplete := <-hijackChan
|
||||||
|
|
||||||
// We can't really do anything about errors anymore. HTTPAttach
|
// We can't really do anything about errors anymore. HTTPAttach
|
||||||
// should be writing them to the connection.
|
// should be writing them to the connection.
|
||||||
logrus.Errorf("Error attaching to container %s: %v", ctr.ID(), err)
|
logrus.Errorf("Error attaching to container %s: %v", ctr.ID(), err)
|
||||||
|
|
||||||
|
if hijackComplete {
|
||||||
|
// We do need to tell the idle tracker that the
|
||||||
|
// connection has been closed, though. We can guarantee
|
||||||
|
// that is true after HTTPAttach exits.
|
||||||
|
idleTracker.TrackHijackedClosed()
|
||||||
|
} else {
|
||||||
|
// A hijack was not successfully completed. We need to
|
||||||
|
// report the error normally.
|
||||||
|
utils.InternalServerError(w, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Attach for container %s completed successfully", ctr.ID())
|
logrus.Debugf("Attach for container %s completed successfully", ctr.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
type HijackedConnection struct {
|
|
||||||
net.Conn // Connection
|
|
||||||
idleTracker *idletracker.IdleTracker // Connection tracker
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c HijackedConnection) Close() error {
|
|
||||||
logrus.Debugf("Hijacked connection closed")
|
|
||||||
|
|
||||||
c.idleTracker.TrackHijackedClosed()
|
|
||||||
return c.Conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func AttachConnection(w http.ResponseWriter, r *http.Request) (net.Conn, *bufio.ReadWriter, error) {
|
|
||||||
idleTracker := r.Context().Value("idletracker").(*idletracker.IdleTracker)
|
|
||||||
|
|
||||||
// Hijack the connection
|
|
||||||
hijacker, ok := w.(http.Hijacker)
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, errors.Errorf("unable to hijack connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
connection, buffer, err := hijacker.Hijack()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, errors.Wrapf(err, "error hijacking connection")
|
|
||||||
}
|
|
||||||
trackedConnection := HijackedConnection{
|
|
||||||
Conn: connection,
|
|
||||||
idleTracker: idleTracker,
|
|
||||||
}
|
|
||||||
|
|
||||||
WriteAttachHeaders(r, trackedConnection)
|
|
||||||
|
|
||||||
return trackedConnection, buffer, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func WriteAttachHeaders(r *http.Request, connection io.Writer) {
|
|
||||||
// AttachHeader is the literal header sent for upgraded/hijacked connections for
|
|
||||||
// attach, sourced from Docker at:
|
|
||||||
// https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go
|
|
||||||
// Using literally to ensure compatibility with existing clients.
|
|
||||||
c := r.Header.Get("Connection")
|
|
||||||
proto := r.Header.Get("Upgrade")
|
|
||||||
if len(proto) == 0 || !strings.EqualFold(c, "Upgrade") {
|
|
||||||
// OK - can't upgrade if not requested or protocol is not specified
|
|
||||||
fmt.Fprintf(connection,
|
|
||||||
"HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
|
|
||||||
} else {
|
|
||||||
// Upraded
|
|
||||||
fmt.Fprintf(connection,
|
|
||||||
"HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: %s\r\n\r\n",
|
|
||||||
proto)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/containers/podman/v2/libpod/define"
|
"github.com/containers/podman/v2/libpod/define"
|
||||||
"github.com/containers/podman/v2/pkg/api/handlers"
|
"github.com/containers/podman/v2/pkg/api/handlers"
|
||||||
"github.com/containers/podman/v2/pkg/api/handlers/utils"
|
"github.com/containers/podman/v2/pkg/api/handlers/utils"
|
||||||
|
"github.com/containers/podman/v2/pkg/api/server/idletracker"
|
||||||
"github.com/containers/podman/v2/pkg/specgen/generate"
|
"github.com/containers/podman/v2/pkg/specgen/generate"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -173,15 +174,24 @@ func ExecStartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
connection, buffer, err := AttachConnection(w, r)
|
idleTracker := r.Context().Value("idletracker").(*idletracker.IdleTracker)
|
||||||
if err != nil {
|
hijackChan := make(chan bool, 1)
|
||||||
utils.InternalServerError(w, err)
|
|
||||||
return
|
if err := sessionCtr.ExecHTTPStartAndAttach(sessionID, r, w, nil, nil, nil, hijackChan); err != nil {
|
||||||
}
|
hijackComplete := <-hijackChan
|
||||||
logrus.Debugf("Hijack for attach of container %s exec session %s successful", sessionCtr.ID(), sessionID)
|
|
||||||
|
|
||||||
if err := sessionCtr.ExecHTTPStartAndAttach(sessionID, connection, buffer, nil, nil, nil); err != nil {
|
|
||||||
logrus.Errorf("Error attaching to container %s exec session %s: %v", sessionCtr.ID(), sessionID, err)
|
logrus.Errorf("Error attaching to container %s exec session %s: %v", sessionCtr.ID(), sessionID, err)
|
||||||
|
|
||||||
|
if hijackComplete {
|
||||||
|
// We do need to tell the idle tracker that the
|
||||||
|
// connection has been closed, though. We can guarantee
|
||||||
|
// that is true after HTTPAttach exits.
|
||||||
|
idleTracker.TrackHijackedClosed()
|
||||||
|
} else {
|
||||||
|
// A hijack was not successfully completed. We need to
|
||||||
|
// report the error normally.
|
||||||
|
utils.InternalServerError(w, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Attach for container %s exec session %s completed successfully", sessionCtr.ID(), sessionID)
|
logrus.Debugf("Attach for container %s exec session %s completed successfully", sessionCtr.ID(), sessionID)
|
||||||
|
@ -46,6 +46,8 @@ func Attach(ctx context.Context, nameOrID string, detachKeys *string, logs, stre
|
|||||||
stderr = (io.Writer)(nil)
|
stderr = (io.Writer)(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logrus.Infof("Going to attach to container %q", nameOrID)
|
||||||
|
|
||||||
conn, err := bindings.GetClient(ctx)
|
conn, err := bindings.GetClient(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/containers/podman/v2/pkg/bindings"
|
"github.com/containers/podman/v2/pkg/bindings"
|
||||||
"github.com/containers/podman/v2/pkg/domain/entities"
|
"github.com/containers/podman/v2/pkg/domain/entities"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -180,6 +181,7 @@ func Restart(ctx context.Context, nameOrID string, timeout *int) error {
|
|||||||
// or a partial/full ID. The optional parameter for detach keys are to override the default
|
// or a partial/full ID. The optional parameter for detach keys are to override the default
|
||||||
// detach key sequence.
|
// detach key sequence.
|
||||||
func Start(ctx context.Context, nameOrID string, detachKeys *string) error {
|
func Start(ctx context.Context, nameOrID string, detachKeys *string) error {
|
||||||
|
logrus.Infof("Going to start container %q", nameOrID)
|
||||||
conn, err := bindings.GetClient(ctx)
|
conn, err := bindings.GetClient(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Reference in New Issue
Block a user