Improve APIv2 support for Attach

A few major fixes here:
- Support for attaching to Configured containers, to match Docker
  behavior.
- Support for stream parameter has been improved (we now properly
  handle cases where it is not set).
- Initial support for logs parameter has been added.
- Setting attach streams when the container has a terminal is now
  supported.
- Errors are properly reported once the hijack has begun.

Signed-off-by: Matthew Heon <mheon@redhat.com>
This commit is contained in:
Matthew Heon
2020-04-07 16:52:47 -04:00
parent c0e29b4a31
commit 71f14bd792
5 changed files with 157 additions and 53 deletions

View File

@ -6,10 +6,13 @@ import (
"io/ioutil"
"net"
"os"
"strings"
"sync"
"time"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/libpod/events"
"github.com/containers/libpod/libpod/logs"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -244,15 +247,28 @@ func (c *Container) Attach(streams *define.AttachStreams, keys string, resize <-
// forwarded to the client.
// This function returns when the attach finishes. It does not hold the lock for
// the duration of its runtime, only using it at the beginning to verify state.
func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error {
// The streamLogs parameter indicates that all the container's logs until present
// will be streamed at the beginning of the attach.
// The streamAttach parameter indicates that the attach itself will be streamed
// over the socket; if this is not set, but streamLogs is, only the logs will be
// sent.
// At least one of streamAttach and streamLogs must be set.
func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool) (deferredErr error) {
isTerminal := false
if c.config.Spec.Process != nil {
isTerminal = c.config.Spec.Process.Terminal
}
// Ensure our contract of writing errors to and closing the HTTP conn is
// honored.
defer func() {
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
}()
if !c.batched {
c.lock.Lock()
if err := c.syncContainer(); err != nil {
c.lock.Unlock()
// Write any errors to the HTTP buffer before we close.
hijackWriteErrorAndClose(err, c.ID(), httpCon, httpBuf)
return err
}
// We are NOT holding the lock for the duration of the function.
@ -260,16 +276,80 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
}
if !c.ensureState(define.ContainerStateCreated, define.ContainerStateRunning) {
toReturn := errors.Wrapf(define.ErrCtrStateInvalid, "can only attach to created or running containers")
return errors.Wrapf(define.ErrCtrStateInvalid, "can only attach to created or running containers")
}
// Write any errors to the HTTP buffer before we close.
hijackWriteErrorAndClose(toReturn, c.ID(), httpCon, httpBuf)
return toReturn
if !streamAttach && !streamLogs {
return errors.Wrapf(define.ErrInvalidArg, "must specify at least one of stream or logs")
}
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
if streamLogs {
// Get all logs for the container
logChan := make(chan *logs.LogLine)
logOpts := new(logs.LogOptions)
logOpts.Tail = -1
logOpts.WaitGroup = new(sync.WaitGroup)
errChan := make(chan error)
go func() {
var err error
// In non-terminal mode we need to prepend with the
// stream header.
logrus.Debugf("Writing logs for container %s to HTTP attach", c.ID())
for logLine := range logChan {
if !isTerminal {
device := logLine.Device
var header []byte
headerLen := uint32(len(logLine.Msg))
switch strings.ToLower(device) {
case "stdin":
header = makeHTTPAttachHeader(0, headerLen)
case "stdout":
header = makeHTTPAttachHeader(1, headerLen)
case "stderr":
header = makeHTTPAttachHeader(2, headerLen)
default:
logrus.Errorf("Unknown device for log line: %s", device)
header = makeHTTPAttachHeader(1, headerLen)
}
_, err = httpBuf.Write(header)
if err != nil {
break
}
}
_, err = httpBuf.Write([]byte(logLine.Msg))
if err != nil {
break
}
_, err = httpBuf.Write([]byte("\n"))
if err != nil {
break
}
err = httpBuf.Flush()
if err != nil {
break
}
}
errChan <- err
}()
go func() {
logOpts.WaitGroup.Wait()
close(logChan)
}()
if err := c.ReadLog(logOpts, logChan); err != nil {
return err
}
logrus.Debugf("Done reading logs for container %s", c.ID())
if err := <-errChan; err != nil {
return err
}
}
if !streamAttach {
return nil
}
c.newContainerEvent(events.Attach)
return c.ociRuntime.HTTPAttach(c, httpCon, httpBuf, streams, detachKeys, cancel)
}