mirror of
				https://gitcode.com/gitea/gitea.git
				synced 2025-10-26 05:04:27 +08:00 
			
		
		
		
	 2c903383b5
			
		
	
	2c903383b5
	
	
	
		
			
			* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
		
			
				
	
	
		
			189 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			189 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2019 The Gitea Authors. All rights reserved.
 | |
| // Use of this source code is governed by a MIT-style
 | |
| // license that can be found in the LICENSE file.
 | |
| 
 | |
| package setting
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"path"
 | |
| 	"path/filepath"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/modules/log"
 | |
| )
 | |
| 
 | |
// QueueSettings holds the configuration of a single queue as read from the
// ini file. The package-level Queue value stores the [queue] defaults, and
// GetQueueSettings produces one of these per named [queue.<name>] section.
type QueueSettings struct {
	// Storage and sizing.

	// DataDir comes from the DATADIR key; relative values are joined onto AppDataPath.
	DataDir string
	// Length comes from the LENGTH key.
	Length int
	// BatchLength comes from the BATCH_LENGTH key.
	BatchLength int

	// Backend selection and connection details.

	// ConnectionString comes from the CONN_STR key.
	ConnectionString string
	// Type comes from the TYPE key (e.g. "level", "persistable-channel", "redis").
	Type string
	// Network is the "network" component parsed out of ConnectionString.
	Network string
	// Addresses is the "addrs" component parsed out of ConnectionString.
	Addresses string
	// Password is the "password" component parsed out of ConnectionString.
	Password string
	// QueueName comes from the QUEUE_NAME key.
	QueueName string
	// SetName comes from the SET_NAME key.
	SetName string
	// DBIndex is the "db" component parsed out of ConnectionString.
	DBIndex int

	// Wrapping, retry and worker-pool behaviour.

	// WrapIfNecessary comes from the WRAP_IF_NECESSARY key.
	WrapIfNecessary bool
	// MaxAttempts comes from the MAX_ATTEMPTS key.
	MaxAttempts int
	// Timeout comes from the TIMEOUT key.
	Timeout time.Duration
	// Workers comes from the WORKERS key.
	Workers int
	// MaxWorkers comes from the MAX_WORKERS key.
	MaxWorkers int
	// BlockTimeout comes from the BLOCK_TIMEOUT key.
	BlockTimeout time.Duration
	// BoostTimeout comes from the BOOST_TIMEOUT key.
	BoostTimeout time.Duration
	// BoostWorkers comes from the BOOST_WORKERS key.
	BoostWorkers int
}
 | |
| 
 | |
| // Queue settings
 | |
| var Queue = QueueSettings{}
 | |
| 
 | |
| // GetQueueSettings returns the queue settings for the appropriately named queue
 | |
| func GetQueueSettings(name string) QueueSettings {
 | |
| 	q := QueueSettings{}
 | |
| 	sec := Cfg.Section("queue." + name)
 | |
| 	// DataDir is not directly inheritable
 | |
| 	q.DataDir = filepath.Join(Queue.DataDir, name)
 | |
| 	// QueueName is not directly inheritable either
 | |
| 	q.QueueName = name + Queue.QueueName
 | |
| 	for _, key := range sec.Keys() {
 | |
| 		switch key.Name() {
 | |
| 		case "DATADIR":
 | |
| 			q.DataDir = key.MustString(q.DataDir)
 | |
| 		case "QUEUE_NAME":
 | |
| 			q.QueueName = key.MustString(q.QueueName)
 | |
| 		case "SET_NAME":
 | |
| 			q.SetName = key.MustString(q.SetName)
 | |
| 		}
 | |
| 	}
 | |
| 	if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
 | |
| 		q.SetName = q.QueueName + Queue.SetName
 | |
| 	}
 | |
| 	if !filepath.IsAbs(q.DataDir) {
 | |
| 		q.DataDir = filepath.Join(AppDataPath, q.DataDir)
 | |
| 	}
 | |
| 	_, _ = sec.NewKey("DATADIR", q.DataDir)
 | |
| 	// The rest are...
 | |
| 	q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
 | |
| 	q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
 | |
| 	q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
 | |
| 	q.Type = sec.Key("TYPE").MustString(Queue.Type)
 | |
| 	q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
 | |
| 	q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
 | |
| 	q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
 | |
| 	q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
 | |
| 	q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
 | |
| 	q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
 | |
| 	q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
 | |
| 	q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
 | |
| 
 | |
| 	q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
 | |
