notifyproxy: don't set a read deadline

The read deadline may yield the READY message to be lost in space.
Instead, use a more Go-idiomatic alternative by using two goroutines;
one reading from the connection, the other watching the container.

[NO NEW TESTS NEEDED] since existing tests are exercising this
functionality already.

Fixes: #15800
Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
This commit is contained in:
Valentin Rothberg
2022-09-15 15:54:30 +02:00
parent 1071098ee2
commit 4a053a821a

View File

@ -1,6 +1,7 @@
package notifyproxy package notifyproxy
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -110,48 +111,75 @@ func (p *NotifyProxy) WaitAndClose() error {
} }
}() }()
const bufferSize = 1024 // Since reading from the connection is blocking, we need to spin up two
sBuilder := strings.Builder{} // goroutines. One waiting for the `READY` message, the other waiting
for { // for the container to stop running.
// Set a read deadline of one second such that we achieve a errorChan := make(chan error, 1)
// non-blocking read and can check if the container has already readyChan := make(chan bool, 1)
// stopped running; in that case no READY message will be send
// and we're done.
if err := p.connection.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return err
}
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024
sBuilder := strings.Builder{}
for { for {
buffer := make([]byte, bufferSize) for {
num, err := p.connection.Read(buffer) buffer := make([]byte, bufferSize)
if err != nil { num, err := p.connection.Read(buffer)
if !errors.Is(err, os.ErrDeadlineExceeded) && !errors.Is(err, io.EOF) { if err != nil {
return err if !errors.Is(err, io.EOF) {
errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
} }
} }
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' { for _, line := range strings.Split(sBuilder.String(), "\n") {
break if line == daemon.SdNotifyReady {
readyChan <- true
return
}
} }
sBuilder.Reset()
} }
}()
for _, line := range strings.Split(sBuilder.String(), "\n") { if p.container != nil {
if line == daemon.SdNotifyReady { // Create a cancellable context to make sure the goroutine
return nil // below terminates.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctx.Done():
return
default:
state, err := p.container.State()
if err != nil {
errorChan <- err
return
}
if state != define.ContainerStateRunning {
errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
return
}
time.Sleep(time.Second)
} }
} }()
sBuilder.Reset() }
if p.container == nil { // Wait for the ready/error channel.
continue select {
} case <-readyChan:
return nil
state, err := p.container.State() case err := <-errorChan:
if err != nil { return err
return err
}
if state != define.ContainerStateRunning {
return fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
}
} }
} }