mirror of
https://github.com/containers/podman.git
synced 2025-05-28 13:40:33 +08:00
Finish up remote exec implementation
Signed-off-by: Peter Hunt <pehunt@redhat.com>
This commit is contained in:
@ -3,6 +3,7 @@
|
||||
package adapter
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -555,93 +556,6 @@ func (r *LocalRuntime) Ps(c *cliconfig.PsValues, opts shared.PsOptions) ([]share
|
||||
return psContainers, nil
|
||||
}
|
||||
|
||||
func (r *LocalRuntime) attach(ctx context.Context, stdin, stdout *os.File, cid string, start bool, detachKeys string) (chan error, error) {
|
||||
var (
|
||||
oldTermState *term.State
|
||||
)
|
||||
errChan := make(chan error)
|
||||
spec, err := r.Spec(cid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resize := make(chan remotecommand.TerminalSize, 5)
|
||||
haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd()))
|
||||
|
||||
// Check if we are attached to a terminal. If we are, generate resize
|
||||
// events, and set the terminal to raw mode
|
||||
if haveTerminal && spec.Process.Terminal {
|
||||
logrus.Debugf("Handling terminal attach")
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
resizeTty(subCtx, resize)
|
||||
oldTermState, err = term.SaveState(os.Stdin.Fd())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to save terminal state")
|
||||
}
|
||||
|
||||
logrus.SetFormatter(&RawTtyFormatter{})
|
||||
term.SetRawTerminal(os.Stdin.Fd())
|
||||
|
||||
}
|
||||
// TODO add detach keys support
|
||||
reply, err := iopodman.Attach().Send(r.Conn, varlink.Upgrade, cid, detachKeys, start)
|
||||
if err != nil {
|
||||
restoreTerminal(oldTermState)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// See if the server accepts the upgraded connection or returns an error
|
||||
_, err = reply()
|
||||
|
||||
if err != nil {
|
||||
restoreTerminal(oldTermState)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// These are the varlink sockets
|
||||
reader := r.Conn.Reader
|
||||
writer := r.Conn.Writer
|
||||
|
||||
// These are the special writers that encode input from the client.
|
||||
varlinkStdinWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdin)
|
||||
varlinkResizeWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.TerminalResize)
|
||||
|
||||
go func() {
|
||||
// Read from the wire and direct to stdout or stderr
|
||||
err := virtwriter.Reader(reader, stdout, os.Stderr, nil, nil)
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for termResize := range resize {
|
||||
b, err := json.Marshal(termResize)
|
||||
if err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
_, err = varlinkResizeWriter.Write(b)
|
||||
if err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Takes stdinput and sends it over the wire after being encoded
|
||||
go func() {
|
||||
if _, err := io.Copy(varlinkStdinWriter, stdin); err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
}()
|
||||
return errChan, nil
|
||||
|
||||
}
|
||||
|
||||
// Attach to a remote terminal
|
||||
func (r *LocalRuntime) Attach(ctx context.Context, c *cliconfig.AttachValues) error {
|
||||
ctr, err := r.LookupContainer(c.InputArgs[0])
|
||||
@ -796,6 +710,50 @@ func (r *LocalRuntime) Start(ctx context.Context, c *cliconfig.StartValues, sigP
|
||||
return exitCode, finalErr
|
||||
}
|
||||
|
||||
func (r *LocalRuntime) attach(ctx context.Context, stdin, stdout *os.File, cid string, start bool, detachKeys string) (chan error, error) {
|
||||
var (
|
||||
oldTermState *term.State
|
||||
)
|
||||
spec, err := r.Spec(cid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resize := make(chan remotecommand.TerminalSize, 5)
|
||||
haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd()))
|
||||
|
||||
// Check if we are attached to a terminal. If we are, generate resize
|
||||
// events, and set the terminal to raw mode
|
||||
if haveTerminal && spec.Process.Terminal {
|
||||
cancel, oldTermState, err := handleTerminalAttach(ctx, resize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cancel()
|
||||
defer restoreTerminal(oldTermState)
|
||||
|
||||
logrus.SetFormatter(&RawTtyFormatter{})
|
||||
term.SetRawTerminal(os.Stdin.Fd())
|
||||
}
|
||||
|
||||
// TODO add detach keys support
|
||||
reply, err := iopodman.Attach().Send(r.Conn, varlink.Upgrade, cid, detachKeys, start)
|
||||
if err != nil {
|
||||
restoreTerminal(oldTermState)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// See if the server accepts the upgraded connection or returns an error
|
||||
_, err = reply()
|
||||
|
||||
if err != nil {
|
||||
restoreTerminal(oldTermState)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
errChan := configureVarlinkAttachStdio(r.Conn.Reader, r.Conn.Writer, stdin, stdout, oldTermState, resize, nil)
|
||||
return errChan, nil
|
||||
}
|
||||
|
||||
// PauseContainers pauses container(s) based on CLI inputs.
|
||||
func (r *LocalRuntime) PauseContainers(ctx context.Context, cli *cliconfig.PauseValues) ([]string, map[string]error, error) {
|
||||
var (
|
||||
@ -1037,8 +995,11 @@ func (r *LocalRuntime) Commit(ctx context.Context, c *cliconfig.CommitValues, co
|
||||
|
||||
// ExecContainer executes a command in the container
|
||||
func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecValues) (int, error) {
|
||||
var (
|
||||
oldTermState *term.State
|
||||
ec int = 125
|
||||
)
|
||||
// default invalid command exit code
|
||||
ec := 125
|
||||
// Validate given environment variables
|
||||
env := map[string]string{}
|
||||
if err := parse.ReadKVStrings(env, []string{}, cli.Env); err != nil {
|
||||
@ -1051,6 +1012,24 @@ func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecVal
|
||||
envs = append(envs, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
resize := make(chan remotecommand.TerminalSize, 5)
|
||||
haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd()))
|
||||
|
||||
// Check if we are attached to a terminal. If we are, generate resize
|
||||
// events, and set the terminal to raw mode
|
||||
// TODO FIXME tty
|
||||
if haveTerminal && cli.Tty {
|
||||
cancel, oldTermState, err := handleTerminalAttach(ctx, resize)
|
||||
if err != nil {
|
||||
return ec, err
|
||||
}
|
||||
defer cancel()
|
||||
defer restoreTerminal(oldTermState)
|
||||
|
||||
logrus.SetFormatter(&RawTtyFormatter{})
|
||||
term.SetRawTerminal(os.Stdin.Fd())
|
||||
}
|
||||
|
||||
opts := iopodman.ExecOpts{
|
||||
Name: cli.InputArgs[0],
|
||||
Tty: cli.Tty,
|
||||
@ -1061,16 +1040,69 @@ func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecVal
|
||||
Env: &envs,
|
||||
}
|
||||
|
||||
receive, err := iopodman.ExecContainer().Send(r.Conn, varlink.Upgrade, opts)
|
||||
inputStream := os.Stdin
|
||||
if !cli.Interactive {
|
||||
inputStream = nil
|
||||
}
|
||||
|
||||
reply, err := iopodman.ExecContainer().Send(r.Conn, varlink.Upgrade, opts)
|
||||
if err != nil {
|
||||
return ec, errors.Wrapf(err, "Exec failed to contact service for %s", cli.InputArgs)
|
||||
}
|
||||
|
||||
_, err = receive()
|
||||
_, err = reply()
|
||||
if err != nil {
|
||||
return ec, errors.Wrapf(err, "Exec operation failed for %s", cli.InputArgs)
|
||||
}
|
||||
|
||||
// TODO return exit code from exec call
|
||||
return 0, nil
|
||||
ecChan := make(chan int, 1)
|
||||
errChan := configureVarlinkAttachStdio(r.Conn.Reader, r.Conn.Writer, inputStream, os.Stdout, oldTermState, resize, ecChan)
|
||||
|
||||
select {
|
||||
case err = <-errChan:
|
||||
break
|
||||
case ec = <-ecChan:
|
||||
break
|
||||
}
|
||||
|
||||
return ec, err
|
||||
}
|
||||
|
||||
func configureVarlinkAttachStdio(reader *bufio.Reader, writer *bufio.Writer, stdin *os.File, stdout *os.File, oldTermState *term.State, resize chan remotecommand.TerminalSize, ecChan chan int) chan error {
|
||||
var errChan chan error
|
||||
// These are the special writers that encode input from the client.
|
||||
varlinkStdinWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdin)
|
||||
varlinkResizeWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.TerminalResize)
|
||||
|
||||
go func() {
|
||||
// Read from the wire and direct to stdout or stderr
|
||||
err := virtwriter.Reader(reader, stdout, os.Stderr, nil, nil, ecChan)
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for termResize := range resize {
|
||||
b, err := json.Marshal(termResize)
|
||||
if err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
_, err = varlinkResizeWriter.Write(b)
|
||||
if err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Takes stdinput and sends it over the wire after being encoded
|
||||
go func() {
|
||||
if _, err := io.Copy(varlinkStdinWriter, stdin); err != nil {
|
||||
defer restoreTerminal(oldTermState)
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
}()
|
||||
return errChan
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st
|
||||
reader, writer, _, pw, streams := setupStreams(call)
|
||||
|
||||
go func() {
|
||||
if err := virtwriter.Reader(reader, nil, nil, pw, resize); err != nil {
|
||||
if err := virtwriter.Reader(reader, nil, nil, pw, resize, nil); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
@ -83,7 +83,7 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st
|
||||
logrus.Error(finalErr)
|
||||
}
|
||||
|
||||
if err = virtwriter.HangUp(writer); err != nil {
|
||||
if err = virtwriter.HangUp(writer, 0); err != nil {
|
||||
logrus.Errorf("Failed to HANG-UP attach to %s: %s", ctr.ID(), err.Error())
|
||||
}
|
||||
return call.Writer.Flush()
|
||||
|
@ -796,45 +796,55 @@ func (i *LibpodAPI) ExecContainer(call iopodman.VarlinkCall, opts iopodman.ExecO
|
||||
if opts.Workdir != nil {
|
||||
workDir = *opts.Workdir
|
||||
}
|
||||
// ACK the client upgrade request
|
||||
call.ReplyExecContainer()
|
||||
|
||||
resizeChan := make(chan remotecommand.TerminalSize)
|
||||
errChan := make(chan error)
|
||||
|
||||
reader, writer, _, pipeWriter, streams := setupStreams(call)
|
||||
//reader, _, _, pipeWriter, streams := setupStreams(call)
|
||||
ecChan := make(chan uint32, 1)
|
||||
|
||||
go func() {
|
||||
fmt.Printf("ExecContainer Start Reader\n")
|
||||
if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan); err != nil {
|
||||
fmt.Printf("ExecContainer Reader err %s, %s\n", err.Error(), errors.Cause(err).Error())
|
||||
errChan <- err
|
||||
if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan, nil); err != nil {
|
||||
//fmt.Printf("ExecContainer Reader err %s, %s\n", err.Error(), errors.Cause(err).Error())
|
||||
errChan <- errors.Wrapf(err, "error")
|
||||
}
|
||||
}()
|
||||
|
||||
// Debugging...
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
fmt.Printf("ExecContainer Start ctr.Exec\n")
|
||||
// TODO detach keys and resize
|
||||
// TODO add handling for exit code
|
||||
// TODO capture exit code and return to main thread
|
||||
go func() {
|
||||
fmt.Printf("ExecContainer Start ctr.Exec\n")
|
||||
// TODO detach keys and resize
|
||||
// TODO add handling for exit code
|
||||
// TODO capture exit code and return to main thread
|
||||
_, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, nil, "")
|
||||
ec, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, resizeChan, "")
|
||||
if err != nil {
|
||||
fmt.Printf("ExecContainer Exec err %s, %s\n", err.Error(), errors.Cause(err).Error())
|
||||
errChan <- errors.Wrapf(err, "ExecContainer failed for container %s", ctr.ID())
|
||||
logrus.Errorf("ExecContainer Exec err %s, %s\n", err.Error(), errors.Cause(err).Error())
|
||||
errChan <-err
|
||||
}
|
||||
ecChan <-uint32(ec)
|
||||
|
||||
}()
|
||||
|
||||
execErr := <-errChan
|
||||
|
||||
if execErr != nil && errors.Cause(execErr) != io.EOF {
|
||||
fmt.Printf("ExecContainer err: %s\n", execErr.Error())
|
||||
return call.ReplyErrorOccurred(execErr.Error())
|
||||
ec := uint32(125)
|
||||
var execErr error
|
||||
select {
|
||||
case execErr = <-errChan:
|
||||
fmt.Println(execErr.Error())
|
||||
case ec = <-ecChan:
|
||||
fmt.Println("found", ec)
|
||||
}
|
||||
|
||||
if err = virtwriter.HangUp(writer); err != nil {
|
||||
fmt.Printf("ExecContainer hangup err: %s\n", err.Error())
|
||||
// TODO FIXME prevent all of vthese conversions
|
||||
if err = virtwriter.HangUp(writer, int(ec)); err != nil {
|
||||
logrus.Errorf("ExecContainer failed to HANG-UP on %s: %s", ctr.ID(), err.Error())
|
||||
}
|
||||
return call.Writer.Flush()
|
||||
defer fmt.Println("Succeeded in exec'ing")
|
||||
if err := call.Writer.Flush(); err != nil {
|
||||
logrus.Errorf("Exec Container err: %s", err.Error())
|
||||
}
|
||||
|
||||
return execErr
|
||||
}
|
||||
|
@ -89,10 +89,14 @@ func (v VirtWriteCloser) Write(input []byte) (int, error) {
|
||||
}
|
||||
|
||||
// Reader decodes the content that comes over the wire and directs it to the proper destination.
|
||||
func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer, resize chan remotecommand.TerminalSize) error {
|
||||
func Reader(r *bufio.Reader, output, errput, input io.Writer, resize chan remotecommand.TerminalSize, execEcChan chan int) error {
|
||||
var messageSize int64
|
||||
headerBytes := make([]byte, 8)
|
||||
|
||||
if r == nil {
|
||||
return errors.Errorf("Reader must not be nil")
|
||||
}
|
||||
|
||||
for {
|
||||
n, err := io.ReadFull(r, headerBytes)
|
||||
if err != nil {
|
||||
@ -106,44 +110,57 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer
|
||||
|
||||
switch IntToSocketDest(int(headerBytes[0])) {
|
||||
case ToStdout:
|
||||
_, err := io.CopyN(output, r, messageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ToStderr:
|
||||
_, err := io.CopyN(errput, r, messageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case ToStdin:
|
||||
_, err := io.CopyN(input, r, messageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case TerminalResize:
|
||||
out := make([]byte, messageSize)
|
||||
if messageSize > 0 {
|
||||
_, err = io.ReadFull(r, out)
|
||||
|
||||
if output != nil {
|
||||
_, err := io.CopyN(output, r, messageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "issue stdout")
|
||||
}
|
||||
}
|
||||
// Resize events come over in bytes, need to be reserialized
|
||||
resizeEvent := remotecommand.TerminalSize{}
|
||||
if err := json.Unmarshal(out, &resizeEvent); err != nil {
|
||||
return err
|
||||
case ToStderr:
|
||||
if errput != nil {
|
||||
_, err := io.CopyN(errput, r, messageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "issue stderr")
|
||||
}
|
||||
}
|
||||
case ToStdin:
|
||||
if input != nil {
|
||||
_, err := io.CopyN(input, r, messageSize)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "issue stdin")
|
||||
}
|
||||
}
|
||||
case TerminalResize:
|
||||
if resize != nil {
|
||||
out := make([]byte, messageSize)
|
||||
if messageSize > 0 {
|
||||
_, err = io.ReadFull(r, out)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "issue resizing")
|
||||
}
|
||||
}
|
||||
// Resize events come over in bytes, need to be reserialized
|
||||
resizeEvent := remotecommand.TerminalSize{}
|
||||
if err := json.Unmarshal(out, &resizeEvent); err != nil {
|
||||
return errors.Wrapf(err, "issue resizing")
|
||||
}
|
||||
resize <- resizeEvent
|
||||
}
|
||||
resize <- resizeEvent
|
||||
case Quit:
|
||||
out := make([]byte, messageSize)
|
||||
if messageSize > 0 {
|
||||
_, err = io.ReadFull(r, out)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "issue quitting")
|
||||
}
|
||||
}
|
||||
if execEcChan != nil {
|
||||
ecInt := binary.BigEndian.Uint32(out)
|
||||
execEcChan <-int(ecInt)
|
||||
}
|
||||
return nil
|
||||
|
||||
default:
|
||||
@ -154,9 +171,12 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer
|
||||
}
|
||||
|
||||
// HangUp sends message to peer to close connection
|
||||
func HangUp(writer *bufio.Writer) (err error) {
|
||||
func HangUp(writer *bufio.Writer, ec int) (err error) {
|
||||
n := 0
|
||||
msg := []byte("HANG-UP")
|
||||
msg := make([]byte, 4)
|
||||
|
||||
binary.LittleEndian.PutUint32(msg, uint32(ec))
|
||||
|
||||
|
||||
writeQuit := NewVirtWriteCloser(writer, Quit)
|
||||
if n, err = writeQuit.Write(msg); err != nil {
|
||||
|
@ -1,5 +1,3 @@
|
||||
// +build !remoteclient
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
Reference in New Issue
Block a user