From ce9c1b9b86f14aa10c713f719509421e28cbee44 Mon Sep 17 00:00:00 2001 From: Jake Correnti Date: Tue, 17 Oct 2023 15:15:54 -0400 Subject: [PATCH] Refactor machine socket mapping Refactors machine socket mapping to prevent using similar/the same code paths. Moves the shared code to `pkg/machine/sockets.go`. [NO NEW TESTS NEEDED] Signed-off-by: Jake Correnti --- pkg/machine/applehv/machine.go | 37 +++++---------- pkg/machine/hyperv/vsock.go | 27 ++++++----- pkg/machine/qemu/config.go | 7 ++- pkg/machine/qemu/machine.go | 67 ++------------------------ pkg/machine/sockets.go | 87 ++++++++++++++++++++++++++++++++++ 5 files changed, 124 insertions(+), 101 deletions(-) create mode 100644 pkg/machine/sockets.go diff --git a/pkg/machine/applehv/machine.go b/pkg/machine/applehv/machine.go index cb3b2243af..cd7c2b1fc9 100644 --- a/pkg/machine/applehv/machine.go +++ b/pkg/machine/applehv/machine.go @@ -4,7 +4,6 @@ package applehv import ( - "bufio" "encoding/json" "errors" "fmt" @@ -89,16 +88,9 @@ func (m *MacMachine) setGVProxyInfo(runtimeDir string) error { if err != nil { return err } - - gvProxySock, err := machine.NewMachineFile(filepath.Join(runtimeDir, "gvproxy.sock"), nil) - if err != nil { - return err - } - m.GvProxyPid = *gvProxyPid - m.GvProxySock = *gvProxySock - return nil + return machine.SetSocket(&m.GvProxySock, filepath.Join(runtimeDir, "gvproxy.sock"), nil) } // setVfkitInfo stores the default devices, sets the vfkit endpoint, and @@ -237,11 +229,9 @@ func (m *MacMachine) Init(opts machine.InitOptions) (bool, error) { return false, err } - readySocket, err := machine.NewMachineFile(filepath.Join(runtimeDir, fmt.Sprintf("%s_ready.sock", m.Name)), nil) - if err != nil { + if err := machine.SetSocket(&m.ReadySocket, machine.ReadySocketPath(runtimeDir, m.Name), nil); err != nil { return false, err } - m.ReadySocket = *readySocket if err = m.setGVProxyInfo(runtimeDir); err != nil { return false, err @@ -679,25 +669,22 @@ func (m *MacMachine) Start(name string, opts machine.StartOptions) error { logrus.Debug("waiting for ready notification") var conn net.Conn readyChan := make(chan error) - go func() { - conn, err = readyListen.Accept() - if err != nil { - logrus.Error(err) - } - _, err = bufio.NewReader(conn).ReadString('\n') - readyChan <- err - }() + connChan := make(chan net.Conn) + go machine.ListenAndWaitOnSocket(readyChan, connChan, readyListen) if err := cmd.Start(); err != nil { return err } err = <-readyChan - defer func() { - if closeErr := conn.Close(); closeErr != nil { - logrus.Error(closeErr) - } - }() + conn = <-connChan + if conn != nil { + defer func() { + if closeErr := conn.Close(); closeErr != nil { + logrus.Error(closeErr) + } + }() + } if err != nil { return err } diff --git a/pkg/machine/hyperv/vsock.go b/pkg/machine/hyperv/vsock.go index 13136d4b1a..17461c22a3 100644 --- a/pkg/machine/hyperv/vsock.go +++ b/pkg/machine/hyperv/vsock.go @@ -4,12 +4,13 @@ package hyperv import ( - "bufio" "errors" "fmt" + "net" "strings" "github.com/Microsoft/go-winio" + "github.com/containers/podman/v4/pkg/machine" "github.com/containers/podman/v4/utils" "github.com/sirupsen/logrus" "golang.org/x/sys/windows/registry" @@ -255,16 +256,18 @@ func (hv *HVSockRegistryEntry) Listen() error { logrus.Error(err) } }() - conn, err := listener.Accept() - if err != nil { - return err + + errChan := make(chan error) + connChan := make(chan net.Conn) + go machine.ListenAndWaitOnSocket(errChan, connChan, listener) + conn := <-connChan + + if conn != nil { + defer func() { + if err := conn.Close(); err != nil { + logrus.Error(err) + } + }() } - defer func() { - if err := conn.Close(); err != nil { - logrus.Error(err) - } - }() - // Right now we just listen for anything down the pipe (like qemu) - _, err = bufio.NewReader(conn).ReadString('\n') - return err + return <-errChan } diff --git a/pkg/machine/qemu/config.go b/pkg/machine/qemu/config.go index 10111b8891..57066c10cb 100644 --- a/pkg/machine/qemu/config.go +++ b/pkg/machine/qemu/config.go @@ -120,7 +120,12 @@ func (p *QEMUVirtualization) NewMachine(opts machine.InitOptions) (machine.VM, e return nil, err } - if err := vm.setReadySocket(); err != nil { + runtimeDir, err := getRuntimeDir() + if err != nil { + return nil, err + } + symlink := vm.Name + "_ready.sock" + if err := machine.SetSocket(&vm.ReadySocket, machine.ReadySocketPath(runtimeDir+"/podman/", vm.Name), &symlink); err != nil { return nil, err } diff --git a/pkg/machine/qemu/machine.go b/pkg/machine/qemu/machine.go index 0c0bbbb044..f103a086c9 100644 --- a/pkg/machine/qemu/machine.go +++ b/pkg/machine/qemu/machine.go @@ -133,7 +133,8 @@ func migrateVM(configPath string, config []byte, vm *MachineVM) error { } // setReadySocket will stick the entry into the new struct - if err := vm.setReadySocket(); err != nil { + symlink := vm.Name + "_ready.sock" + if err := machine.SetSocket(&vm.ReadySocket, machine.ReadySocketPath(socketPath+"/podman/", vm.Name), &symlink); err != nil { return err } @@ -513,52 +514,6 @@ func runStartVMCommand(cmd *exec.Cmd) error { return nil } -// connectToQMPMonitorSocket attempts to connect to the QMP Monitor Socket after -// `maxBackoffs` attempts -func (v *MachineVM) connectToQMPMonitorSocket(maxBackoffs int, backoff time.Duration) (conn net.Conn, err error) { - for i := 0; i < maxBackoffs; i++ { - if i > 0 { - time.Sleep(backoff) - backoff *= 2 - } - conn, err = net.Dial("unix", v.QMPMonitor.Address.Path) - if err == nil { - break - } - } - return -} - -// connectToPodmanSocket attempts to connect to the podman socket after -// `maxBackoffs` attempts. -func (v *MachineVM) connectToPodmanSocket(maxBackoffs int, backoff time.Duration, qemuPID int, errBuf *bytes.Buffer) (conn net.Conn, dialErr error) { - socketPath, err := getRuntimeDir() - if err != nil { - return nil, err - } - - // The socket is not made until the qemu process is running so here - // we do a backoff waiting for it. Once we have a conn, we break and - // then wait to read it. - for i := 0; i < maxBackoffs; i++ { - if i > 0 { - time.Sleep(backoff) - backoff *= 2 - } - conn, dialErr = net.Dial("unix", filepath.Join(socketPath, "podman", v.Name+"_ready.sock")) - if dialErr == nil { - break - } - // check if qemu is still alive - err := checkProcessStatus("qemu", qemuPID, errBuf) - if err != nil { - return nil, err - } - } - - return -} - // qemuPid returns -1 or the PID of the running QEMU instance. func (v *MachineVM) qemuPid() (int, error) { pidData, err := os.ReadFile(v.VMPidFilePath.GetPath()) @@ -669,7 +624,7 @@ func (v *MachineVM) Start(name string, opts machine.StartOptions) error { return err } - qemuSocketConn, err = v.connectToQMPMonitorSocket(maxBackoffs, defaultBackoff) + qemuSocketConn, err = machine.DialSocketWithBackoffs(maxBackoffs, defaultBackoff, v.QMPMonitor.Address.Path) if err != nil { return err } @@ -724,7 +679,7 @@ func (v *MachineVM) Start(name string, opts machine.StartOptions) error { fmt.Println("Waiting for VM ...") } - conn, err = v.connectToPodmanSocket(maxBackoffs, defaultBackoff, cmd.Process.Pid, stderrBuf) + conn, err = machine.DialSocketWithBackoffsAndProcCheck(maxBackoffs, defaultBackoff, v.ReadySocket.GetPath(), checkProcessStatus, "qemu", cmd.Process.Pid, stderrBuf) if err != nil { return err } @@ -1422,20 +1377,6 @@ func (v *MachineVM) setConfigPath() error { return nil } -func (v *MachineVM) setReadySocket() error { - readySocketName := v.Name + "_ready.sock" - rtPath, err := getRuntimeDir() - if err != nil { - return err - } - virtualSocketPath, err := machine.NewMachineFile(filepath.Join(rtPath, "podman", readySocketName), &readySocketName) - if err != nil { - return err - } - v.ReadySocket = *virtualSocketPath - return nil -} - func (v *MachineVM) setPIDSocket() error { rtPath, err := getRuntimeDir() if err != nil { diff --git a/pkg/machine/sockets.go b/pkg/machine/sockets.go new file mode 100644 index 0000000000..8c53448340 --- /dev/null +++ b/pkg/machine/sockets.go @@ -0,0 +1,87 @@ +package machine + +import ( + "bufio" + "bytes" + "fmt" + "net" + "path/filepath" + "time" +) + +// SetSocket creates a new machine file for the socket and assigns it to +// `socketLoc` +func SetSocket(socketLoc *VMFile, path string, symlink *string) error { + socket, err := NewMachineFile(path, symlink) + if err != nil { + return err + } + *socketLoc = *socket + return nil +} + +// ReadySocketPath returns the filepath for the ready socket +func ReadySocketPath(runtimeDir, machineName string) string { + return filepath.Join(runtimeDir, fmt.Sprintf("%s_ready.sock", machineName)) +} + +// ListenAndWaitOnSocket waits for a new connection to the listener and sends +// any error back through the channel. ListenAndWaitOnSocket is intended to be +// used as a goroutine +func ListenAndWaitOnSocket(errChan chan<- error, connChan chan<- net.Conn, listener net.Listener) { + conn, err := listener.Accept() + if err != nil { + errChan <- err + connChan <- nil + return + } + _, err = bufio.NewReader(conn).ReadString('\n') + errChan <- err + connChan <- conn +} + +// DialSocketWithBackoffs attempts to connect to the socket in maxBackoffs attempts +func DialSocketWithBackoffs(maxBackoffs int, backoff time.Duration, socketPath string) (conn net.Conn, err error) { + for i := 0; i < maxBackoffs; i++ { + if i > 0 { + time.Sleep(backoff) + backoff *= 2 + } + conn, err = net.Dial("unix", socketPath) + if err == nil { + return conn, nil + } + } + return nil, err +} + +// DialSocketWithBackoffsAndProcCheck attempts to connect to the socket in +// maxBackoffs attempts. After every failure to connect, it makes sure the +// specified process is alive +func DialSocketWithBackoffsAndProcCheck( + maxBackoffs int, + backoff time.Duration, + socketPath string, + checkProccessStatus func(string, int, *bytes.Buffer) error, + procHint string, + procPid int, + errBuf *bytes.Buffer, +) (conn net.Conn, err error) { + for i := 0; i < maxBackoffs; i++ { + if i > 0 { + time.Sleep(backoff) + backoff *= 2 + } + conn, err = net.Dial("unix", socketPath) + if err == nil { + return conn, nil + } + + // check to make sure process denoted by procHint is alive + err = checkProccessStatus(procHint, procPid, errBuf) + if err != nil { + return nil, err + } + } + return nil, err +}