Refactor machine socket mapping

Refactors machine socket mapping to prevent using similar/the same code
paths. Moves the shared code to `pkg/machine/sockets.go`.

[NO NEW TESTS NEEDED]

Signed-off-by: Jake Correnti <jakecorrenti+github@proton.me>
This commit is contained in:
Jake Correnti
2023-10-17 15:15:54 -04:00
parent ed58ea7849
commit ce9c1b9b86
5 changed files with 124 additions and 101 deletions

View File

@ -4,7 +4,6 @@
package applehv package applehv
import ( import (
"bufio"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -89,16 +88,9 @@ func (m *MacMachine) setGVProxyInfo(runtimeDir string) error {
if err != nil { if err != nil {
return err return err
} }
gvProxySock, err := machine.NewMachineFile(filepath.Join(runtimeDir, "gvproxy.sock"), nil)
if err != nil {
return err
}
m.GvProxyPid = *gvProxyPid m.GvProxyPid = *gvProxyPid
m.GvProxySock = *gvProxySock
return nil return machine.SetSocket(&m.GvProxySock, filepath.Join(runtimeDir, "gvproxy.sock"), nil)
} }
// setVfkitInfo stores the default devices, sets the vfkit endpoint, and // setVfkitInfo stores the default devices, sets the vfkit endpoint, and
@ -237,11 +229,9 @@ func (m *MacMachine) Init(opts machine.InitOptions) (bool, error) {
return false, err return false, err
} }
readySocket, err := machine.NewMachineFile(filepath.Join(runtimeDir, fmt.Sprintf("%s_ready.sock", m.Name)), nil) if err := machine.SetSocket(&m.ReadySocket, machine.ReadySocketPath(runtimeDir, m.Name), nil); err != nil {
if err != nil {
return false, err return false, err
} }
m.ReadySocket = *readySocket
if err = m.setGVProxyInfo(runtimeDir); err != nil { if err = m.setGVProxyInfo(runtimeDir); err != nil {
return false, err return false, err
@ -679,25 +669,22 @@ func (m *MacMachine) Start(name string, opts machine.StartOptions) error {
logrus.Debug("waiting for ready notification") logrus.Debug("waiting for ready notification")
var conn net.Conn var conn net.Conn
readyChan := make(chan error) readyChan := make(chan error)
go func() { connChan := make(chan net.Conn)
conn, err = readyListen.Accept() go machine.ListenAndWaitOnSocket(readyChan, connChan, readyListen)
if err != nil {
logrus.Error(err)
}
_, err = bufio.NewReader(conn).ReadString('\n')
readyChan <- err
}()
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return err return err
} }
err = <-readyChan err = <-readyChan
conn = <-connChan
if conn != nil {
defer func() { defer func() {
if closeErr := conn.Close(); closeErr != nil { if closeErr := conn.Close(); closeErr != nil {
logrus.Error(closeErr) logrus.Error(closeErr)
} }
}() }()
}
if err != nil { if err != nil {
return err return err
} }

View File

@ -4,12 +4,13 @@
package hyperv package hyperv
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"net"
"strings" "strings"
"github.com/Microsoft/go-winio" "github.com/Microsoft/go-winio"
"github.com/containers/podman/v4/pkg/machine"
"github.com/containers/podman/v4/utils" "github.com/containers/podman/v4/utils"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sys/windows/registry" "golang.org/x/sys/windows/registry"
@ -255,16 +256,18 @@ func (hv *HVSockRegistryEntry) Listen() error {
logrus.Error(err) logrus.Error(err)
} }
}() }()
conn, err := listener.Accept()
if err != nil { errChan := make(chan error)
return err connChan := make(chan net.Conn)
} go machine.ListenAndWaitOnSocket(errChan, connChan, listener)
conn := <-connChan
if conn != nil {
defer func() { defer func() {
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {
logrus.Error(err) logrus.Error(err)
} }
}() }()
// Right now we just listen for anything down the pipe (like qemu) }
_, err = bufio.NewReader(conn).ReadString('\n') return <-errChan
return err
} }

View File

@ -120,7 +120,12 @@ func (p *QEMUVirtualization) NewMachine(opts machine.InitOptions) (machine.VM, e
return nil, err return nil, err
} }
if err := vm.setReadySocket(); err != nil { runtimeDir, err := getRuntimeDir()
if err != nil {
return nil, err
}
symlink := vm.Name + "_ready.sock"
if err := machine.SetSocket(&vm.ReadySocket, machine.ReadySocketPath(runtimeDir+"/podman/", vm.Name), &symlink); err != nil {
return nil, err return nil, err
} }

View File

