Add bindings for exec and enable attached remote

This adds bindings for starting exec sessions, and then uses them
to wire up detached exec. Code is heavily based on Attach code
for containers, slightly modified to handle exec sessions.

Bindings are presently attached-only, detached is pending on a
Conmon update landing in CI. I'll probably get to that next.

Signed-off-by: Matthew Heon <matthew.heon@pm.me>
This commit is contained in:
Matthew Heon
2020-05-27 17:01:12 -04:00
committed by Matthew Heon
parent 990514ea92
commit 45a7e7266e
8 changed files with 544 additions and 298 deletions

View File

@ -67,14 +67,14 @@ func execFlags(flags *pflag.FlagSet) {
func init() {
registry.Commands = append(registry.Commands, registry.CliCommand{
Mode: []entities.EngineMode{entities.ABIMode},
Mode: []entities.EngineMode{entities.ABIMode, entities.TunnelMode},
Command: execCommand,
})
flags := execCommand.Flags()
execFlags(flags)
registry.Commands = append(registry.Commands, registry.CliCommand{
Mode: []entities.EngineMode{entities.ABIMode},
Mode: []entities.EngineMode{entities.ABIMode, entities.TunnelMode},
Command: containerExecCommand,
Parent: containerCmd,
})

View File

@ -8,6 +8,7 @@ import (
"github.com/containers/libpod/libpod"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/api/handlers/utils"
"github.com/gorilla/mux"
"github.com/gorilla/schema"
"github.com/pkg/errors"
"k8s.io/client-go/tools/remotecommand"
@ -37,9 +38,9 @@ func ResizeTTY(w http.ResponseWriter, r *http.Request) {
}
var status int
name := utils.GetName(r)
switch {
case strings.Contains(r.URL.Path, "/containers/"):
name := utils.GetName(r)
ctnr, err := runtime.LookupContainer(name)
if err != nil {
utils.ContainerNotFound(w, name, err)
@ -61,6 +62,7 @@ func ResizeTTY(w http.ResponseWriter, r *http.Request) {
// reasons.
status = http.StatusOK
case strings.Contains(r.URL.Path, "/exec/"):
name := mux.Vars(r)["id"]
ctnr, err := runtime.GetExecSessionContainer(name)
if err != nil {
utils.SessionNotFound(w, name, err)

View File

@ -310,7 +310,7 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error {
// $ref: "#/responses/NoSuchExecInstance"
// 500:
// $ref: "#/responses/InternalError"
r.Handle(VersionedPath("/libpod/exec/{id}/resize"), s.APIHandler(compat.UnsupportedHandler)).Methods(http.MethodPost)
r.Handle(VersionedPath("/libpod/exec/{id}/resize"), s.APIHandler(compat.ResizeTTY)).Methods(http.MethodPost)
// swagger:operation GET /libpod/exec/{id}/json libpod libpodInspectExec
// ---
// tags:

View File

@ -24,7 +24,7 @@ import (
)
var (
basePath = &url.URL{
BasePath = &url.URL{
Scheme: "http",
Host: "d",
Path: "/v" + APIVersion.String() + "/libpod",
@ -37,15 +37,14 @@ type APIResponse struct {
}
type Connection struct {
_url *url.URL
client *http.Client
conn *net.Conn
Uri *url.URL
Client *http.Client
}
type valueKey string
const (
clientKey = valueKey("client")
clientKey = valueKey("Client")
)
// GetClient from context build by NewConnection()
@ -59,7 +58,7 @@ func GetClient(ctx context.Context) (*Connection, error) {
// JoinURL elements with '/'
func JoinURL(elements ...string) string {
return strings.Join(elements, "/")
return "/" + strings.Join(elements, "/")
}
// NewConnection takes a URI as a string and returns a context with the
@ -88,7 +87,7 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
return nil, errors.Wrapf(err, "Value of PODMAN_HOST is not a valid url: %s", uri)
}
// Now we setup the http client to use the connection above
// Now we setup the http Client to use the connection above
var connection Connection
switch _url.Scheme {
case "ssh":
@ -125,16 +124,12 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
func tcpClient(_url *url.URL) (Connection, error) {
connection := Connection{
_url: _url,
Uri: _url,
}
connection.client = &http.Client{
connection.Client = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
conn, err := net.Dial("tcp", _url.Host)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
return net.Dial("tcp", _url.Host)
},
DisableCompression: true,
},
@ -167,11 +162,11 @@ func pingNewConnection(ctx context.Context) error {
}
switch APIVersion.Compare(versionSrv) {
case 1, 0:
// Server's job when client version is equal or older
case -1, 0:
// Server's job when Client version is equal or older
return nil
case -1:
return errors.Errorf("server API version is too old. client %q server %q", APIVersion.String(), versionSrv.String())
case 1:
return errors.Errorf("server API version is too old. Client %q server %q", APIVersion.String(), versionSrv.String())
}
}
return errors.Errorf("ping response was %q", response.StatusCode)
@ -217,31 +212,22 @@ func sshClient(_url *url.URL, identity string, secure bool) (Connection, error)
return Connection{}, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String())
}
connection := Connection{_url: _url}
connection.client = &http.Client{
connection := Connection{Uri: _url}
connection.Client = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
conn, err := bastion.Dial("unix", _url.Path)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
return bastion.Dial("unix", _url.Path)
},
}}
return connection, nil
}
func unixClient(_url *url.URL) (Connection, error) {
connection := Connection{_url: _url}
connection.client = &http.Client{
connection := Connection{Uri: _url}
connection.Client = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "unix", _url.Path)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
return (&net.Dialer{}).DialContext(ctx, "unix", _url.Path)
},
DisableCompression: true,
},
@ -263,7 +249,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
// Lets eventually use URL for this which might lead to safer
// usage
safeEndpoint := fmt.Sprintf(endpoint, safePathValues...)
e := basePath.String() + safeEndpoint
e := BasePath.String() + safeEndpoint
req, err := http.NewRequest(httpMethod, e, httpBody)
if err != nil {
return nil, err
@ -277,7 +263,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
req = req.WithContext(context.WithValue(context.Background(), clientKey, c))
// Give the Do three chances in the case of a comm/service hiccup
for i := 0; i < 3; i++ {
response, err = c.client.Do(req) // nolint
response, err = c.Client.Do(req) // nolint
if err == nil {
break
}
@ -286,10 +272,6 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
return &APIResponse{response, req}, err
}
func (c *Connection) Write(b []byte) (int, error) {
return (*c.conn).Write(b)
}
// FiltersToString converts our typical filter format of a
// map[string][]string to a query/html safe string.
func FiltersToString(filters map[string][]string) (string, error) {

View File

@ -0,0 +1,481 @@
package containers
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"reflect"
"strconv"
"time"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/bindings"
sig "github.com/containers/libpod/pkg/signal"
"github.com/containers/libpod/utils"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh/terminal"
)
// Attach attaches to a running container
func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer, attachReady chan bool) error {
isSet := struct {
stdin bool
stdout bool
stderr bool
}{
stdin: !(stdin == nil || reflect.ValueOf(stdin).IsNil()),
stdout: !(stdout == nil || reflect.ValueOf(stdout).IsNil()),
stderr: !(stderr == nil || reflect.ValueOf(stderr).IsNil()),
}
// Ensure golang can determine that interfaces are "really" nil
if !isSet.stdin {
stdin = (io.Reader)(nil)
}
if !isSet.stdout {
stdout = (io.Writer)(nil)
}
if !isSet.stderr {
stderr = (io.Writer)(nil)
}
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
// Do we need to wire in stdin?
ctnr, err := Inspect(ctx, nameOrId, bindings.PFalse)
if err != nil {
return err
}
params := url.Values{}
if detachKeys != nil {
params.Add("detachKeys", *detachKeys)
}
if logs != nil {
params.Add("logs", fmt.Sprintf("%t", *logs))
}
if stream != nil {
params.Add("stream", fmt.Sprintf("%t", *stream))
}
if isSet.stdin {
params.Add("stdin", "true")
}
if isSet.stdout {
params.Add("stdout", "true")
}
if isSet.stderr {
params.Add("stderr", "true")
}
// Unless all requirements are met, don't use "stdin" is a terminal
file, ok := stdin.(*os.File)
needTTY := ok && terminal.IsTerminal(int(file.Fd())) && ctnr.Config.Tty
if needTTY {
state, err := setRawTerminal(file)
if err != nil {
return err
}
defer func() {
if err := terminal.Restore(int(file.Fd()), state); err != nil {
logrus.Errorf("unable to restore terminal: %q", err)
}
logrus.SetFormatter(&logrus.TextFormatter{})
}()
}
headers := make(map[string]string)
headers["Connection"] = "Upgrade"
headers["Upgrade"] = "tcp"
var socket net.Conn
socketSet := false
dialContext := conn.Client.Transport.(*http.Transport).DialContext
t := &http.Transport{
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
c, err := dialContext(ctx, network, address)
if err != nil {
return nil, err
}
if !socketSet {
socket = c
socketSet = true
}
return c, err
},
IdleConnTimeout: time.Duration(0),
}
conn.Client.Transport = t
response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, headers, nameOrId)
if err != nil {
return err
}
if !(response.IsSuccess() || response.IsInformational()) {
return response.Process(nil)
}
if needTTY {
winChange := make(chan os.Signal, 1)
signal.Notify(winChange, sig.SIGWINCH)
winCtx, winCancel := context.WithCancel(ctx)
defer winCancel()
go attachHandleResize(ctx, winCtx, winChange, false, nameOrId, file)
}
// If we are attaching around a start, we need to "signal"
// back that we are in fact attached so that started does
// not execute before we can attach.
if attachReady != nil {
attachReady <- true
}
if isSet.stdin {
go func() {
logrus.Debugf("Copying STDIN to socket")
_, err := utils.CopyDetachable(socket, stdin, []byte{})
if err != nil {
logrus.Error("failed to write input to service: " + err.Error())
}
}()
}
buffer := make([]byte, 1024)
if ctnr.Config.Tty {
logrus.Debugf("Copying STDOUT of container in terminal mode")
if !isSet.stdout {
return fmt.Errorf("container %q requires stdout to be set", ctnr.ID)
}
// If not multiplex'ed, read from server and write to stdout
_, err := io.Copy(stdout, socket)
if err != nil {
return err
}
} else {
logrus.Debugf("Copying standard streams of container in non-terminal mode")
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(socket, buffer)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
frame, err := DemuxFrame(socket, buffer, l)
if err != nil {
return err
}
switch {
case fd == 0 && isSet.stdout:
_, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 1 && isSet.stdout:
_, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 2 && isSet.stderr:
_, err := stderr.Write(frame[0:l])
if err != nil {
return err
}
case fd == 3:
return fmt.Errorf("error from service from stream: %s", frame)
default:
return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd)
}
}
}
return nil
}
// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
n, err := io.ReadFull(r, buffer[0:8])
if err != nil {
return
}
if n < 8 {
err = io.ErrUnexpectedEOF
return
}
fd = int(buffer[0])
if fd < 0 || fd > 3 {
err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd))
return
}
sz = int(binary.BigEndian.Uint32(buffer[4:8]))
return
}
// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
if len(buffer) < length {
buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
}
n, err := io.ReadFull(r, buffer[0:length])
if err != nil {
return nil, nil
}
if n < length {
err = io.ErrUnexpectedEOF
return
}
return buffer[0:length], nil
}
// ResizeContainerTTY sets container's TTY height and width in characters
func ResizeContainerTTY(ctx context.Context, nameOrId string, height *int, width *int) error {
return resizeTTY(ctx, bindings.JoinURL("containers", nameOrId, "resize"), height, width)
}
// ResizeExecTTY sets session's TTY height and width in characters
func ResizeExecTTY(ctx context.Context, nameOrId string, height *int, width *int) error {
return resizeTTY(ctx, bindings.JoinURL("exec", nameOrId, "resize"), height, width)
}
// resizeTTY set size of TTY of container
func resizeTTY(ctx context.Context, endpoint string, height *int, width *int) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
params := url.Values{}
if height != nil {
params.Set("h", strconv.Itoa(*height))
}
if width != nil {
params.Set("w", strconv.Itoa(*width))
}
rsp, err := conn.DoRequest(nil, http.MethodPost, endpoint, params, nil)
if err != nil {
return err
}
return rsp.Process(nil)
}
type rawFormatter struct {
logrus.TextFormatter
}
func (f *rawFormatter) Format(entry *logrus.Entry) ([]byte, error) {
buffer, err := f.TextFormatter.Format(entry)
if err != nil {
return buffer, err
}
return append(buffer, '\r'), nil
}
// This is intended to be run as a goroutine, handling resizing for a container
// or exec session.
func attachHandleResize(ctx, winCtx context.Context, winChange chan os.Signal, isExec bool, id string, file *os.File) {
// Prime the pump, we need one reset to ensure everything is ready
winChange <- sig.SIGWINCH
for {
select {
case <-winCtx.Done():
return
case <-winChange:
h, w, err := terminal.GetSize(int(file.Fd()))
if err != nil {
logrus.Warnf("failed to obtain TTY size: " + err.Error())
}
var resizeErr error
if isExec {
resizeErr = ResizeExecTTY(ctx, id, &h, &w)
} else {
resizeErr = ResizeContainerTTY(ctx, id, &h, &w)
}
if resizeErr != nil {
logrus.Warnf("failed to resize TTY: " + resizeErr.Error())
}
}
}
}
// Configure the given terminal for raw mode
func setRawTerminal(file *os.File) (*terminal.State, error) {
state, err := terminal.MakeRaw(int(file.Fd()))
if err != nil {
return nil, err
}
logrus.SetFormatter(&rawFormatter{})
return state, err
}
// ExecStartAndAttach starts and attaches to a given exec session.
func ExecStartAndAttach(ctx context.Context, sessionID string, streams *define.AttachStreams) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
// TODO: Make this configurable (can't use streams' InputStream as it's
// buffered)
terminalFile := os.Stdin
logrus.Debugf("Starting & Attaching to exec session ID %q", sessionID)
// We need to inspect the exec session first to determine whether to use
// -t.
resp, err := conn.DoRequest(nil, http.MethodGet, "/exec/%s/json", nil, nil, sessionID)
if err != nil {
return err
}
respStruct := new(define.InspectExecSession)
if err := resp.Process(respStruct); err != nil {
return err
}
isTerm := true
if respStruct.ProcessConfig != nil {
isTerm = respStruct.ProcessConfig.Tty
}
// If we are in TTY mode, we need to set raw mode for the terminal.
// TODO: Share all of this with Attach() for containers.
needTTY := terminalFile != nil && terminal.IsTerminal(int(terminalFile.Fd())) && isTerm
if needTTY {
state, err := setRawTerminal(terminalFile)
if err != nil {
return err
}
defer func() {
if err := terminal.Restore(int(terminalFile.Fd()), state); err != nil {
logrus.Errorf("unable to restore terminal: %q", err)
}
logrus.SetFormatter(&logrus.TextFormatter{})
}()
}
body := struct {
Detach bool `json:"Detach"`
}{
Detach: false,
}
bodyJSON, err := json.Marshal(body)
if err != nil {
return err
}
var socket net.Conn
socketSet := false
dialContext := conn.Client.Transport.(*http.Transport).DialContext
t := &http.Transport{
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
c, err := dialContext(ctx, network, address)
if err != nil {
return nil, err
}
if !socketSet {
socket = c
socketSet = true
}
return c, err
},
IdleConnTimeout: time.Duration(0),
}
conn.Client.Transport = t
response, err := conn.DoRequest(bytes.NewReader(bodyJSON), http.MethodPost, "/exec/%s/start", nil, nil, sessionID)
if err != nil {
return err
}
if !(response.IsSuccess() || response.IsInformational()) {
return response.Process(nil)
}
if needTTY {
winChange := make(chan os.Signal, 1)
signal.Notify(winChange, sig.SIGWINCH)
winCtx, winCancel := context.WithCancel(ctx)
defer winCancel()
go attachHandleResize(ctx, winCtx, winChange, true, sessionID, terminalFile)
}
if streams.AttachInput {
go func() {
logrus.Debugf("Copying STDIN to socket")
_, err := utils.CopyDetachable(socket, streams.InputStream, []byte{})
if err != nil {
logrus.Error("failed to write input to service: " + err.Error())
}
}()
}
buffer := make([]byte, 1024)
if isTerm {
logrus.Debugf("Handling terminal attach to exec")
if !streams.AttachOutput {
return fmt.Errorf("exec session %s has a terminal and must have STDOUT enabled", sessionID)
}
// If not multiplex'ed, read from server and write to stdout
_, err := utils.CopyDetachable(streams.OutputStream, socket, []byte{})
if err != nil {
return err
}
} else {
logrus.Debugf("Handling non-terminal attach to exec")
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(socket, buffer)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
frame, err := DemuxFrame(socket, buffer, l)
if err != nil {
return err
}
switch {
case fd == 0 && streams.AttachOutput:
_, err := streams.OutputStream.Write(frame[0:l])
if err != nil {
return err
}
case fd == 1 && streams.AttachInput:
// Write STDIN to STDOUT (echoing characters
// typed by another attach session)
_, err := streams.OutputStream.Write(frame[0:l])
if err != nil {
return err
}
case fd == 2 && streams.AttachError:
_, err := streams.ErrorStream.Write(frame[0:l])
if err != nil {
return err
}
case fd == 3:
return fmt.Errorf("error from service from stream: %s", frame)
default:
return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd)
}
}
}
return nil
}

