mirror of
				https://gitcode.com/gitea/gitea.git
				synced 2025-10-26 05:04:27 +08:00 
			
		
		
		
	 58dfaf3a75
			
		
	
	58dfaf3a75
	
	
	
		
			
			Although some features are mixed together in this PR, this PR is not
that large, and these features are all related.
In fact, more than 70 lines are for a toy "test queue", so
this PR is quite simple.
Major features:
1. Allow site admin to clear a queue (remove all items in a queue)
* Because there is no transaction, the "unique queue" could be corrupted
in rare cases, and that is unfixable.
* eg: the item is in the "set" but not in the "list", so the item would
never be able to be pushed into the queue.
* Now a site admin can simply clear the queue; everything then becomes
correct again, and the lost items can be re-pushed into the queue by future
operations.
2. Split the "admin/monitor" to separate pages
3. Allow to download diagnosis report
* Historically, many users have reported that the Gitea queue gets
stuck, or that Gitea's CPU usage is at 100%
    * With the diagnosis report, maintainers can see clearly what is happening
The diagnosis report sample:
[gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip)
, use "go tool pprof profile.dat" to view the report.
Screenshots:



---------
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: Giteabot <teabot@gitea.io>
		
	
		
			
				
	
	
		
			132 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			132 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2023 The Gitea Authors. All rights reserved.
 | |
| // SPDX-License-Identifier: MIT
 | |
| 
 | |
| package queue
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/modules/container"
 | |
| )
 | |
| 
 | |
| var errChannelClosed = errors.New("channel is closed") // returned by PushItem/Len when q.c is nil and by PopItem when the channel has been closed
 | |
| 
 | |
| type baseChannel struct {
 | |
| 	c   chan []byte
 | |
| 	set container.Set[string]
 | |
| 	mu  sync.Mutex
 | |
| 
 | |
| 	isUnique bool
 | |
| }
 | |
| 
 | |
| var _ baseQueue = (*baseChannel)(nil) // compile-time check that *baseChannel implements baseQueue
 | |
| 
 | |
| func newBaseChannelGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
 | |
| 	q := &baseChannel{c: make(chan []byte, cfg.Length), isUnique: unique}
 | |
| 	if unique {
 | |
| 		q.set = container.Set[string]{}
 | |
| 	}
 | |
| 	return q, nil
 | |
| }
 | |
| 
 | |
| func newBaseChannelSimple(cfg *BaseConfig) (baseQueue, error) {
 | |
| 	return newBaseChannelGeneric(cfg, false)
 | |
| }
 | |
| 
 | |
| func newBaseChannelUnique(cfg *BaseConfig) (baseQueue, error) {
 | |
| 	return newBaseChannelGeneric(cfg, true)
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) PushItem(ctx context.Context, data []byte) error {
 | |
| 	if q.c == nil {
 | |
| 		return errChannelClosed
 | |
| 	}
 | |
| 
 | |
| 	if q.isUnique {
 | |
| 		q.mu.Lock()
 | |
| 		has := q.set.Contains(string(data))
 | |
| 		q.mu.Unlock()
 | |
| 		if has {
 | |
| 			return ErrAlreadyInQueue
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case q.c <- data:
 | |
| 		if q.isUnique {
 | |
| 			q.mu.Lock()
 | |
| 			q.set.Add(string(data))
 | |
| 			q.mu.Unlock()
 | |
| 		}
 | |
| 		return nil
 | |
| 	case <-time.After(pushBlockTime):
 | |
| 		return context.DeadlineExceeded
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
 | |
| 	select {
 | |
| 	case data, ok := <-q.c:
 | |
| 		if !ok {
 | |
| 			return nil, errChannelClosed
 | |
| 		}
 | |
| 		q.mu.Lock()
 | |
| 		q.set.Remove(string(data))
 | |
| 		q.mu.Unlock()
 | |
| 		return data, nil
 | |
| 	case <-ctx.Done():
 | |
| 		return nil, ctx.Err()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
 | |
| 	q.mu.Lock()
 | |
| 	defer q.mu.Unlock()
 | |
| 	if !q.isUnique {
 | |
| 		return false, nil
 | |
| 	}
 | |
| 	return q.set.Contains(string(data)), nil
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) Len(ctx context.Context) (int, error) {
 | |
| 	q.mu.Lock()
 | |
| 	defer q.mu.Unlock()
 | |
| 
 | |
| 	if q.c == nil {
 | |
| 		return 0, errChannelClosed
 | |
| 	}
 | |
| 
 | |
| 	return len(q.c), nil
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) Close() error {
 | |
| 	q.mu.Lock()
 | |
| 	defer q.mu.Unlock()
 | |
| 
 | |
| 	close(q.c)
 | |
| 	if q.isUnique {
 | |
| 		q.set = container.Set[string]{}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (q *baseChannel) RemoveAll(ctx context.Context) error {
 | |
| 	q.mu.Lock()
 | |
| 	defer q.mu.Unlock()
 | |
| 
 | |
| 	for q.c != nil && len(q.c) > 0 {
 | |
| 		<-q.c
 | |
| 	}
 | |
| 
 | |
| 	if q.isUnique {
 | |
| 		q.set = container.Set[string]{}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |