Merge pull request #6203 from jwhonce/wip/attach

V2 attach bindings and test
This commit is contained in:
OpenShift Merge Robot
2020-05-13 12:30:14 -07:00
committed by GitHub
9 changed files with 210 additions and 10 deletions

View File

@ -285,6 +285,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
logSize := 0
if streamLogs {
// Get all logs for the container
logChan := make(chan *logs.LogLine)
@ -302,7 +303,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
device := logLine.Device
var header []byte
headerLen := uint32(len(logLine.Msg))
logSize += len(logLine.Msg)
switch strings.ToLower(device) {
case "stdin":
header = makeHTTPAttachHeader(0, headerLen)
@ -341,7 +342,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
if err := c.ReadLog(logOpts, logChan); err != nil {
return err
}
logrus.Debugf("Done reading logs for container %s", c.ID())
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
if err := <-errChan; err != nil {
return err
}

View File

@ -1704,6 +1704,8 @@ func httpAttachTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter, cid
buf := make([]byte, bufferSize)
for {
numR, err := container.Read(buf)
logrus.Debugf("Read fd(%d) %d/%d bytes for container %s", int(buf[0]), numR, len(buf), cid)
if numR > 0 {
switch buf[0] {
case AttachPipeStdout:

View File

@ -249,9 +249,8 @@ func hijackWriteErrorAndClose(toWrite error, cid string, terminal bool, httpCon
// length and stream. Accepts an integer indicating which stream we are sending
// to (STDIN = 0, STDOUT = 1, STDERR = 2).
func makeHTTPAttachHeader(stream byte, length uint32) []byte {
headerBuf := []byte{stream, 0, 0, 0}
lenBuf := []byte{0, 0, 0, 0}
binary.BigEndian.PutUint32(lenBuf, length)
headerBuf = append(headerBuf, lenBuf...)
return headerBuf
header := make([]byte, 8)
header[0] = stream
binary.BigEndian.PutUint32(header[4:], length)
return header
}

View File

@ -108,7 +108,7 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
// This header string sourced from Docker:
// https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go
// Using literally to ensure compatability with existing clients.
// Using literally to ensure compatibility with existing clients.
fmt.Fprintf(connection, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
logrus.Debugf("Hijack for attach of container %s successful", ctr.ID())

View File

@ -2,6 +2,8 @@ package containers
import (
"context"
"encoding/binary"
"fmt"
"io"
"net/http"
"net/url"
@ -15,6 +17,10 @@ import (
"github.com/pkg/errors"
)
var (
ErrLostSync = errors.New("lost synchronization with attach multiplexed result")
)
// List obtains a list of containers in local storage. All parameters to this method are optional.
// The filters are used to determine which containers are listed. The last parameter indicates to only return
// the most recent number of containers. The pod and size booleans indicate that pod information and rootfs
@ -247,7 +253,7 @@ func Unpause(ctx context.Context, nameOrID string) error {
// Wait blocks until the given container reaches a condition. If not provided, the condition will
// default to stopped. If the condition is stopped, an exit code for the container will be provided. The
// nameOrID can be a container name or a partial/full ID.
func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { //nolint
func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { // nolint
var exitCode int32
conn, err := bindings.GetClient(ctx)
if err != nil {
@ -333,3 +339,125 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
}
return response.Process(nil)
}
// Attach attaches to a running container
func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
params := url.Values{}
if detachKeys != nil {
params.Add("detachKeys", *detachKeys)
}
if logs != nil {
params.Add("logs", fmt.Sprintf("%t", *logs))
}
if stream != nil {
params.Add("stream", fmt.Sprintf("%t", *stream))
}
if stdin != nil && *stdin {
params.Add("stdin", "true")
}
if stdout != nil {
params.Add("stdout", "true")
}
if stderr != nil {
params.Add("stderr", "true")
}
response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, nameOrId)
if err != nil {
return err
}
defer response.Body.Close()
ctype := response.Header.Get("Content-Type")
upgrade := response.Header.Get("Connection")
buffer := make([]byte, 1024)
if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" {
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(response.Body, buffer)
if err != nil {
switch {
case errors.Is(err, io.EOF):
return nil
case errors.Is(err, io.ErrUnexpectedEOF):
continue
}
return err
}
frame, err := DemuxFrame(response.Body, buffer, l)
if err != nil {
return err
}
switch {
case fd == 0 && stdin != nil && *stdin:
stdout.Write(frame)
case fd == 1 && stdout != nil:
stdout.Write(frame)
case fd == 2 && stderr != nil:
stderr.Write(frame)
case fd == 3:
return fmt.Errorf("error from daemon in stream: %s", frame)
default:
return fmt.Errorf("unrecognized input header: %d", fd)
}
}
} else {
// If not multiplex'ed from server just dump stream to stdout
for {
_, err := response.Body.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
return err
}
break
}
stdout.Write(buffer)
}
}
return err
}
// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
n, err := io.ReadFull(r, buffer[0:8])
if err != nil {
return
}
if n < 8 {
err = io.ErrUnexpectedEOF
return
}
fd = int(buffer[0])
if fd < 0 || fd > 3 {
err = ErrLostSync
return
}
sz = int(binary.BigEndian.Uint32(buffer[4:8]))
return
}
// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
if len(buffer) < length {
buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
}
n, err := io.ReadFull(r, buffer[0:length])
if err != nil {
return nil, nil
}
if n < length {
err = io.ErrUnexpectedEOF
return
}
return buffer[0:length], nil
}

View File

@ -0,0 +1,63 @@
package test_bindings
import (
"bytes"
"time"
"github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/bindings/containers"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
)
var _ = Describe("Podman containers attach", func() {
var (
bt *bindingTest
s *gexec.Session
)
BeforeEach(func() {
bt = newBindingTest()
bt.RestoreImagesFromCache()
s = bt.startAPIService()
time.Sleep(1 * time.Second)
err := bt.NewConnection()
Expect(err).ShouldNot(HaveOccurred())
})
AfterEach(func() {
s.Kill()
bt.cleanup()
})
It("attach", func() {
name := "TopAttachTest"
id, err := bt.RunTopContainer(&name, nil, nil)
Expect(err).ShouldNot(HaveOccurred())
tickTock := time.NewTimer(2 * time.Second)
go func() {
<-tickTock.C
timeout := uint(5)
err := containers.Stop(bt.conn, id, &timeout)
if err != nil {
GinkgoWriter.Write([]byte(err.Error()))
}
}()
stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
go func() {
defer GinkgoRecover()
err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr)
Expect(err).ShouldNot(HaveOccurred())
}()
time.Sleep(5 * time.Second)
// First character/First line of top output
Expect(stdout.String()).Should(ContainSubstring("Mem: "))
})
})

View File

@ -191,7 +191,7 @@ func (b *bindingTest) restoreImageFromCache(i testImage) {
func (b *bindingTest) RunTopContainer(containerName *string, insidePod *bool, podName *string) (string, error) {
s := specgen.NewSpecGenerator(alpine.name, false)
s.Terminal = false
s.Command = []string{"top"}
s.Command = []string{"/usr/bin/top"}
if containerName != nil {
s.Name = *containerName
}

View File

@ -302,6 +302,8 @@ var _ = Describe("Podman containers ", func() {
errChan = make(chan error)
go func() {
defer GinkgoRecover()
_, waitErr := containers.Wait(bt.conn, name, &running)
errChan <- waitErr
close(errChan)

View File

@ -5,9 +5,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
)
func TestTest(t *testing.T) {
if testing.Verbose() {
logrus.SetLevel(logrus.DebugLevel)
}
RegisterFailHandler(Fail)
RunSpecs(t, "Test Suite")
}