Add support for containers.conf volume timeouts

Also, do a general cleanup of all the timeout code. Changes
include:
- Convert from int to *uint where possible. Timeouts cannot be
  negative, hence the uint change; and a timeout of 0 is valid,
  so we need a new way to detect that the user set a timeout
  (hence, pointer).
- Change name in the database to avoid conflicts between new data
  type and old one. This will cause timeouts set with 4.2.0 to be
  lost, but considering nobody is using the feature at present
  (and the lack of validation means we could have invalid,
  negative timeouts in the DB) this feels safe.
- Ensure volume plugin timeouts can only be used with volumes
  created using a plugin. Timeouts on the local driver are
  nonsensical.
- Remove the existing test, as it did not use a volume plugin.
  Write a new test that does.

The actual plumbing of the containers.conf timeout in is one line
in volume_api.go; the remainder are the above-described cleanups.

Signed-off-by: Matthew Heon <mheon@redhat.com>
This commit is contained in:
Matthew Heon
2022-08-23 14:46:43 -04:00
parent 3bcd8047cf
commit 0f73935563
36 changed files with 286 additions and 110 deletions

View File

@ -57,7 +57,7 @@ func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
}).Warn("failed to parse job object message")
continue
}
if err := msq.Write(notification); err == queue.ErrQueueClosed {
if err := msq.Enqueue(notification); err == queue.ErrQueueClosed {
// Write will only return an error when the queue is closed.
// The only time a queue would ever be closed is when we call `Close` on
// the job it belongs to which also removes it from the jobMap, so something

View File

@ -68,6 +68,9 @@ type Options struct {
// `UseNTVariant` specifies if we should use the `Nt` variant of Open/CreateJobObject.
// Defaults to false.
UseNTVariant bool
// `IOTracking` enables tracking I/O statistics on the job object. More specifically this
// calls SetInformationJobObject with the JobObjectIoAttribution class.
EnableIOTracking bool
}
// Create creates a job object.
@ -134,6 +137,12 @@ func Create(ctx context.Context, options *Options) (_ *JobObject, err error) {
job.mq = mq
}
if options.EnableIOTracking {
if err := enableIOTracking(jobHandle); err != nil {
return nil, err
}
}
return job, nil
}
@ -235,7 +244,7 @@ func (job *JobObject) PollNotification() (interface{}, error) {
if job.mq == nil {
return nil, ErrNotRegistered
}
return job.mq.ReadOrWait()
return job.mq.Dequeue()
}
// UpdateProcThreadAttribute updates the passed in ProcThreadAttributeList to contain what is necessary to
@ -330,7 +339,7 @@ func (job *JobObject) Pids() ([]uint32, error) {
err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicProcessIdList,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
)
@ -356,7 +365,7 @@ func (job *JobObject) Pids() ([]uint32, error) {
if err = winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicProcessIdList,
uintptr(unsafe.Pointer(&buf[0])),
unsafe.Pointer(&buf[0]),
uint32(len(buf)),
nil,
); err != nil {
@ -384,7 +393,7 @@ func (job *JobObject) QueryMemoryStats() (*winapi.JOBOBJECT_MEMORY_USAGE_INFORMA
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectMemoryUsageInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
@ -406,7 +415,7 @@ func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicAccountingInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
@ -415,7 +424,9 @@ func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_
return &info, nil
}
// QueryStorageStats gets the storage (I/O) stats for the job object.
// QueryStorageStats gets the storage (I/O) stats for the job object. This call will error
// if either `EnableIOTracking` wasn't set to true on creation of the job, or SetIOTracking()
// hasn't been called since creation of the job.
func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION, error) {
job.handleLock.RLock()
defer job.handleLock.RUnlock()
@ -430,7 +441,7 @@ func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFO
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectIoAttribution,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
@ -476,7 +487,7 @@ func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
status := winapi.NtQueryInformationProcess(
h,
winapi.ProcessVmCounters,
uintptr(unsafe.Pointer(&vmCounters)),
unsafe.Pointer(&vmCounters),
uint32(unsafe.Sizeof(vmCounters)),
nil,
)
@ -497,3 +508,31 @@ func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
return jobWorkingSetSize, nil
}
// SetIOTracking enables IO tracking for processes in the job object.
// This enables use of the QueryStorageStats method.
func (job *JobObject) SetIOTracking() error {
job.handleLock.RLock()
defer job.handleLock.RUnlock()
if job.handle == 0 {
return ErrAlreadyClosed
}
return enableIOTracking(job.handle)
}
func enableIOTracking(job windows.Handle) error {
info := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
}
if _, err := windows.SetInformationJobObject(
job,
winapi.JobObjectIoAttribution,
uintptr(unsafe.Pointer(&info)),
uint32(unsafe.Sizeof(info)),
); err != nil {
return fmt.Errorf("failed to enable IO tracking on job object: %w", err)
}
return nil
}

