Merge pull request #16709 from vrothberg/fix-16515

kube sdnotify: run proxies for the lifespan of the service
This commit is contained in:
OpenShift Merge Robot
2022-12-07 18:10:31 -05:00
committed by GitHub
6 changed files with 216 additions and 114 deletions

View File

@ -155,7 +155,25 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
var configMaps []v1.ConfigMap
ranContainers := false
// FIXME: both, the service container and the proxies, should ideally
// be _state_ of an object. The Kube code below is quite Spaghetti-code
// which we should refactor at some point to make it easier to extend
// (via shared state instead of passing data around) and make it more
// maintainable long term.
var serviceContainer *libpod.Container
var notifyProxies []*notifyproxy.NotifyProxy
defer func() {
// Close the notify proxy on return. At that point we know
// that a) all containers have send their READY message and
// that b) the service container has exited (and hence all
// containers).
for _, proxy := range notifyProxies {
if err := proxy.Close(); err != nil {
logrus.Errorf("Closing notify proxy %q: %v", proxy.SocketPath(), err)
}
}
}()
// create pod on each document if it is a pod or deployment
// any other kube kind will be skipped
for _, document := range documentList {
@ -205,10 +223,11 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
podYAML.Annotations[name] = val
}
r, err := ic.playKubePod(ctx, podTemplateSpec.ObjectMeta.Name, &podTemplateSpec, options, &ipIndex, podYAML.Annotations, configMaps, serviceContainer)
r, proxies, err := ic.playKubePod(ctx, podTemplateSpec.ObjectMeta.Name, &podTemplateSpec, options, &ipIndex, podYAML.Annotations, configMaps, serviceContainer)
if err != nil {
return nil, err
}
notifyProxies = append(notifyProxies, proxies...)
report.Pods = append(report.Pods, r.Pods...)
validKinds++
@ -220,10 +239,11 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
return nil, fmt.Errorf("unable to read YAML as Kube Deployment: %w", err)
}
r, err := ic.playKubeDeployment(ctx, &deploymentYAML, options, &ipIndex, configMaps, serviceContainer)
r, proxies, err := ic.playKubeDeployment(ctx, &deploymentYAML, options, &ipIndex, configMaps, serviceContainer)
if err != nil {
return nil, err
}
notifyProxies = append(notifyProxies, proxies...)
report.Pods = append(report.Pods, r.Pods...)
validKinds++
@ -289,23 +309,20 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
}
if options.ServiceContainer && ranContainers {
// We can consider the service to be up and running now.
// Send the sd-notify messages pointing systemd to the
// service container.
data, err := serviceContainer.Inspect(false)
if err != nil {
return nil, err
}
message := fmt.Sprintf("MAINPID=%d\n%s", data.State.ConmonPid, daemon.SdNotifyReady)
message := fmt.Sprintf("MAINPID=%d\n%s", os.Getpid(), daemon.SdNotifyReady)
if err := notifyproxy.SendMessage("", message); err != nil {
return nil, err
}
if _, err := serviceContainer.Wait(ctx); err != nil {
return nil, fmt.Errorf("waiting for service container: %w", err)
}
}
return report, nil
}
func (ic *ContainerEngine) playKubeDeployment(ctx context.Context, deploymentYAML *v1apps.Deployment, options entities.PlayKubeOptions, ipIndex *int, configMaps []v1.ConfigMap, serviceContainer *libpod.Container) (*entities.PlayKubeReport, error) {
func (ic *ContainerEngine) playKubeDeployment(ctx context.Context, deploymentYAML *v1apps.Deployment, options entities.PlayKubeOptions, ipIndex *int, configMaps []v1.ConfigMap, serviceContainer *libpod.Container) (*entities.PlayKubeReport, []*notifyproxy.NotifyProxy, error) {
var (
deploymentName string
podSpec v1.PodTemplateSpec
@ -316,7 +333,7 @@ func (ic *ContainerEngine) playKubeDeployment(ctx context.Context, deploymentYAM
deploymentName = deploymentYAML.ObjectMeta.Name
if deploymentName == "" {
return nil, errors.New("deployment does not have a name")
return nil, nil, errors.New("deployment does not have a name")
}
numReplicas = 1
if deploymentYAML.Spec.Replicas != nil {
@ -325,18 +342,20 @@ func (ic *ContainerEngine) playKubeDeployment(ctx context.Context, deploymentYAM
podSpec = deploymentYAML.Spec.Template
// create "replicas" number of pods
var notifyProxies []*notifyproxy.NotifyProxy
for i = 0; i < numReplicas; i++ {
podName := fmt.Sprintf("%s-pod-%d", deploymentName, i)
podReport, err := ic.playKubePod(ctx, podName, &podSpec, options, ipIndex, deploymentYAML.Annotations, configMaps, serviceContainer)
podReport, proxies, err := ic.playKubePod(ctx, podName, &podSpec, options, ipIndex, deploymentYAML.Annotations, configMaps, serviceContainer)
if err != nil {
return nil, fmt.Errorf("encountered while bringing up pod %s: %w", podName, err)
return nil, notifyProxies, fmt.Errorf("encountered while bringing up pod %s: %w", podName, err)
}
report.Pods = append(report.Pods, podReport.Pods...)
notifyProxies = append(notifyProxies, proxies...)
}
return &report, nil
return &report, notifyProxies, nil
}
func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podYAML *v1.PodTemplateSpec, options entities.PlayKubeOptions, ipIndex *int, annotations map[string]string, configMaps []v1.ConfigMap, serviceContainer *libpod.Container) (*entities.PlayKubeReport, error) {
func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podYAML *v1.PodTemplateSpec, options entities.PlayKubeOptions, ipIndex *int, annotations map[string]string, configMaps []v1.ConfigMap, serviceContainer *libpod.Container) (*entities.PlayKubeReport, []*notifyproxy.NotifyProxy, error) {
var (
writer io.Writer
playKubePod entities.PlayKubePod
@ -345,18 +364,18 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
mainSdNotifyMode, err := getSdNotifyMode(annotations, "")
if err != nil {
return nil, err
return nil, nil, err
}
// Create the secret manager before hand
secretsManager, err := ic.Libpod.SecretsManager()
if err != nil {
return nil, err
return nil, nil, err
}
// Assert the pod has a name
if podName == "" {
return nil, fmt.Errorf("pod does not have a name")
return nil, nil, fmt.Errorf("pod does not have a name")
}
podOpt := entities.PodCreateOptions{
@ -366,7 +385,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
}
podOpt, err = kube.ToPodOpt(ctx, podName, podOpt, podYAML)
if err != nil {
return nil, err
return nil, nil, err
}
// add kube default network if no network is explicitly added
@ -384,7 +403,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
ns, networks, netOpts, err := specgen.ParseNetworkFlag(options.Networks, pastaNetworkNameExists)
if err != nil {
return nil, err
return nil, nil, err
}
podOpt.Net.Network = ns
@ -404,16 +423,16 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
// Validate the userns modes supported.
podOpt.Userns, err = specgen.ParseUserNamespace(options.Userns)
if err != nil {
return nil, err
return nil, nil, err
}
// FIXME This is very hard to support properly with a good ux
if len(options.StaticIPs) > *ipIndex {
if !podOpt.Net.Network.IsBridge() {
return nil, fmt.Errorf("static ip addresses can only be set when the network mode is bridge: %w", define.ErrInvalidArg)
return nil, nil, fmt.Errorf("static ip addresses can only be set when the network mode is bridge: %w", define.ErrInvalidArg)
}
if len(podOpt.Net.Networks) != 1 {
return nil, fmt.Errorf("cannot set static ip addresses for more than network, use netname:ip=<ip> syntax to specify ips for more than network: %w", define.ErrInvalidArg)
return nil, nil, fmt.Errorf("cannot set static ip addresses for more than network, use netname:ip=<ip> syntax to specify ips for more than network: %w", define.ErrInvalidArg)
}
for name, netOpts := range podOpt.Net.Networks {
netOpts.StaticIPs = append(netOpts.StaticIPs, options.StaticIPs[*ipIndex])
@ -425,10 +444,10 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
}
if len(options.StaticMACs) > *ipIndex {
if !podOpt.Net.Network.IsBridge() {
return nil, fmt.Errorf("static mac address can only be set when the network mode is bridge: %w", define.ErrInvalidArg)
return nil, nil, fmt.Errorf("static mac address can only be set when the network mode is bridge: %w", define.ErrInvalidArg)
}
if len(podOpt.Net.Networks) != 1 {
return nil, fmt.Errorf("cannot set static mac address for more than network, use netname:mac=<mac> syntax to specify mac for more than network: %w", define.ErrInvalidArg)
return nil, nil, fmt.Errorf("cannot set static mac address for more than network, use netname:mac=<mac> syntax to specify mac for more than network: %w", define.ErrInvalidArg)
}
for name, netOpts := range podOpt.Net.Networks {
netOpts.StaticMAC = nettypes.HardwareAddr(options.StaticMACs[*ipIndex])
@ -442,12 +461,12 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
p := specgen.NewPodSpecGenerator()
if err != nil {
return nil, err
return nil, nil, err
}
p, err = entities.ToPodSpecGen(*p, &podOpt)
if err != nil {
return nil, err
return nil, nil, err
}
podSpec := entities.PodSpec{PodSpecGen: *p}
@ -458,17 +477,17 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
for _, p := range options.ConfigMaps {
f, err := os.Open(p)
if err != nil {
return nil, err
return nil, nil, err
}
defer f.Close()
cm, err := readConfigMapFromFile(f)
if err != nil {
return nil, fmt.Errorf("%q: %w", p, err)
return nil, nil, fmt.Errorf("%q: %w", p, err)
}
if _, present := configMapIndex[cm.Name]; present {
return nil, fmt.Errorf("ambiguous configuration: the same config map %s is present in YAML and in --configmaps %s file", cm.Name, p)
return nil, nil, fmt.Errorf("ambiguous configuration: the same config map %s is present in YAML and in --configmaps %s file", cm.Name, p)
}
configMaps = append(configMaps, cm)
@ -476,7 +495,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
volumes, err := kube.InitializeVolumes(podYAML.Spec.Volumes, configMaps, secretsManager)
if err != nil {
return nil, err
return nil, nil, err
}
// Go through the volumes and create a podman volume for all volumes that have been
@ -490,27 +509,27 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
// error out instead reuse the current volume.
vol, err = ic.Libpod.GetVolume(v.Source)
if err != nil {
return nil, fmt.Errorf("cannot re-use local volume for volume from configmap %q: %w", v.Source, err)
return nil, nil, fmt.Errorf("cannot re-use local volume for volume from configmap %q: %w", v.Source, err)
}
} else {
return nil, fmt.Errorf("cannot create a local volume for volume from configmap %q: %w", v.Source, err)
return nil, nil, fmt.Errorf("cannot create a local volume for volume from configmap %q: %w", v.Source, err)
}
}
mountPoint, err := vol.MountPoint()
if err != nil || mountPoint == "" {
return nil, fmt.Errorf("unable to get mountpoint of volume %q: %w", vol.Name(), err)
return nil, nil, fmt.Errorf("unable to get mountpoint of volume %q: %w", vol.Name(), err)
}
// Create files and add data to the volume mountpoint based on the Items in the volume
for k, v := range v.Items {
dataPath := filepath.Join(mountPoint, k)
f, err := os.Create(dataPath)
if err != nil {
return nil, fmt.Errorf("cannot create file %q at volume mountpoint %q: %w", k, mountPoint, err)
return nil, nil, fmt.Errorf("cannot create file %q at volume mountpoint %q: %w", k, mountPoint, err)
}
defer f.Close()
_, err = f.Write(v)
if err != nil {
return nil, err
return nil, nil, err
}
}
}
@ -518,7 +537,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
seccompPaths, err := kube.InitializeSeccompPaths(podYAML.ObjectMeta.Annotations, options.SeccompProfileRoot)
if err != nil {
return nil, err
return nil, nil, err
}
var ctrRestartPolicy string
@ -546,7 +565,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
err = specgenutil.FillOutSpecGen(podSpec.PodSpecGen.InfraContainerSpec, &infraOptions, []string{})
if err != nil {
return nil, err
return nil, nil, err
}
}
@ -557,12 +576,12 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
// Create the Pod
pod, err := generate.MakePod(&podSpec, ic.Libpod)
if err != nil {
return nil, err
return nil, nil, err
}
podInfraID, err := pod.InfraContainerID()
if err != nil {
return nil, err
return nil, nil, err
}
if !options.Quiet {
@ -578,7 +597,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
} else {
cwd, err = os.Getwd()
if err != nil {
return nil, err
return nil, nil, err
}
}
@ -586,16 +605,16 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
for _, initCtr := range podYAML.Spec.InitContainers {
// Error out if same name is used for more than one container
if _, ok := ctrNames[initCtr.Name]; ok {
return nil, fmt.Errorf("the pod %q is invalid; duplicate container name %q detected", podName, initCtr.Name)
return nil, nil, fmt.Errorf("the pod %q is invalid; duplicate container name %q detected", podName, initCtr.Name)
}
ctrNames[initCtr.Name] = ""
// Init containers cannot have either of lifecycle, livenessProbe, readinessProbe, or startupProbe set
if initCtr.Lifecycle != nil || initCtr.LivenessProbe != nil || initCtr.ReadinessProbe != nil || initCtr.StartupProbe != nil {
return nil, fmt.Errorf("cannot create an init container that has either of lifecycle, livenessProbe, readinessProbe, or startupProbe set")
return nil, nil, fmt.Errorf("cannot create an init container that has either of lifecycle, livenessProbe, readinessProbe, or startupProbe set")
}
pulledImage, labels, err := ic.getImageAndLabelInfo(ctx, cwd, annotations, writer, initCtr, options)
if err != nil {
return nil, err
return nil, nil, err
}
for k, v := range podSpec.PodSpecGen.Labels { // add podYAML labels
@ -628,17 +647,17 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
}
specGen, err := kube.ToSpecGen(ctx, &specgenOpts)
if err != nil {
return nil, err
return nil, nil, err
}
specGen.SdNotifyMode = define.SdNotifyModeIgnore
rtSpec, spec, opts, err := generate.MakeContainer(ctx, ic.Libpod, specGen, false, nil)
if err != nil {
return nil, err
return nil, nil, err
}
opts = append(opts, libpod.WithSdNotifyMode(define.SdNotifyModeIgnore))
ctr, err := generate.ExecuteCreate(ctx, ic.Libpod, rtSpec, spec, false, opts...)
if err != nil {
return nil, err
return nil, nil, err
}
initContainers = append(initContainers, ctr)
@ -649,12 +668,12 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
for _, container := range podYAML.Spec.Containers {
// Error out if the same name is used for more than one container
if _, ok := ctrNames[container.Name]; ok {
return nil, fmt.Errorf("the pod %q is invalid; duplicate container name %q detected", podName, container.Name)
return nil, nil, fmt.Errorf("the pod %q is invalid; duplicate container name %q detected", podName, container.Name)
}
ctrNames[container.Name] = ""
pulledImage, labels, err := ic.getImageAndLabelInfo(ctx, cwd, annotations, writer, container, options)
if err != nil {
return nil, err
return nil, nil, err
}
for k, v := range podSpec.PodSpecGen.Labels { // add podYAML labels
@ -683,18 +702,18 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
specGen, err := kube.ToSpecGen(ctx, &specgenOpts)
if err != nil {
return nil, err
return nil, nil, err
}
specGen.RawImageName = container.Image
rtSpec, spec, opts, err := generate.MakeContainer(ctx, ic.Libpod, specGen, false, nil)
if err != nil {
return nil, err
return nil, nil, err
}
sdNotifyMode := mainSdNotifyMode
ctrNotifyMode, err := getSdNotifyMode(annotations, container.Name)
if err != nil {
return nil, err
return nil, nil, err
}
if ctrNotifyMode != "" {
sdNotifyMode = ctrNotifyMode
@ -710,7 +729,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
if sdNotifyMode != "" && sdNotifyMode != define.SdNotifyModeIgnore {
proxy, err = notifyproxy.New("")
if err != nil {
return nil, err
return nil, nil, err
}
sdNotifyProxies = append(sdNotifyProxies, proxy)
opts = append(opts, libpod.WithSdNotifySocket(proxy.SocketPath()))
@ -718,7 +737,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
ctr, err := generate.ExecuteCreate(ctx, ic.Libpod, rtSpec, spec, false, opts...)
if err != nil {
return nil, err
return nil, nil, err
}
if proxy != nil {
proxy.AddContainer(ctr)
@ -730,7 +749,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
// Start the containers
podStartErrors, err := pod.Start(ctx)
if err != nil && !errors.Is(err, define.ErrPodPartialFail) {
return nil, err
return nil, nil, err
}
for id, err := range podStartErrors {
playKubePod.ContainerErrors = append(playKubePod.ContainerErrors, fmt.Errorf("starting container %s: %w", id, err).Error())
@ -743,8 +762,13 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
errors := make([]error, len(sdNotifyProxies))
for i := range sdNotifyProxies {
wg.Add(1)
defer func() {
if err := sdNotifyProxies[i].Close(); err != nil {
logrus.Errorf("Closing sdnotify proxy %q: %v", sdNotifyProxies[i].SocketPath(), err)
}
}()
go func(i int) {
err := sdNotifyProxies[i].WaitAndClose()
err := sdNotifyProxies[i].Wait()
if err != nil {
err = fmt.Errorf("waiting for sd-notify proxy: %w", err)
}
@ -755,7 +779,11 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
wg.Wait()
for _, err := range errors {
if err != nil {
return nil, err
// Close all proxies on error.
for _, proxy := range sdNotifyProxies {
_ = proxy.Close()
}
return nil, nil, err
}
}
}
@ -770,7 +798,7 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
report.Pods = append(report.Pods, playKubePod)
return &report, nil
return &report, sdNotifyProxies, nil
}
// getImageAndLabelInfo returns the image information and how the image should be pulled plus as well as labels to be used for the container in the pod.

View File

@ -14,6 +14,16 @@ import (
"github.com/containers/podman/v4/libpod/define"
"github.com/coreos/go-systemd/v22/daemon"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
const (
// All constants below are defined by systemd.
_notifyRcvbufSize = 8 * 1024 * 1024
_notifyBufferMax = 4096
_notifyFdMax = 768
_notifyBarrierMsg = "BARRIER=1"
_notifyRdyMsg = daemon.SdNotifyReady
)
// SendMessage sends the specified message to the specified socket.
@ -76,6 +86,10 @@ func New(tmpDir string) (*NotifyProxy, error) {
return nil, err
}
if err := conn.SetReadBuffer(_notifyRcvbufSize); err != nil {
return nil, fmt.Errorf("setting read buffer: %w", err)
}
errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)
@ -100,34 +114,69 @@ func (p *NotifyProxy) waitForReady() {
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
// See https://github.com/containers/podman/issues/16515 for a description of the protocol.
fdSize := unix.CmsgSpace(4)
buffer := make([]byte, _notifyBufferMax)
oob := make([]byte, _notifyFdMax*fdSize)
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
n, oobn, flags, _, err := p.connection.ReadMsgUnix(buffer, oob)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
logrus.Errorf("Error reading unix message on socket %q: %v", p.socketPath, err)
}
if n > _notifyBufferMax || oobn > _notifyFdMax*fdSize {
logrus.Errorf("Ignoring unix message on socket %q: incorrect number of bytes read (n=%d, oobn=%d)", p.socketPath, n, oobn)
continue
}
if flags&unix.MSG_CTRUNC != 0 {
logrus.Errorf("Ignoring unix message on socket %q: message truncated", p.socketPath)
continue
}
sBuilder.Reset()
sBuilder.Write(buffer[:n])
var isBarrier, isReady bool
for _, line := range strings.Split(sBuilder.String(), "\n") {
switch line {
case _notifyRdyMsg:
isReady = true
case _notifyBarrierMsg:
isBarrier = true
}
}
for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
p.readyChan <- true
return
if isBarrier {
scms, err := unix.ParseSocketControlMessage(oob)
if err != nil {
logrus.Errorf("parsing control message on socket %q: %v", p.socketPath, err)
}
for _, scm := range scms {
fds, err := unix.ParseUnixRights(&scm)
if err != nil {
logrus.Errorf("parsing unix rights of control message on socket %q: %v", p.socketPath, err)
continue
}
for _, fd := range fds {
if err := unix.Close(fd); err != nil {
logrus.Errorf("closing fd passed on socket %q: %v", fd, err)
continue
}
}
}
continue
}
if isReady {
p.readyChan <- true
}
sBuilder.Reset()
}
}()
}
@ -137,8 +186,8 @@ func (p *NotifyProxy) SocketPath() string {
return p.socketPath
}
// close closes the listener and removes the socket.
func (p *NotifyProxy) close() error {
// Close closes the listener and removes the socket.
func (p *NotifyProxy) Close() error {
defer os.Remove(p.socketPath)
return p.connection.Close()
}
@ -158,20 +207,12 @@ type Container interface {
ID() string
}
// WaitAndClose waits until receiving the `READY` notify message and close the
// listener. Note that the this function must only be executed inside a systemd
// service which will kill the process after a given timeout.
// If the (optional) container stopped running before the `READY` is received,
// the waiting gets canceled and ErrNoReadyMessage is returned.
func (p *NotifyProxy) WaitAndClose() error {
defer func() {
// Closing the socket/connection makes sure that the other
// goroutine reading/waiting for the READY message returns.
if err := p.close(); err != nil {
logrus.Errorf("Closing notify proxy: %v", err)
}
}()
// WaitAndClose waits until receiving the `READY` notify message. Note that the
// this function must only be executed inside a systemd service which will kill
// the process after a given timeout. If the (optional) container stopped
// running before the `READY` is received, the waiting gets canceled and
// ErrNoReadyMessage is returned.
func (p *NotifyProxy) Wait() error {
// If the proxy has a container we need to watch it as it may exit
// without sending a READY message. The goroutine below returns when
// the container exits OR when the function returns (see deferred the

View File

@ -18,7 +18,7 @@ func TestNotifyProxy(t *testing.T) {
proxy, err := New("")
require.NoError(t, err)
require.FileExists(t, proxy.SocketPath())
require.NoError(t, proxy.close())
require.NoError(t, proxy.Close())
require.NoFileExists(t, proxy.SocketPath())
}
@ -28,9 +28,12 @@ func TestWaitAndClose(t *testing.T) {
require.FileExists(t, proxy.SocketPath())
ch := make(chan error)
defer func() {
err := proxy.Close()
require.NoError(t, err, "proxy should close successfully")
}()
go func() {
ch <- proxy.WaitAndClose()
ch <- proxy.Wait()
}()
sendMessage(t, proxy, "foo\n")