V2 Update attach bindings to use Readers/Writers vs chan

* Change function call to use readers/writers in place channels
* Support stdin for pushing data from client to container
* Add bindings test

Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
Jhon Honce
2020-05-13 14:53:53 -07:00
parent 7e9ed37c09
commit d34e5a142a
3 changed files with 136 additions and 47 deletions

@ -39,6 +39,7 @@ type APIResponse struct {
type Connection struct { type Connection struct {
_url *url.URL _url *url.URL
client *http.Client client *http.Client
conn *net.Conn
} }
type valueKey string type valueKey string
@ -88,26 +89,26 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
} }
// Now we setup the http client to use the connection above // Now we setup the http client to use the connection above
var client *http.Client var connection Connection
switch _url.Scheme { switch _url.Scheme {
case "ssh": case "ssh":
secure, err = strconv.ParseBool(_url.Query().Get("secure")) secure, err = strconv.ParseBool(_url.Query().Get("secure"))
if err != nil { if err != nil {
secure = false secure = false
} }
client, err = sshClient(_url, identity[0], secure) connection, err = sshClient(_url, identity[0], secure)
case "unix": case "unix":
if !strings.HasPrefix(uri, "unix:///") { if !strings.HasPrefix(uri, "unix:///") {
// autofix unix://path_element vs unix:///path_element // autofix unix://path_element vs unix:///path_element
_url.Path = JoinURL(_url.Host, _url.Path) _url.Path = JoinURL(_url.Host, _url.Path)
_url.Host = "" _url.Host = ""
} }
client, err = unixClient(_url) connection, err = unixClient(_url)
case "tcp": case "tcp":
if !strings.HasPrefix(uri, "tcp://") { if !strings.HasPrefix(uri, "tcp://") {
return nil, errors.New("tcp URIs should begin with tcp://") return nil, errors.New("tcp URIs should begin with tcp://")
} }
client, err = tcpClient(_url) connection, err = tcpClient(_url)
default: default:
return nil, errors.Errorf("'%s' is not a supported schema", _url.Scheme) return nil, errors.Errorf("'%s' is not a supported schema", _url.Scheme)
} }
@ -115,22 +116,30 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
return nil, errors.Wrapf(err, "Failed to create %sClient", _url.Scheme) return nil, errors.Wrapf(err, "Failed to create %sClient", _url.Scheme)
} }
ctx = context.WithValue(ctx, clientKey, &Connection{_url, client}) ctx = context.WithValue(ctx, clientKey, &connection)
if err := pingNewConnection(ctx); err != nil { if err := pingNewConnection(ctx); err != nil {
return nil, err return nil, err
} }
return ctx, nil return ctx, nil
} }
func tcpClient(_url *url.URL) (*http.Client, error) { func tcpClient(_url *url.URL) (Connection, error) {
return &http.Client{ connection := Connection{
_url: _url,
}
connection.client = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return net.Dial("tcp", _url.Host) conn, err := net.Dial("tcp", _url.Host)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
}, },
DisableCompression: true, DisableCompression: true,
}, },
}, nil }
return connection, nil
} }
// pingNewConnection pings to make sure the RESTFUL service is up // pingNewConnection pings to make sure the RESTFUL service is up
@ -151,10 +160,10 @@ func pingNewConnection(ctx context.Context) error {
return errors.Errorf("ping response was %q", response.StatusCode) return errors.Errorf("ping response was %q", response.StatusCode)
} }
func sshClient(_url *url.URL, identity string, secure bool) (*http.Client, error) { func sshClient(_url *url.URL, identity string, secure bool) (Connection, error) {
auth, err := publicKey(identity) auth, err := publicKey(identity)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "Failed to parse identity %s: %v\n", _url.String(), identity) return Connection{}, errors.Wrapf(err, "Failed to parse identity %s: %v\n", _url.String(), identity)
} }
callback := ssh.InsecureIgnoreHostKey() callback := ssh.InsecureIgnoreHostKey()
@ -188,26 +197,39 @@ func sshClient(_url *url.URL, identity string, secure bool) (*http.Client, error
}, },
) )
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String()) return Connection{}, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String())
}
return &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return bastion.Dial("unix", _url.Path)
},
}}, nil
} }
func unixClient(_url *url.URL) (*http.Client, error) { connection := Connection{_url: _url}
return &http.Client{ connection.client = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
conn, err := bastion.Dial("unix", _url.Path)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
},
}}
return connection, nil
}
func unixClient(_url *url.URL) (Connection, error) {
connection := Connection{_url: _url}
connection.client = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
d := net.Dialer{} d := net.Dialer{}
return d.DialContext(ctx, "unix", _url.Path) conn, err := d.DialContext(ctx, "unix", _url.Path)
if c, ok := ctx.Value(clientKey).(*Connection); ok {
c.conn = &conn
}
return conn, err
}, },
DisableCompression: true, DisableCompression: true,
}, },
}, nil }
return connection, nil
} }
// DoRequest assembles the http request and returns the response // DoRequest assembles the http request and returns the response
@ -232,6 +254,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
if len(queryParams) > 0 { if len(queryParams) > 0 {
req.URL.RawQuery = queryParams.Encode() req.URL.RawQuery = queryParams.Encode()
} }
req = req.WithContext(context.WithValue(context.Background(), clientKey, c))
// Give the Do three chances in the case of a comm/service hiccup // Give the Do three chances in the case of a comm/service hiccup
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
response, err = c.client.Do(req) // nolint response, err = c.client.Do(req) // nolint
@ -243,6 +266,10 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
return &APIResponse{response, req}, err return &APIResponse{response, req}, err
} }
func (c *Connection) Write(b []byte) (int, error) {
return (*c.conn).Write(b)
}
// FiltersToString converts our typical filter format of a // FiltersToString converts our typical filter format of a
// map[string][]string to a query/html safe string. // map[string][]string to a query/html safe string.
func FiltersToString(filters map[string][]string) (string, error) { func FiltersToString(filters map[string][]string) (string, error) {
@ -295,8 +322,8 @@ func publicKey(path string) (ssh.AuthMethod, error) {
func hostKey(host string) ssh.PublicKey { func hostKey(host string) ssh.PublicKey {
// parse OpenSSH known_hosts file // parse OpenSSH known_hosts file
// ssh or use ssh-keyscan to get initial key // ssh or use ssh-keyscan to get initial key
known_hosts := filepath.Join(homedir.HomeDir(), ".ssh", "known_hosts") knownHosts := filepath.Join(homedir.HomeDir(), ".ssh", "known_hosts")
fd, err := os.Open(known_hosts) fd, err := os.Open(knownHosts)
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
return nil return nil

@ -15,6 +15,7 @@ import (
"github.com/containers/libpod/pkg/bindings" "github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/domain/entities" "github.com/containers/libpod/pkg/domain/entities"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
var ( var (
@ -341,12 +342,18 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
} }
// Attach attaches to a running container // Attach attaches to a running container
func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error { func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) error {
conn, err := bindings.GetClient(ctx) conn, err := bindings.GetClient(ctx)
if err != nil { if err != nil {
return err return err
} }
// Do we need to wire in stdin?
ctnr, err := Inspect(ctx, nameOrId, &bindings.PFalse)
if err != nil {
return err
}
params := url.Values{} params := url.Values{}
if detachKeys != nil { if detachKeys != nil {
params.Add("detachKeys", *detachKeys) params.Add("detachKeys", *detachKeys)
@ -357,7 +364,7 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
if stream != nil { if stream != nil {
params.Add("stream", fmt.Sprintf("%t", *stream)) params.Add("stream", fmt.Sprintf("%t", *stream))
} }
if stdin != nil && *stdin { if stdin != nil {
params.Add("stdin", "true") params.Add("stdin", "true")
} }
if stdout != nil { if stdout != nil {
@ -373,11 +380,23 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
} }
defer response.Body.Close() defer response.Body.Close()
ctype := response.Header.Get("Content-Type") if stdin != nil {
upgrade := response.Header.Get("Connection") go func() {
_, err := io.Copy(conn, stdin)
if err != nil {
logrus.Error("failed to write input to service: " + err.Error())
}
}()
}
buffer := make([]byte, 1024) buffer := make([]byte, 1024)
if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" { if ctnr.Config.Tty {
// If not multiplex'ed, read from server and write to stdout
_, err := io.Copy(stdout, response.Body)
if err != nil {
return err
}
} else {
for { for {
// Read multiplexed channels and write to appropriate stream // Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(response.Body, buffer) fd, l, err := DemuxHeader(response.Body, buffer)
@ -396,30 +415,27 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
} }
switch { switch {
case fd == 0 && stdin != nil && *stdin: case fd == 0 && stdin != nil:
stdout.Write(frame) _, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 1 && stdout != nil: case fd == 1 && stdout != nil:
stdout.Write(frame) _, err := stdout.Write(frame[0:l])
if err != nil {
return err
}
case fd == 2 && stderr != nil: case fd == 2 && stderr != nil:
stderr.Write(frame) _, err := stderr.Write(frame[0:l])
if err != nil {
return err
}
case fd == 3: case fd == 3:
return fmt.Errorf("error from daemon in stream: %s", frame) return fmt.Errorf("error from daemon in stream: %s", frame)
default: default:
return fmt.Errorf("unrecognized input header: %d", fd) return fmt.Errorf("unrecognized input header: %d", fd)
} }
} }
} else {
// If not multiplex'ed from server just dump stream to stdout
for {
_, err := response.Body.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
return err
}
break
}
stdout.Write(buffer)
}
} }
return err return err
} }

@ -2,10 +2,13 @@ package test_bindings
import ( import (
"bytes" "bytes"
"fmt"
"time" "time"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/bindings" "github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/bindings/containers" "github.com/containers/libpod/pkg/bindings/containers"
"github.com/containers/libpod/pkg/specgen"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec" "github.com/onsi/gomega/gexec"
@ -31,7 +34,7 @@ var _ = Describe("Podman containers attach", func() {
bt.cleanup() bt.cleanup()
}) })
It("attach", func() { It("can run top in container", func() {
name := "TopAttachTest" name := "TopAttachTest"
id, err := bt.RunTopContainer(&name, nil, nil) id, err := bt.RunTopContainer(&name, nil, nil)
Expect(err).ShouldNot(HaveOccurred()) Expect(err).ShouldNot(HaveOccurred())
@ -51,13 +54,56 @@ var _ = Describe("Podman containers attach", func() {
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr) err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, nil, stdout, stderr)
Expect(err).ShouldNot(HaveOccurred()) Expect(err).ShouldNot(HaveOccurred())
}() }()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// First character/First line of top output // First character/First line of top output
Expect(stdout.String()).Should(ContainSubstring("Mem: ")) Expect(stdout.String()).Should(ContainSubstring("Mem: "))
}) })
It("can echo data via cat in container", func() {
s := specgen.NewSpecGenerator(alpine.name, false)
s.Name = "CatAttachTest"
s.Terminal = true
s.Command = []string{"/bin/cat"}
ctnr, err := containers.CreateWithSpec(bt.conn, s)
Expect(err).ShouldNot(HaveOccurred())
err = containers.Start(bt.conn, ctnr.ID, nil)
Expect(err).ShouldNot(HaveOccurred())
wait := define.ContainerStateRunning
_, err = containers.Wait(bt.conn, ctnr.ID, &wait)
Expect(err).ShouldNot(HaveOccurred())
tickTock := time.NewTimer(2 * time.Second)
go func() {
<-tickTock.C
timeout := uint(5)
err := containers.Stop(bt.conn, ctnr.ID, &timeout)
if err != nil {
GinkgoWriter.Write([]byte(err.Error()))
}
}()
msg := "Hello, World"
stdin := &bytes.Buffer{}
stdin.WriteString(msg + "\n")
stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
go func() {
defer GinkgoRecover()
err := containers.Attach(bt.conn, ctnr.ID, nil, &bindings.PFalse, &bindings.PTrue, stdin, stdout, stderr)
Expect(err).ShouldNot(HaveOccurred())
}()
time.Sleep(5 * time.Second)
// Tty==true so we get echo'ed stdin + expected output
Expect(stdout.String()).Should(Equal(fmt.Sprintf("%[1]s\r\n%[1]s\r\n", msg)))
Expect(stderr.String()).Should(BeEmpty())
})
}) })