play kube: notifyproxy: listen before starting the pod

Starting listening for the READY messages on the sdnotify proxies before
starting the Pod.  Otherwise, we may be missing messages.

[NO NEW TESTS NEEDED] as it's hard to test this very narrow race.

Related to but may not be fixing #16076.

Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
This commit is contained in:
Valentin Rothberg
2022-10-11 16:40:40 +02:00
parent bb0b1849d7
commit 7b84a3a434

View File

@ -45,9 +45,16 @@ type NotifyProxy struct {
connection *net.UnixConn connection *net.UnixConn
socketPath string socketPath string
container Container // optional container Container // optional
// Channels for synchronizing the goroutine waiting for the READY
// message and the one checking if the optional container is still
// running.
errorChan chan error
readyChan chan bool
} }
// New creates a NotifyProxy. The specified temp directory can be left empty. // New creates a NotifyProxy that starts listening immediately. The specified
// temp directory can be left empty.
func New(tmpDir string) (*NotifyProxy, error) { func New(tmpDir string) (*NotifyProxy, error) {
tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock") tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock")
if err != nil { if err != nil {
@ -69,7 +76,60 @@ func New(tmpDir string) (*NotifyProxy, error) {
return nil, err return nil, err
} }
return &NotifyProxy{connection: conn, socketPath: socketPath}, nil errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)
proxy := &NotifyProxy{
connection: conn,
socketPath: socketPath,
errorChan: errorChan,
readyChan: readyChan,
}
// Start waiting for the READY message in the background. This way,
// the proxy can be created prior to starting the container and
// circumvents a race condition on writing/reading on the socket.
proxy.waitForReady()
return proxy, nil
}
// waitForReady waits for the READY message in the background. The goroutine
// returns on receiving READY or when the socket is closed.
func (p *NotifyProxy) waitForReady() {
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
}
}
for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
p.readyChan <- true
return
}
}
sBuilder.Reset()
}
}()
} }
// SocketPath returns the path of the socket the proxy is listening on. // SocketPath returns the path of the socket the proxy is listening on.
@ -105,54 +165,21 @@ type Container interface {
// the waiting gets canceled and ErrNoReadyMessage is returned. // the waiting gets canceled and ErrNoReadyMessage is returned.
func (p *NotifyProxy) WaitAndClose() error { func (p *NotifyProxy) WaitAndClose() error {
defer func() { defer func() {
// Closing the socket/connection makes sure that the other
// goroutine reading/waiting for the READY message returns.
if err := p.close(); err != nil { if err := p.close(); err != nil {
logrus.Errorf("Closing notify proxy: %v", err) logrus.Errorf("Closing notify proxy: %v", err)
} }
}() }()
// Since reading from the connection is blocking, we need to spin up two // If the proxy has a container we need to watch it as it may exit
// goroutines. One waiting for the `READY` message, the other waiting // without sending a READY message. The goroutine below returns when
// for the container to stop running. // the container exits OR when the function returns (see deferred the
errorChan := make(chan error, 1) // cancel()) in which case we either we've either received the READY
readyChan := make(chan bool, 1) // message or encountered an error reading from the socket.
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
}
}
for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
readyChan <- true
return
}
}
sBuilder.Reset()
}
}()
if p.container != nil { if p.container != nil {
// Create a cancellable context to make sure the goroutine // Create a cancellable context to make sure the goroutine
// below terminates. // below terminates on function return.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
go func() { go func() {
@ -162,11 +189,11 @@ func (p *NotifyProxy) WaitAndClose() error {
default: default:
state, err := p.container.State() state, err := p.container.State()
if err != nil { if err != nil {
errorChan <- err p.errorChan <- err
return return
} }
if state != define.ContainerStateRunning { if state != define.ContainerStateRunning {
errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID()) p.errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
return return
} }
time.Sleep(time.Second) time.Sleep(time.Second)
@ -176,9 +203,9 @@ func (p *NotifyProxy) WaitAndClose() error {
// Wait for the ready/error channel. // Wait for the ready/error channel.
select { select {
case <-readyChan: case <-p.readyChan:
return nil return nil
case err := <-errorChan: case err := <-p.errorChan:
return err return err
} }
} }