View File

@ -202,7 +202,7 @@ func (job *JobObject) getExtendedInformation() (*windows.JOBOBJECT_EXTENDED_LIMI
if err := winapi.QueryInformationJobObject(
job.handle,
windows.JobObjectExtendedLimitInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
@ -224,7 +224,7 @@ func (job *JobObject) getCPURateControlInformation() (*winapi.JOBOBJECT_CPU_RATE
if err := winapi.QueryInformationJobObject(
job.handle,
windows.JobObjectCpuRateControlInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {

View File

@ -5,10 +5,7 @@ import (
"sync"
)
var (
ErrQueueClosed = errors.New("the queue is closed for reading and writing")
ErrQueueEmpty = errors.New("the queue is empty")
)
var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
// MessageQueue represents a threadsafe message queue to be used to retrieve or
// write messages to.
@ -29,8 +26,8 @@ func NewMessageQueue() *MessageQueue {
}
}
// Write writes `msg` to the queue.
func (mq *MessageQueue) Write(msg interface{}) error {
// Enqueue writes `msg` to the queue.
func (mq *MessageQueue) Enqueue(msg interface{}) error {
mq.m.Lock()
defer mq.m.Unlock()
@ -43,55 +40,37 @@ func (mq *MessageQueue) Write(msg interface{}) error {
return nil
}
// Read will read a value from the queue if available, otherwise return an error.
func (mq *MessageQueue) Read() (interface{}, error) {
// Dequeue will read a value from the queue and remove it. If the queue
// is empty, this will block until the queue is closed or a value gets enqueued.
func (mq *MessageQueue) Dequeue() (interface{}, error) {
mq.m.Lock()
defer mq.m.Unlock()
for !mq.closed && mq.size() == 0 {
mq.c.Wait()
}
// We got woken up, check if it's because the queue got closed.
if mq.closed {
return nil, ErrQueueClosed
}
if mq.isEmpty() {
return nil, ErrQueueEmpty
}
val := mq.messages[0]
mq.messages[0] = nil
mq.messages = mq.messages[1:]
return val, nil
}
// ReadOrWait will read a value from the queue if available, else it will wait for a
// value to become available. This will block forever if nothing gets written or until
// the queue gets closed.
func (mq *MessageQueue) ReadOrWait() (interface{}, error) {
mq.m.Lock()
if mq.closed {
mq.m.Unlock()
return nil, ErrQueueClosed
}
if mq.isEmpty() {
for !mq.closed && mq.isEmpty() {
mq.c.Wait()
}
mq.m.Unlock()
return mq.Read()
}
val := mq.messages[0]
mq.messages[0] = nil
mq.messages = mq.messages[1:]
mq.m.Unlock()
return val, nil
}
// IsEmpty returns if the queue is empty
func (mq *MessageQueue) IsEmpty() bool {
// Size returns the size of the queue.
func (mq *MessageQueue) Size() int {
mq.m.RLock()
defer mq.m.RUnlock()
return len(mq.messages) == 0
return mq.size()
}
// Nonexported empty check that doesn't lock so we can call this in Read and Write.
func (mq *MessageQueue) isEmpty() bool {
return len(mq.messages) == 0
// Nonexported size check to check if the queue is empty inside already locked functions.
func (mq *MessageQueue) size() int {
return len(mq.messages)
}
// Close closes the queue for future writes or reads. Any attempts to read or write from the
@ -99,13 +78,15 @@ func (mq *MessageQueue) isEmpty() bool {
func (mq *MessageQueue) Close() {
mq.m.Lock()
defer mq.m.Unlock()
// Already closed
// Already closed, noop
if mq.closed {
return
}
mq.messages = nil
mq.closed = true
// If there's anybody currently waiting on a value from ReadOrWait, we need to
// If there's anybody currently waiting on a value from Dequeue, we need to
// broadcast so the read(s) can return ErrQueueClosed.
mq.c.Broadcast()
}

View File

@ -175,7 +175,7 @@ type JOBOBJECT_ASSOCIATE_COMPLETION_PORT struct {
// LPDWORD lpReturnLength
// );
//
//sys QueryInformationJobObject(jobHandle windows.Handle, infoClass uint32, jobObjectInfo uintptr, jobObjectInformationLength uint32, lpReturnLength *uint32) (err error) = kernel32.QueryInformationJobObject
//sys QueryInformationJobObject(jobHandle windows.Handle, infoClass uint32, jobObjectInfo unsafe.Pointer, jobObjectInformationLength uint32, lpReturnLength *uint32) (err error) = kernel32.QueryInformationJobObject
// HANDLE OpenJobObjectW(
// DWORD dwDesiredAccess,

View File

@ -18,7 +18,7 @@ const ProcessVmCounters = 3
// [out, optional] PULONG ReturnLength
// );
//
//sys NtQueryInformationProcess(processHandle windows.Handle, processInfoClass uint32, processInfo uintptr, processInfoLength uint32, returnLength *uint32) (status uint32) = ntdll.NtQueryInformationProcess
//sys NtQueryInformationProcess(processHandle windows.Handle, processInfoClass uint32, processInfo unsafe.Pointer, processInfoLength uint32, returnLength *uint32) (status uint32) = ntdll.NtQueryInformationProcess
// typedef struct _VM_COUNTERS_EX
// {

View File

@ -12,7 +12,8 @@ const STATUS_INFO_LENGTH_MISMATCH = 0xC0000004
// ULONG SystemInformationLength,
// PULONG ReturnLength
// );
//sys NtQuerySystemInformation(systemInfoClass int, systemInformation uintptr, systemInfoLength uint32, returnLength *uint32) (status uint32) = ntdll.NtQuerySystemInformation
//
//sys NtQuerySystemInformation(systemInfoClass int, systemInformation unsafe.Pointer, systemInfoLength uint32, returnLength *uint32) (status uint32) = ntdll.NtQuerySystemInformation
type SYSTEM_PROCESS_INFORMATION struct {
NextEntryOffset uint32 // ULONG

View File

@ -100,7 +100,7 @@ func resizePseudoConsole(hPc windows.Handle, size uint32) (hr error) {
return
}
func NtQuerySystemInformation(systemInfoClass int, systemInformation uintptr, systemInfoLength uint32, returnLength *uint32) (status uint32) {
func NtQuerySystemInformation(systemInfoClass int, systemInformation unsafe.Pointer, systemInfoLength uint32, returnLength *uint32) (status uint32) {
r0, _, _ := syscall.Syscall6(procNtQuerySystemInformation.Addr(), 4, uintptr(systemInfoClass), uintptr(systemInformation), uintptr(systemInfoLength), uintptr(unsafe.Pointer(returnLength)), 0, 0)
status = uint32(r0)
return
@ -152,7 +152,7 @@ func IsProcessInJob(procHandle windows.Handle, jobHandle windows.Handle, result
return
}
func QueryInformationJobObject(jobHandle windows.Handle, infoClass uint32, jobObjectInfo uintptr, jobObjectInformationLength uint32, lpReturnLength *uint32) (err error) {
func QueryInformationJobObject(jobHandle windows.Handle, infoClass uint32, jobObjectInfo unsafe.Pointer, jobObjectInformationLength uint32, lpReturnLength *uint32) (err error) {
r1, _, e1 := syscall.Syscall6(procQueryInformationJobObject.Addr(), 5, uintptr(jobHandle), uintptr(infoClass), uintptr(jobObjectInfo), uintptr(jobObjectInformationLength), uintptr(unsafe.Pointer(lpReturnLength)), 0)
if r1 == 0 {
if e1 != 0 {
@ -244,7 +244,7 @@ func LocalFree(ptr uintptr) {
return
}
func NtQueryInformationProcess(processHandle windows.Handle, processInfoClass uint32, processInfo uintptr, processInfoLength uint32, returnLength *uint32) (status uint32) {
func NtQueryInformationProcess(processHandle windows.Handle, processInfoClass uint32, processInfo unsafe.Pointer, processInfoLength uint32, returnLength *uint32) (status uint32) {
r0, _, _ := syscall.Syscall6(procNtQueryInformationProcess.Addr(), 5, uintptr(processHandle), uintptr(processInfoClass), uintptr(processInfo), uintptr(processInfoLength), uintptr(unsafe.Pointer(returnLength)), 0)
status = uint32(r0)
return