@ -133,7 +133,8 @@ func migrateVM(configPath string, config []byte, vm *MachineVM) error {
} }
// setReadySocket will stick the entry into the new struct // setReadySocket will stick the entry into the new struct
if err := vm.setReadySocket(); err != nil { symlink := vm.Name + "_ready.sock"
if err := machine.SetSocket(&vm.ReadySocket, machine.ReadySocketPath(socketPath+"/podman/", vm.Name), &symlink); err != nil {
return err return err
} }
@ -513,52 +514,6 @@ func runStartVMCommand(cmd *exec.Cmd) error {
return nil return nil
} }
// connectToQMPMonitorSocket attempts to connect to the QMP Monitor Socket after
// `maxBackoffs` attempts
func (v *MachineVM) connectToQMPMonitorSocket(maxBackoffs int, backoff time.Duration) (conn net.Conn, err error) {
for i := 0; i < maxBackoffs; i++ {
if i > 0 {
time.Sleep(backoff)
backoff *= 2
}
conn, err = net.Dial("unix", v.QMPMonitor.Address.Path)
if err == nil {
break
}
}
return
}
// connectToPodmanSocket attempts to connect to the podman socket after
// `maxBackoffs` attempts.
func (v *MachineVM) connectToPodmanSocket(maxBackoffs int, backoff time.Duration, qemuPID int, errBuf *bytes.Buffer) (conn net.Conn, dialErr error) {
socketPath, err := getRuntimeDir()
if err != nil {
return nil, err
}
// The socket is not made until the qemu process is running so here
// we do a backoff waiting for it. Once we have a conn, we break and
// then wait to read it.
for i := 0; i < maxBackoffs; i++ {
if i > 0 {
time.Sleep(backoff)
backoff *= 2
}
conn, dialErr = net.Dial("unix", filepath.Join(socketPath, "podman", v.Name+"_ready.sock"))
if dialErr == nil {
break
}
// check if qemu is still alive
err := checkProcessStatus("qemu", qemuPID, errBuf)
if err != nil {
return nil, err
}
}
return
}
// qemuPid returns -1 or the PID of the running QEMU instance. // qemuPid returns -1 or the PID of the running QEMU instance.
func (v *MachineVM) qemuPid() (int, error) { func (v *MachineVM) qemuPid() (int, error) {
pidData, err := os.ReadFile(v.VMPidFilePath.GetPath()) pidData, err := os.ReadFile(v.VMPidFilePath.GetPath())
@ -669,7 +624,7 @@ func (v *MachineVM) Start(name string, opts machine.StartOptions) error {
return err return err
} }
qemuSocketConn, err = v.connectToQMPMonitorSocket(maxBackoffs, defaultBackoff) qemuSocketConn, err = machine.DialSocketWithBackoffs(maxBackoffs, defaultBackoff, v.QMPMonitor.Address.Path)
if err != nil { if err != nil {
return err return err
} }
@ -724,7 +679,7 @@ func (v *MachineVM) Start(name string, opts machine.StartOptions) error {
fmt.Println("Waiting for VM ...") fmt.Println("Waiting for VM ...")
} }
conn, err = v.connectToPodmanSocket(maxBackoffs, defaultBackoff, cmd.Process.Pid, stderrBuf) conn, err = machine.DialSocketWithBackoffsAndProcCheck(maxBackoffs, defaultBackoff, v.ReadySocket.GetPath(), checkProcessStatus, "qemu", cmd.Process.Pid, stderrBuf)
if err != nil { if err != nil {
return err return err
} }
@ -1422,20 +1377,6 @@ func (v *MachineVM) setConfigPath() error {
return nil return nil
} }
func (v *MachineVM) setReadySocket() error {
readySocketName := v.Name + "_ready.sock"
rtPath, err := getRuntimeDir()
if err != nil {
return err
}
virtualSocketPath, err := machine.NewMachineFile(filepath.Join(rtPath, "podman", readySocketName), &readySocketName)
if err != nil {
return err
}
v.ReadySocket = *virtualSocketPath
return nil
}
func (v *MachineVM) setPIDSocket() error { func (v *MachineVM) setPIDSocket() error {
rtPath, err := getRuntimeDir() rtPath, err := getRuntimeDir()
if err != nil { if err != nil {

87
pkg/machine/sockets.go Normal file
View File

@ -0,0 +1,87 @@
package machine
import (
"bufio"
"bytes"
"fmt"
"net"
"path/filepath"
"time"
)
// SetSocket creates a new machine file for the socket and assigns it to
// `socketLoc`
func SetSocket(socketLoc *VMFile, path string, symlink *string) error {
socket, err := NewMachineFile(path, symlink)
if err != nil {
return err
}
*socketLoc = *socket
return nil
}
// ReadySocketPath returns the filepath for the ready socket
func ReadySocketPath(runtimeDir, machineName string) string {
return filepath.Join(runtimeDir, fmt.Sprintf("%s_ready.sock", machineName))
}
// ListenAndWaitOnSocket waits for a new connection to the listener and sends
// any error back through the channel. ListenAndWaitOnSocket is intended to be
// used as a goroutine
func ListenAndWaitOnSocket(errChan chan<- error, connChan chan<- net.Conn, listener net.Listener) {
conn, err := listener.Accept()
if err != nil {
errChan <- err
connChan <- nil
return
}
_, err = bufio.NewReader(conn).ReadString('\n')
errChan <- err
connChan <- conn
}
// DialSocketWithBackoffs attempts to connect to the socket in maxBackoffs attempts
func DialSocketWithBackoffs(maxBackoffs int, backoff time.Duration, socketPath string) (conn net.Conn, err error) {
for i := 0; i < maxBackoffs; i++ {
if i > 0 {
time.Sleep(backoff)
backoff *= 2
}
conn, err = net.Dial("unix", socketPath)
if err == nil {
return conn, nil
}
}
return nil, err
}
// DialSocketWithBackoffsAndProcCheck attempts to connect to the socket in
// maxBackoffs attempts. After every failure to connect, it makes sure the
// specified process is alive
func DialSocketWithBackoffsAndProcCheck(
maxBackoffs int,
backoff time.Duration,
socketPath string,
checkProccessStatus func(string, int, *bytes.Buffer) error,
procHint string,
procPid int,
errBuf *bytes.Buffer,
) (conn net.Conn, err error) {
for i := 0; i < maxBackoffs; i++ {
if i > 0 {
time.Sleep(backoff)
backoff *= 2
}
conn, err = net.Dial("unix", socketPath)
if err == nil {
return conn, nil
}
// check to make sure process denoted by procHint is alive
err = checkProccessStatus(procHint, procPid, errBuf)
if err != nil {
return nil, err
}
}
return nil, err
}