View File

@ -2,14 +2,9 @@ package containers
import (
"context"
"encoding/binary"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"reflect"
"strconv"
"strings"
@ -17,10 +12,7 @@ import (
"github.com/containers/libpod/pkg/api/handlers"
"github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/domain/entities"
sig "github.com/containers/libpod/pkg/signal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh/terminal"
)
var (
@ -345,248 +337,3 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
}
return response.Process(nil)
}
// Attach attaches to a running container
func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer, attachReady chan bool) error {
isSet := struct {
stdin bool
stdout bool
stderr bool
}{
stdin: !(stdin == nil || reflect.ValueOf(stdin).IsNil()),
stdout: !(stdout == nil || reflect.ValueOf(stdout).IsNil()),
stderr: !(stderr == nil || reflect.ValueOf(stderr).IsNil()),
}
// Ensure golang can determine that interfaces are "really" nil
if !isSet.stdin {
stdin = (io.Reader)(nil)
}
if !isSet.stdout {
stdout = (io.Writer)(nil)
}
if !isSet.stderr {
stderr = (io.Writer)(nil)
}
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
// Do we need to wire in stdin?
ctnr, err := Inspect(ctx, nameOrId, bindings.PFalse)
if err != nil {
return err
}
params := url.Values{}
if detachKeys != nil {
params.Add("detachKeys", *detachKeys)
}
if logs != nil {
params.Add("logs", fmt.Sprintf("%t", *logs))
}
if stream != nil {
params.Add("stream", fmt.Sprintf("%t", *stream))
}
if isSet.stdin {
params.Add("stdin", "true")
}
if isSet.stdout {
params.Add("stdout", "true")
}
if isSet.stderr {
params.Add("stderr", "true")
}
// Unless all requirements are met, don't use "stdin" is a terminal
file, ok := stdin.(*os.File)
needTTY := ok && terminal.IsTerminal(int(file.Fd())) && ctnr.Config.Tty
if needTTY {
state, err := terminal.MakeRaw(int(file.Fd()))
if err != nil {
return err
}
logrus.SetFormatter(&rawFormatter{})
defer func() {
if err := terminal.Restore(int(file.Fd()), state); err != nil {
logrus.Errorf("unable to restore terminal: %q", err)
}
logrus.SetFormatter(&logrus.TextFormatter{})
}()
winChange := make(chan os.Signal, 1)
signal.Notify(winChange, sig.SIGWINCH)
winCtx, winCancel := context.WithCancel(ctx)
defer winCancel()
go func() {
// Prime the pump, we need one reset to ensure everything is ready
winChange <- sig.SIGWINCH
for {
select {
case <-winCtx.Done():
return
case <-winChange:
h, w, err := terminal.GetSize(int(file.Fd()))
if err != nil {
logrus.Warnf("failed to obtain TTY size: " + err.Error())
}
if err := ResizeContainerTTY(ctx, nameOrId, &h, &w); err != nil {
logrus.Warnf("failed to resize TTY: " + err.Error())
}
}
}
}()
}
response, err := conn.DoRequest(stdin, http.MethodPost, "/containers/%s/attach", params, nil, nameOrId)
if err != nil {
return err
}
if !(response.IsSuccess() || response.IsInformational()) {
return response.Process(nil)
}
// If we are attaching around a start, we need to "signal"
// back that we are in fact attached so that started does
// not execute before we can attach.
if attachReady != nil {
attachReady <- true
}
buffer := make([]byte, 1024)
if ctnr.Config.Tty {
if !isSet.stdout {
return fmt.Errorf("container %q requires stdout to be set", ctnr.ID)
}
// If not multiplex'ed, read from server and write to stdout
_, err := io.Copy(stdout, response.Body)
if err != nil {
return err
}
} else {
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(response.Body, buffer)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
frame, err := DemuxFrame(response.Body, buffer, l)
if err != nil {
return err
}
switch {
case fd == 0 && isSet.stdout:
_, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 1 && isSet.stdout:
_, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 2 && isSet.stderr:
_, err := stderr.Write(frame[0:l])
if err != nil {
return err
}
case fd == 3:
return fmt.Errorf("error from service from stream: %s", frame)
default:
return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd)
}
}
}
return nil
}
// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
n, err := io.ReadFull(r, buffer[0:8])
if err != nil {
return
}
if n < 8 {
err = io.ErrUnexpectedEOF
return
}
fd = int(buffer[0])
if fd < 0 || fd > 3 {
err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd))
return
}
sz = int(binary.BigEndian.Uint32(buffer[4:8]))
return
}
// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
if len(buffer) < length {
buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
}
n, err := io.ReadFull(r, buffer[0:length])
if err != nil {
return nil, nil
}
if n < length {
err = io.ErrUnexpectedEOF
return
}
return buffer[0:length], nil
}
// ResizeContainerTTY sets container's TTY height and width in characters
func ResizeContainerTTY(ctx context.Context, nameOrId string, height *int, width *int) error {
return resizeTTY(ctx, bindings.JoinURL("containers", nameOrId, "resize"), height, width)
}
// ResizeExecTTY sets session's TTY height and width in characters
func ResizeExecTTY(ctx context.Context, nameOrId string, height *int, width *int) error {
return resizeTTY(ctx, bindings.JoinURL("exec", nameOrId, "resize"), height, width)
}
// resizeTTY set size of TTY of container
func resizeTTY(ctx context.Context, endpoint string, height *int, width *int) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
params := url.Values{}
if height != nil {
params.Set("h", strconv.Itoa(*height))
}
if width != nil {
params.Set("w", strconv.Itoa(*width))
}
rsp, err := conn.DoRequest(nil, http.MethodPost, endpoint, params, nil)
if err != nil {
return err
}
return rsp.Process(nil)
}
type rawFormatter struct {
logrus.TextFormatter
}
func (f *rawFormatter) Format(entry *logrus.Entry) ([]byte, error) {
buffer, err := f.TextFormatter.Format(entry)
if err != nil {
return buffer, err
}
return append(buffer, '\r'), nil
}

