From 1071098ee24f2764946c72065b3e422eddc47666 Mon Sep 17 00:00:00 2001
From: Valentin Rothberg <vrothberg@redhat.com>
Date: Thu, 15 Sep 2022 14:15:15 +0200
Subject: [PATCH 1/2] kube play: sdnotify proxy: use a wait group

Use a wait group to a) wait for all proxies in parallel
                    b) avoid the potential for ABBA deadlocks

[NO NEW TESTS NEEDED] as it is not changing functionality

Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
---
 pkg/domain/infra/abi/play.go | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/pkg/domain/infra/abi/play.go b/pkg/domain/infra/abi/play.go
index d447b4d001..a65403fb4f 100644
--- a/pkg/domain/infra/abi/play.go
+++ b/pkg/domain/infra/abi/play.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 
 	buildahDefine "github.com/containers/buildah/define"
 	"github.com/containers/common/libimage"
@@ -699,9 +700,24 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
 			fmt.Println(playKubePod.ContainerErrors)
 		}
 
-		// Wait for each proxy to receive a READY message.
-		for _, proxy := range sdNotifyProxies {
-			if err := proxy.WaitAndClose(); err != nil {
+		// Wait for each proxy to receive a READY message. Use a wait
+		// group to prevent the potential for ABBA kinds of deadlocks.
+		var wg sync.WaitGroup
+		errors := make([]error, len(sdNotifyProxies))
+		for i := range sdNotifyProxies {
+			wg.Add(1)
+			go func(i int) {
+				err := sdNotifyProxies[i].WaitAndClose()
+				if err != nil {
+					err = fmt.Errorf("waiting for sd-notify proxy: %w", err)
+				}
+				errors[i] = err
+				wg.Done()
+			}(i)
+		}
+		wg.Wait()
+		for _, err := range errors {
+			if err != nil {
 				return nil, err
 			}
 		}

From 4a053a821aab8891498cb5dd3f01ce3437fdf0ef Mon Sep 17 00:00:00 2001
From: Valentin Rothberg <vrothberg@redhat.com>
Date: Thu, 15 Sep 2022 15:54:30 +0200
Subject: [PATCH 2/2] notifyproxy: don't set a read deadline

The read deadline may yield the READY message to be lost in space.
Instead, use a more Go-idiomatic alternative by using two goroutines;
one reading from the connection, the other watching the container.

[NO NEW TESTS NEEDED] since existing tests are exercising this
functionality already.

Fixes: #15800
Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
---
 pkg/systemd/notifyproxy/notifyproxy.go | 96 +++++++++++++++++---------
 1 file changed, 62 insertions(+), 34 deletions(-)

diff --git a/pkg/systemd/notifyproxy/notifyproxy.go b/pkg/systemd/notifyproxy/notifyproxy.go
index 1bfab9ca07..32fea48bfb 100644
--- a/pkg/systemd/notifyproxy/notifyproxy.go
+++ b/pkg/systemd/notifyproxy/notifyproxy.go
@@ -1,6 +1,7 @@
 package notifyproxy
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -110,48 +111,75 @@ func (p *NotifyProxy) WaitAndClose() error {
 		}
 	}()
 
-	const bufferSize = 1024
-	sBuilder := strings.Builder{}
-	for {
-		// Set a read deadline of one second such that we achieve a
-		// non-blocking read and can check if the container has already
-		// stopped running; in that case no READY message will be send
-		// and we're done.
-		if err := p.connection.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
-			return err
-		}
+	// Since reading from the connection is blocking, we need to spin up two
+	// goroutines.  One waiting for the `READY` message, the other waiting
+	// for the container to stop running.
+	errorChan := make(chan error, 1)
+	readyChan := make(chan bool, 1)
 
+	go func() {
+		// Read until the `READY` message is received or the connection
+		// is closed.
+		const bufferSize = 1024
+		sBuilder := strings.Builder{}
 		for {
-			buffer := make([]byte, bufferSize)
-			num, err := p.connection.Read(buffer)
-			if err != nil {
-				if !errors.Is(err, os.ErrDeadlineExceeded) && !errors.Is(err, io.EOF) {
-					return err
+			for {
+				buffer := make([]byte, bufferSize)
+				num, err := p.connection.Read(buffer)
+				if err != nil {
+					if !errors.Is(err, io.EOF) {
+						errorChan <- err
+						return
+					}
+				}
+				sBuilder.Write(buffer[:num])
+				if num != bufferSize || buffer[num-1] == '\n' {
+					// Break as we read an entire line that
+					// we can inspect for the `READY`
+					// message.
+					break
 				}
 			}
-			sBuilder.Write(buffer[:num])
-			if num != bufferSize || buffer[num-1] == '\n' {
-				break
+
+			for _, line := range strings.Split(sBuilder.String(), "\n") {
+				if line == daemon.SdNotifyReady {
+					readyChan <- true
+					return
+				}
 			}
+			sBuilder.Reset()
 		}
+	}()
 
-		for _, line := range strings.Split(sBuilder.String(), "\n") {
-			if line == daemon.SdNotifyReady {
-				return nil
+	if p.container != nil {
+		// Create a cancellable context to make sure the goroutine
+		// below terminates.
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		go func() {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				state, err := p.container.State()
+				if err != nil {
+					errorChan <- err
+					return
+				}
+				if state != define.ContainerStateRunning {
+					errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
+					return
+				}
+				time.Sleep(time.Second)
 			}
-		}
-		sBuilder.Reset()
+		}()
+	}
 
-		if p.container == nil {
-			continue
-		}
-
-		state, err := p.container.State()
-		if err != nil {
-			return err
-		}
-		if state != define.ContainerStateRunning {
-			return fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
-		}
+	// Wait for the ready/error channel.
+	select {
+	case <-readyChan:
+		return nil
+	case err := <-errorChan:
+		return err
 	}
 }