Merge pull request #15820 from vrothberg/fix-15800

kube: notifyproxy: fix lost READY message
This commit is contained in:
OpenShift Merge Robot
2022-09-26 13:37:40 +02:00
committed by GitHub
2 changed files with 81 additions and 37 deletions

View File

@ -10,6 +10,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
buildahDefine "github.com/containers/buildah/define" buildahDefine "github.com/containers/buildah/define"
"github.com/containers/common/libimage" "github.com/containers/common/libimage"
@ -698,9 +699,24 @@ func (ic *ContainerEngine) playKubePod(ctx context.Context, podName string, podY
fmt.Println(playKubePod.ContainerErrors) fmt.Println(playKubePod.ContainerErrors)
} }
// Wait for each proxy to receive a READY message. // Wait for each proxy to receive a READY message. Use a wait
for _, proxy := range sdNotifyProxies { // group to prevent the potential for ABBA kinds of deadlocks.
if err := proxy.WaitAndClose(); err != nil { var wg sync.WaitGroup
errors := make([]error, len(sdNotifyProxies))
for i := range sdNotifyProxies {
wg.Add(1)
go func(i int) {
err := sdNotifyProxies[i].WaitAndClose()
if err != nil {
err = fmt.Errorf("waiting for sd-notify proxy: %w", err)
}
errors[i] = err
wg.Done()
}(i)
}
wg.Wait()
for _, err := range errors {
if err != nil {
return nil, err return nil, err
} }
} }

View File

@ -1,6 +1,7 @@
package notifyproxy package notifyproxy
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -109,48 +110,75 @@ func (p *NotifyProxy) WaitAndClose() error {
} }
}() }()
// Since reading from the connection is blocking, we need to spin up two
// goroutines. One waiting for the `READY` message, the other waiting
// for the container to stop running.
errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024 const bufferSize = 1024
sBuilder := strings.Builder{} sBuilder := strings.Builder{}
for { for {
// Set a read deadline of one second such that we achieve a
// non-blocking read and can check if the container has already
// stopped running; in that case no READY message will be sent
// and we're done.
if err := p.connection.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return err
}
for { for {
buffer := make([]byte, bufferSize) buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer) num, err := p.connection.Read(buffer)
if err != nil { if err != nil {
if !errors.Is(err, os.ErrDeadlineExceeded) && !errors.Is(err, io.EOF) { if !errors.Is(err, io.EOF) {
return err errorChan <- err
return
} }
} }
sBuilder.Write(buffer[:num]) sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' { if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break break
} }
} }
for _, line := range strings.Split(sBuilder.String(), "\n") { for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady { if line == daemon.SdNotifyReady {
return nil readyChan <- true
return
} }
} }
sBuilder.Reset() sBuilder.Reset()
if p.container == nil {
continue
} }
}()
if p.container != nil {
// Create a cancellable context to make sure the goroutine
// below terminates.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctx.Done():
return
default:
state, err := p.container.State() state, err := p.container.State()
if err != nil { if err != nil {
return err errorChan <- err
return
} }
if state != define.ContainerStateRunning { if state != define.ContainerStateRunning {
return fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID()) errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
} return
}
time.Sleep(time.Second)
}
}()
}
// Wait for the ready/error channel.
select {
case <-readyChan:
return nil
case err := <-errorChan:
return err
} }
} }