View File

@ -2,6 +2,7 @@ package tunnel
import (
"context"
"fmt"
"io"
"os"
"strconv"
@ -11,6 +12,7 @@ import (
"github.com/containers/common/pkg/config"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/api/handlers"
"github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/bindings/containers"
"github.com/containers/libpod/pkg/domain/entities"
@ -375,7 +377,39 @@ func (ic *ContainerEngine) ContainerAttach(ctx context.Context, nameOrId string,
}
func (ic *ContainerEngine) ContainerExec(ctx context.Context, nameOrId string, options entities.ExecOptions, streams define.AttachStreams) (int, error) {
return 125, errors.New("not implemented")
env := []string{}
for k, v := range options.Envs {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
createConfig := new(handlers.ExecCreateConfig)
createConfig.User = options.User
createConfig.Privileged = options.Privileged
createConfig.Tty = options.Tty
createConfig.AttachStdin = options.Interactive
createConfig.AttachStdout = true
createConfig.AttachStderr = true
createConfig.Detach = false
createConfig.DetachKeys = options.DetachKeys
createConfig.Env = env
createConfig.WorkingDir = options.WorkDir
createConfig.Cmd = options.Cmd
sessionID, err := containers.ExecCreate(ic.ClientCxt, nameOrId, createConfig)
if err != nil {
return 125, err
}
if err := containers.ExecStartAndAttach(ic.ClientCxt, sessionID, &streams); err != nil {
return 125, err
}
inspectOut, err := containers.ExecInspect(ic.ClientCxt, sessionID)
if err != nil {
return 125, err
}
return inspectOut.ExitCode, nil
}
func (ic *ContainerEngine) ContainerExecDetached(ctx context.Context, nameOrID string, options entities.ExecOptions) (string, error) {

View File

@ -18,7 +18,6 @@ var _ = Describe("Podman exec", func() {
)
BeforeEach(func() {
Skip(v2remotefail)
tempdir, err = CreateTempDirInTempDir()
if err != nil {
os.Exit(1)
@ -285,6 +284,7 @@ var _ = Describe("Podman exec", func() {
})
It("podman exec --detach", func() {
Skip(v2remotefail)
ctrName := "testctr"
ctr := podmanTest.Podman([]string{"run", "-t", "-i", "-d", "--name", ctrName, ALPINE, "top"})
ctr.WaitWithDefaultTimeout()