mirror of
https://github.com/containers/podman.git
synced 2025-08-06 11:32:07 +08:00
add pkg/systemd/notifyproxy
Add a new package for proxying notify sockets and waiting for the READY=1 message to appear. May subject to further changes in future commits. Tests make sure that it behaves properly. Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
This commit is contained in:
94
pkg/systemd/notifyproxy/notifyproxy.go
Normal file
94
pkg/systemd/notifyproxy/notifyproxy.go
Normal file
@ -0,0 +1,94 @@
|
||||
package notifyproxy
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/coreos/go-systemd/v22/daemon"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// SendMessage sends the specified message to the specified socket.
|
||||
func SendMessage(socketPath string, message string) error {
|
||||
socketAddr := &net.UnixAddr{
|
||||
Name: socketPath,
|
||||
Net: "unixgram",
|
||||
}
|
||||
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
_, err = conn.Write([]byte(message))
|
||||
return err
|
||||
}
|
||||
|
||||
// NotifyProxy can be used to proxy notify messages.
|
||||
type NotifyProxy struct {
|
||||
connection *net.UnixConn
|
||||
socketPath string
|
||||
}
|
||||
|
||||
// New creates a NotifyProxy. The specified temp directory can be left empty.
|
||||
func New(tmpDir string) (*NotifyProxy, error) {
|
||||
tempFile, err := ioutil.TempFile(tmpDir, "-podman-notify-proxy.sock")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tempFile.Close()
|
||||
|
||||
socketPath := tempFile.Name()
|
||||
if err := syscall.Unlink(socketPath); err != nil { // Unlink the socket so we can bind it
|
||||
return nil, err
|
||||
}
|
||||
|
||||
socketAddr := &net.UnixAddr{
|
||||
Name: socketPath,
|
||||
Net: "unixgram",
|
||||
}
|
||||
conn, err := net.ListenUnixgram(socketAddr.Net, socketAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &NotifyProxy{connection: conn, socketPath: socketPath}, nil
|
||||
}
|
||||
|
||||
// SocketPath returns the path of the socket the proxy is listening on.
|
||||
func (p *NotifyProxy) SocketPath() string {
|
||||
return p.socketPath
|
||||
}
|
||||
|
||||
// close closes the listener and removes the socket.
|
||||
func (p *NotifyProxy) close() error {
|
||||
defer os.Remove(p.socketPath)
|
||||
return p.connection.Close()
|
||||
}
|
||||
|
||||
// WaitAndClose waits until receiving the `READY` notify message and close the
|
||||
// listener. Note that the this function must only be executed inside a systemd
|
||||
// service which will kill the process after a given timeout.
|
||||
func (p *NotifyProxy) WaitAndClose() error {
|
||||
defer func() {
|
||||
if err := p.close(); err != nil {
|
||||
logrus.Errorf("Closing notify proxy: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
buf := make([]byte, 1024)
|
||||
num, err := p.connection.Read(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, s := range strings.Split(string(buf[:num]), "\n") {
|
||||
if s == daemon.SdNotifyReady {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
58
pkg/systemd/notifyproxy/notifyproxy_test.go
Normal file
58
pkg/systemd/notifyproxy/notifyproxy_test.go
Normal file
@ -0,0 +1,58 @@
|
||||
package notifyproxy
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-systemd/v22/daemon"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Helper function to send the specified message over the socket of the proxy.
|
||||
func sendMessage(t *testing.T, proxy *NotifyProxy, message string) {
|
||||
err := SendMessage(proxy.SocketPath(), message)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestNotifyProxy(t *testing.T) {
|
||||
proxy, err := New("")
|
||||
require.NoError(t, err)
|
||||
require.FileExists(t, proxy.SocketPath())
|
||||
require.NoError(t, proxy.close())
|
||||
require.NoFileExists(t, proxy.SocketPath())
|
||||
}
|
||||
|
||||
func TestWaitAndClose(t *testing.T) {
|
||||
proxy, err := New("")
|
||||
require.NoError(t, err)
|
||||
require.FileExists(t, proxy.SocketPath())
|
||||
|
||||
ch := make(chan error)
|
||||
|
||||
go func() {
|
||||
ch <- proxy.WaitAndClose()
|
||||
}()
|
||||
|
||||
sendMessage(t, proxy, "foo\n")
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
select {
|
||||
case err := <-ch:
|
||||
t.Fatalf("Should stil be waiting but received %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
sendMessage(t, proxy, daemon.SdNotifyReady+"\nsomething else")
|
||||
done := func() bool {
|
||||
for i := 0; i < 10; i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
require.NoError(t, err, "Waiting should succeed")
|
||||
return true
|
||||
default:
|
||||
time.Sleep(time.Duration(i*250) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}()
|
||||
require.True(t, done, "READY MESSAGE SHOULD HAVE ARRIVED")
|
||||
}
|
Reference in New Issue
Block a user