Merge pull request #3367 from baude/varlinkmore

account for varlink calls that dont use more
This commit is contained in:
OpenShift Merge Robot
2019-07-10 16:50:45 +02:00
committed by GitHub
3 changed files with 112 additions and 70 deletions

View File

@ -0,0 +1,34 @@
package channelwriter
import "github.com/pkg/errors"
// Writer is an io.writer-like object that "writes" to a channel
// instead of a buffer or file, etc. It is handy for varlink endpoints when
// needing to handle endpoints that do logging "real-time"
type Writer struct {
ByteChannel chan []byte
}
// NewChannelWriter creates a new channel writer and adds a
// byte slice channel into it.
func NewChannelWriter() *Writer {
byteChannel := make(chan []byte)
return &Writer{
ByteChannel: byteChannel,
}
}
// Write method for Writer
func (c *Writer) Write(w []byte) (int, error) {
if c.ByteChannel == nil {
return 0, errors.New("channel writer channel cannot be nil")
}
c.ByteChannel <- w
return len(w), nil
}
// Close method for Writer
func (c *Writer) Close() error {
close(c.ByteChannel)
return nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/containers/libpod/libpod"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/libpod/image"
"github.com/containers/libpod/pkg/channelwriter"
"github.com/containers/libpod/pkg/util"
"github.com/containers/libpod/utils"
"github.com/containers/storage/pkg/archive"
@ -495,9 +496,19 @@ func (i *LibpodAPI) DeleteUnusedImages(call iopodman.VarlinkCall) error {
// Commit ...
func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, changes []string, author, message string, pause bool, manifestType string) error {
var newImage *image.Image
var (
newImage *image.Image
log []string
mimeType string
)
output := channelwriter.NewChannelWriter()
channelClose := func() {
if err := output.Close(); err != nil {
logrus.Errorf("failed to close channel writer: %q", err)
}
}
defer channelClose()
output := bytes.NewBuffer([]byte{})
ctr, err := i.Runtime.LookupContainer(name)
if err != nil {
return call.ReplyContainerNotFound(name, err.Error())
@ -507,7 +518,6 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch
return call.ReplyErrorOccurred(err.Error())
}
sc := image.GetSystemContext(rtc.SignaturePolicyPath, "", false)
var mimeType string
switch manifestType {
case "oci", "": //nolint
mimeType = buildah.OCIv1ImageManifest
@ -535,6 +545,7 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch
}
c := make(chan error)
defer close(c)
go func() {
newImage, err = ctr.Commit(getContext(), imageName, options)
@ -542,48 +553,22 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch
c <- err
}
c <- nil
close(c)
}()
var log []string
done := false
for {
line, err := output.ReadString('\n')
if err == nil {
log = append(log, line)
continue
} else if err == io.EOF {
select {
case err := <-c:
if err != nil {
logrus.Errorf("reading of output during commit failed for %s", name)
return call.ReplyErrorOccurred(err.Error())
}
done = true
default:
if !call.WantsMore() {
break
}
br := iopodman.MoreResponse{
Logs: log,
}
call.ReplyCommit(br)
log = []string{}
}
} else {
return call.ReplyErrorOccurred(err.Error())
}
if done {
break
}
// reply is the func being sent to the output forwarder. in this case it is replying
// with a more response struct
reply := func(br iopodman.MoreResponse) error {
return call.ReplyCommit(br)
}
log, err = forwardOutput(log, c, call.WantsMore(), output, reply)
if err != nil {
return call.ReplyErrorOccurred(err.Error())
}
call.Continues = false
br := iopodman.MoreResponse{
Logs: log,
Id: newImage.ID(),
}
return call.ReplyCommit(br)
}
@ -636,6 +621,7 @@ func (i *LibpodAPI) ExportImage(call iopodman.VarlinkCall, name, destination str
func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error {
var (
imageID string
err error
)
dockerRegistryOptions := image.DockerRegistryOptions{}
so := image.SigningOptions{}
@ -643,8 +629,16 @@ func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error {
if call.WantsMore() {
call.Continues = true
}
output := bytes.NewBuffer([]byte{})
output := channelwriter.NewChannelWriter()
channelClose := func() {
if err := output.Close(); err != nil {
logrus.Errorf("failed to close channel writer: %q", err)
}
}
defer channelClose()
c := make(chan error)
defer close(c)
go func() {
if strings.HasPrefix(name, dockerarchive.Transport.Name()+":") {
srcRef, err := alltransports.ParseImageName(name)
@ -666,43 +660,17 @@ func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error {
}
}
c <- nil
close(c)
}()
var log []string
done := false
for {
line, err := output.ReadString('\n')
if err == nil {
log = append(log, line)
continue
} else if err == io.EOF {
select {
case err := <-c:
if err != nil {
logrus.Errorf("reading of output during pull failed for %s", name)
return call.ReplyErrorOccurred(err.Error())
}
done = true
default:
if !call.WantsMore() {
break
}
br := iopodman.MoreResponse{
Logs: log,
}
call.ReplyPullImage(br)
log = []string{}
}
} else {
return call.ReplyErrorOccurred(err.Error())
}
if done {
break
}
reply := func(br iopodman.MoreResponse) error {
return call.ReplyPullImage(br)
}
log, err = forwardOutput(log, c, call.WantsMore(), output, reply)
if err != nil {
return call.ReplyErrorOccurred(err.Error())
}
call.Continues = false
br := iopodman.MoreResponse{
Logs: log,
Id: imageID,

View File

@ -13,6 +13,7 @@ import (
"github.com/containers/libpod/cmd/podman/varlink"
"github.com/containers/libpod/libpod"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/channelwriter"
"github.com/containers/storage/pkg/archive"
)
@ -196,3 +197,42 @@ func makePsOpts(inOpts iopodman.PsOpts) shared.PsOptions {
Sync: derefBool(inOpts.Sync),
}
}
// forwardOutput is a helper method for varlink endpoints that employ both more and without
// more. it is capable of sending updates as the output writer gets them or append them
// all to a log. the chan error is the error from the libpod call so we can honor
// and error event in that case.
func forwardOutput(log []string, c chan error, wantsMore bool, output *channelwriter.Writer, reply func(br iopodman.MoreResponse) error) ([]string, error) {
done := false
for {
select {
// We need to check if the libpod func being called has returned an
// error yet
case err := <-c:
if err != nil {
return nil, err
}
done = true
// if no error is found, we pull what we can from the log writer and
// append it to log string slice
case line := <-output.ByteChannel:
log = append(log, string(line))
// If the end point is being used in more mode, send what we have
if wantsMore {
br := iopodman.MoreResponse{
Logs: log,
}
if err := reply(br); err != nil {
return nil, err
}
// "reset" the log to empty because we are sending what we
// get as we get it
log = []string{}
}
}
if done {
break
}
}
return log, nil
}