| 	return q
 | |
| }
 | |
| 
 | |
| // NewQueueService sets up the default settings for Queues
 | |
| // This is exported for tests to be able to use the queue
 | |
| func NewQueueService() {
 | |
| 	sec := Cfg.Section("queue")
 | |
| 	Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
 | |
| 	if !filepath.IsAbs(Queue.DataDir) {
 | |
| 		Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
 | |
| 	}
 | |
| 	Queue.Length = sec.Key("LENGTH").MustInt(20)
 | |
| 	Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
 | |
| 	Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
 | |
| 	Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
 | |
| 	Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
 | |
| 	Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
 | |
| 	Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
 | |
| 	Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
 | |
| 	Queue.Workers = sec.Key("WORKERS").MustInt(1)
 | |
| 	Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
 | |
| 	Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
 | |
| 	Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
 | |
| 	Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
 | |
| 	Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
 | |
| 	Queue.SetName = sec.Key("SET_NAME").MustString("")
 | |
| 
 | |
| 	// Now handle the old issue_indexer configuration
 | |
| 	section := Cfg.Section("queue.issue_indexer")
 | |
| 	sectionMap := map[string]bool{}
 | |
| 	for _, key := range section.Keys() {
 | |
| 		sectionMap[key.Name()] = true
 | |
| 	}
 | |
| 	if _, ok := sectionMap["TYPE"]; !ok {
 | |
| 		switch Indexer.IssueQueueType {
 | |
| 		case LevelQueueType:
 | |
| 			_, _ = section.NewKey("TYPE", "level")
 | |
| 		case ChannelQueueType:
 | |
| 			_, _ = section.NewKey("TYPE", "persistable-channel")
 | |
| 		case RedisQueueType:
 | |
| 			_, _ = section.NewKey("TYPE", "redis")
 | |
| 		default:
 | |
| 			log.Fatal("Unsupported indexer queue type: %v",
 | |
| 				Indexer.IssueQueueType)
 | |
| 		}
 | |
| 	}
 | |
| 	if _, ok := sectionMap["LENGTH"]; !ok {
 | |
| 		_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
 | |
| 	}
 | |
| 	if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
 | |
| 		_, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
 | |
| 	}
 | |
| 	if _, ok := sectionMap["DATADIR"]; !ok {
 | |
| 		_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
 | |
| 	}
 | |
| 	if _, ok := sectionMap["CONN_STR"]; !ok {
 | |
| 		_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
 | |
| 	}
 | |
| 
 | |
| 	// Handle the old mailer configuration
 | |
| 	section = Cfg.Section("queue.mailer")
 | |
| 	sectionMap = map[string]bool{}
 | |
| 	for _, key := range section.Keys() {
 | |
| 		sectionMap[key.Name()] = true
 | |
| 	}
 | |
| 	if _, ok := sectionMap["LENGTH"]; !ok {
 | |
| 		_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
 | |
| 	}
 | |
| 
 | |
| 	// Handle the old test pull requests configuration
 | |
| 	// Please note this will be a unique queue
 | |
| 	section = Cfg.Section("queue.pr_patch_checker")
 | |
| 	sectionMap = map[string]bool{}
 | |
| 	for _, key := range section.Keys() {
 | |
| 		sectionMap[key.Name()] = true
 | |
| 	}
 | |
| 	if _, ok := sectionMap["LENGTH"]; !ok {
 | |
| 		_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
 | |
| 	}
 | |
| }
 | |
| 
 | |
// ParseQueueConnStr splits a queue connection string into its components.
//
// The string is read as whitespace-separated key=value tokens. Keys are
// matched case-insensitively; "network", "addrs", "password" and "db" are
// recognised, while unknown keys and tokens without an "=" are skipped. Only
// the first "=" of a token separates key from value, so a value may itself
// contain "=". When a key appears more than once the last occurrence wins.
//
// A "db" value that strconv.Atoi rejects stops parsing at that token: the
// error is returned along with whatever the earlier tokens supplied.
func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
	for _, token := range strings.Fields(connStr) {
		eq := strings.Index(token, "=")
		if eq < 0 {
			// Not a key=value token.
			continue
		}
		value := token[eq+1:]
		switch strings.ToLower(token[:eq]) {
		case "network":
			network = value
		case "addrs":
			addrs = value
		case "password":
			password = value
		case "db":
			if dbIdx, err = strconv.Atoi(value); err != nil {
				return network, addrs, password, dbIdx, err
			}
		}
	}
	return network, addrs, password, dbIdx, err
}
 |