fix(deps): update github.com/digitalocean/go-qemu digest to f035778

Signed-off-by: Renovate Bot <bot@renovateapp.com>
This commit is contained in:
renovate[bot]
2023-05-03 09:16:54 +00:00
committed by GitHub
parent 04c45cebcd
commit 305bad1846
24 changed files with 1702 additions and 1031 deletions

View File

@@ -16,166 +16,55 @@ package libvirt
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"reflect"
"strings"
"sync/atomic"
"unsafe"
"github.com/digitalocean/go-libvirt/internal/constants"
"github.com/digitalocean/go-libvirt/internal/event"
xdr "github.com/digitalocean/go-libvirt/internal/go-xdr/xdr2"
"github.com/digitalocean/go-libvirt/socket"
)
// ErrUnsupported is returned if a procedure is not supported by libvirt
var ErrUnsupported = errors.New("unsupported procedure requested")
// request and response types
const (
// Call is used when making calls to the remote server.
Call = iota
// Reply indicates a server reply.
Reply
// Message is an asynchronous notification.
Message
// Stream represents a stream data packet.
Stream
// CallWithFDs is used by a client to indicate the request has
// arguments with file descriptors.
CallWithFDs
// ReplyWithFDs is used by a server to indicate the request has
// arguments with file descriptors.
ReplyWithFDs
)
// request and response statuses
const (
// StatusOK is always set for method calls or events.
// For replies it indicates successful completion of the method.
// For streams it indicates confirmation of the end of file on the stream.
StatusOK = iota
// StatusError for replies indicates that the method call failed
// and error information is being returned. For streams this indicates
// that not all data was sent and the stream has aborted.
StatusError
// StatusContinue is only used for streams.
// This indicates that further data packets will be following.
StatusContinue
)
// header is a libvirt rpc packet header
type header struct {
// Program identifier
Program uint32
// Program version
Version uint32
// Remote procedure identifier
Procedure uint32
// Call type, e.g., Reply
Type uint32
// Call serial number
Serial int32
// Request status, e.g., StatusOK
Status uint32
}
// packet represents a RPC request or response.
type packet struct {
// Size of packet, in bytes, including length.
// Len + Header + Payload
Len uint32
Header header
}
// Global packet instance, for use with unsafe.Sizeof()
var _p packet
// internal rpc response
type response struct {
Payload []byte
Status uint32
}
// libvirt error response
type libvirtError struct {
Code uint32
DomainID uint32
Padding uint8
Message string
Level uint32
// Error reponse from libvirt
type Error struct {
Code uint32
Message string
}
func (e libvirtError) Error() string {
func (e Error) Error() string {
return e.Message
}
// checkError is used to check whether an error is a libvirtError, and if it is,
// whether its error code matches the one passed in. It will return false if
// these conditions are not met.
func checkError(err error, expectedError errorNumber) bool {
e, ok := err.(libvirtError)
if ok {
return e.Code == uint32(expectedError)
func checkError(err error, expectedError ErrorNumber) bool {
for err != nil {
e, ok := err.(Error)
if ok {
return e.Code == uint32(expectedError)
}
err = errors.Unwrap(err)
}
return false
}
// IsNotFound detects libvirt's ERR_NO_DOMAIN.
func IsNotFound(err error) bool {
return checkError(err, errNoDomain)
}
// listen processes incoming data and routes
// responses to their respective callback handler.
func (l *Libvirt) listen() {
for {
// response packet length
length, err := pktlen(l.r)
if err != nil {
// When the underlying connection EOFs or is closed, stop
// this goroutine
if err == io.EOF || strings.Contains(err.Error(), "use of closed network connection") {
return
}
// invalid packet
continue
}
// response header
h, err := extractHeader(l.r)
if err != nil {
// invalid packet
continue
}
// payload: packet length minus what was previously read
size := int(length) - int(unsafe.Sizeof(_p))
buf := make([]byte, size)
_, err = io.ReadFull(l.r, buf)
if err != nil {
// invalid packet
continue
}
// route response to caller
l.route(h, buf)
}
return checkError(err, ErrNoDomain)
}
// callback sends RPC responses to respective callers.
@@ -191,9 +80,9 @@ func (l *Libvirt) callback(id int32, res response) {
c <- res
}
// route sends incoming packets to their listeners.
func (l *Libvirt) route(h *header, buf []byte) {
// route events to their respective listener
// Route sends incoming packets to their listeners.
func (l *Libvirt) Route(h *socket.Header, buf []byte) {
// Route events to their respective listener
var event event.Event
switch {
@@ -243,24 +132,36 @@ func (l *Libvirt) addStream(s *event.Stream) {
l.events[s.CallbackID] = s
}
// removeStream notifies the libvirt server to stop sending events for the
// provided callback ID. Upon successful de-registration the callback handler
// is destroyed. Subsequent calls to removeStream are idempotent and return
// nil.
// TODO: Fix this comment
// removeStream deletes an event stream. The caller should first notify libvirt
// to stop sending events for this stream. Subsequent calls to removeStream are
// idempotent and return nil.
func (l *Libvirt) removeStream(id int32) error {
l.emux.Lock()
defer l.emux.Unlock()
// if the event is already removed, just return nil
_, ok := l.events[id]
q, ok := l.events[id]
if ok {
delete(l.events, id)
q.Shutdown()
}
return nil
}
// removeAllStreams deletes all event streams. This is meant to be used to
// clean up only once the underlying connection to libvirt is disconnected and
// thus does not attempt to notify libvirt to stop sending events.
func (l *Libvirt) removeAllStreams() {
l.emux.Lock()
defer l.emux.Unlock()
for _, ev := range l.events {
ev.Shutdown()
delete(l.events, ev.CallbackID)
}
}
// register configures a method response callback
func (l *Libvirt) register(id int32, c chan response) {
l.cmux.Lock()
@@ -316,7 +217,8 @@ func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte,
l.deregister(serial)
}()
err := l.sendPacket(serial, proc, program, payload, Call, StatusOK)
err := l.socket.SendPacket(serial, proc, program, payload, socket.Call,
socket.StatusOK)
if err != nil {
return response{}, err
}
@@ -330,7 +232,7 @@ func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte,
abort := make(chan bool)
outErr := make(chan error)
go func() {
outErr <- l.sendStream(serial, proc, program, out, abort)
outErr <- l.socket.SendStream(serial, proc, program, out, abort)
}()
// Even without incoming stream server sends confirmation once all data is received
@@ -365,7 +267,7 @@ func (l *Libvirt) processIncomingStream(c chan response, inStream io.Writer) (re
}
// StatusOK indicates end of stream
if resp.Status == StatusOK {
if resp.Status == socket.StatusOK {
return resp, nil
}
@@ -386,77 +288,9 @@ func (l *Libvirt) processIncomingStream(c chan response, inStream io.Writer) (re
}
}
func (l *Libvirt) sendStream(serial int32, proc uint32, program uint32, stream io.Reader, abort chan bool) error {
// Keep total packet length under 4 MiB to follow possible limitation in libvirt server code
buf := make([]byte, 4*MiB-unsafe.Sizeof(_p))
for {
select {
case <-abort:
return l.sendPacket(serial, proc, program, nil, Stream, StatusError)
default:
}
n, err := stream.Read(buf)
if n > 0 {
err2 := l.sendPacket(serial, proc, program, buf[:n], Stream, StatusContinue)
if err2 != nil {
return err2
}
}
if err != nil {
if err == io.EOF {
return l.sendPacket(serial, proc, program, nil, Stream, StatusOK)
}
// keep original error
err2 := l.sendPacket(serial, proc, program, nil, Stream, StatusError)
if err2 != nil {
return err2
}
return err
}
}
}
func (l *Libvirt) sendPacket(serial int32, proc uint32, program uint32, payload []byte, typ uint32, status uint32) error {
p := packet{
Header: header{
Program: program,
Version: constants.ProtocolVersion,
Procedure: proc,
Type: typ,
Serial: serial,
Status: status,
},
}
size := int(unsafe.Sizeof(p.Len)) + int(unsafe.Sizeof(p.Header))
if payload != nil {
size += len(payload)
}
p.Len = uint32(size)
// write header
l.mu.Lock()
defer l.mu.Unlock()
err := binary.Write(l.w, binary.BigEndian, p)
if err != nil {
return err
}
// write payload
if payload != nil {
err = binary.Write(l.w, binary.BigEndian, payload)
if err != nil {
return err
}
}
return l.w.Flush()
}
func (l *Libvirt) getResponse(c chan response) (response, error) {
resp := <-c
if resp.Status == StatusError {
if resp.Status == socket.StatusError {
return resp, decodeError(resp.Payload)
}
@@ -473,9 +307,15 @@ func encode(data interface{}) ([]byte, error) {
// decodeError extracts an error message from the provider buffer.
func decodeError(buf []byte) error {
var e libvirtError
dec := xdr.NewDecoder(bytes.NewReader(buf))
e := struct {
Code uint32
DomainID uint32
Padding uint8
Message string
Level uint32
}{}
_, err := dec.Decode(&e)
if err != nil {
return err
@@ -484,12 +324,13 @@ func decodeError(buf []byte) error {
if strings.Contains(e.Message, "unknown procedure") {
return ErrUnsupported
}
// if libvirt returns ERR_OK, ignore the error
if checkError(e, errOk) {
if ErrorNumber(e.Code) == ErrOk {
return nil
}
return e
return Error{Code: uint32(e.Code), Message: e.Message}
}
// eventDecoder decodes an event from a xdr buffer.
@@ -499,40 +340,6 @@ func eventDecoder(buf []byte, e interface{}) error {
return err
}
// pktlen returns the length of an incoming RPC packet. Read errors will
// result in a returned response length of 0 and a non-nil error.
func pktlen(r io.Reader) (uint32, error) {
buf := make([]byte, unsafe.Sizeof(_p.Len))
// extract the packet's length from the header
_, err := io.ReadFull(r, buf)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint32(buf), nil
}
// extractHeader returns the decoded header from an incoming response.
func extractHeader(r io.Reader) (*header, error) {
buf := make([]byte, unsafe.Sizeof(_p.Header))
// extract the packet's header from r
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}
return &header{
Program: binary.BigEndian.Uint32(buf[0:4]),
Version: binary.BigEndian.Uint32(buf[4:8]),
Procedure: binary.BigEndian.Uint32(buf[8:12]),
Type: binary.BigEndian.Uint32(buf[12:16]),
Serial: int32(binary.BigEndian.Uint32(buf[16:20])),
Status: binary.BigEndian.Uint32(buf[20:24]),
}, nil
}
type typedParamDecoder struct{}
// Decode decodes a TypedParam. These are part of the libvirt spec, and not xdr