diff --git a/modules/queue/queue_batch.go b/modules/queue/queue_batch.go index 07166441e6d..2731ac5e23c 100644 --- a/modules/queue/queue_batch.go +++ b/modules/queue/queue_batch.go @@ -18,6 +18,7 @@ const BatchedChannelQueueType Type = "batched-channel" type BatchedChannelQueueConfiguration struct { QueueLength int BatchLength int + Workers int } // BatchedChannelQueue implements @@ -38,6 +39,7 @@ func NewBatchedChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queu queue: make(chan Data, config.QueueLength), handle: handle, exemplar: exemplar, + workers: config.Workers, }, config.BatchLength, }, nil @@ -51,26 +53,28 @@ func (c *BatchedChannelQueue) Run(atShutdown, atTerminate func(context.Context, atTerminate(context.Background(), func() { log.Warn("BatchedChannelQueue is not terminatable!") }) - go func() { - delay := time.Millisecond * 300 - var datas = make([]Data, 0, c.batchLength) - for { - select { - case data := <-c.queue: - datas = append(datas, data) - if len(datas) >= c.batchLength { - c.handle(datas...) - datas = make([]Data, 0, c.batchLength) - } - case <-time.After(delay): - delay = time.Millisecond * 100 - if len(datas) > 0 { - c.handle(datas...) - datas = make([]Data, 0, c.batchLength) + for i := 0; i < c.workers; i++ { + go func() { + delay := time.Millisecond * 300 + var datas = make([]Data, 0, c.batchLength) + for { + select { + case data := <-c.queue: + datas = append(datas, data) + if len(datas) >= c.batchLength { + c.handle(datas...) + datas = make([]Data, 0, c.batchLength) + } + case <-time.After(delay): + delay = time.Millisecond * 100 + if len(datas) > 0 { + c.handle(datas...) + datas = make([]Data, 0, c.batchLength) + } } } - } - }() + }() + } } func init() { diff --git a/modules/queue/queue_batch_test.go b/modules/queue/queue_batch_test.go index 08d3641da12..13a85a0aada 100644 --- a/modules/queue/queue_batch_test.go +++ b/modules/queue/queue_batch_test.go @@ -22,7 +22,7 @@ func TestBatchedChannelQueue(t *testing.T) { nilFn := func(_ context.Context, _ func()) {} - queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2}, &testData{}) + queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2, Workers: 1}, &testData{}) assert.NoError(t, err) go queue.Run(nilFn, nilFn) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index e0cba2db01d..9d0ab11d21c 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -18,6 +18,7 @@ const ChannelQueueType Type = "channel" // ChannelQueueConfiguration is the configuration for a ChannelQueue type ChannelQueueConfiguration struct { QueueLength int + Workers int } // ChannelQueue implements @@ -25,6 +26,7 @@ type ChannelQueue struct { queue chan Data handle HandlerFunc exemplar interface{} + workers int } // NewChannelQueue create a memory channel queue @@ -38,6 +40,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro queue: make(chan Data, config.QueueLength), handle: handle, exemplar: exemplar, + workers: config.Workers, }, nil } @@ -49,11 +52,13 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue is not terminatable!") }) - go func() { - for data := range c.queue { - c.handle(data) - } - }() + for i := 0; i < c.workers; i++ { + go func() { + for data := range c.queue { + c.handle(data) + } + }() + } } // Push will push the indexer data to queue diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 77f4a8fe8f5..9e72bed85d7 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -22,7 +22,7 @@ func TestChannelQueue(t *testing.T) { nilFn := func(_ context.Context, _ func()) {} - queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20}, &testData{}) + queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20, Workers: 1}, &testData{}) assert.NoError(t, err) go queue.Run(nilFn, nilFn) diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index dafff5c21c8..799bc98046f 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "reflect" + "sync" "time" "code.gitea.io/gitea/modules/log" @@ -23,6 +24,7 @@ const LevelQueueType Type = "level" type LevelQueueConfiguration struct { DataDir string BatchLength int + Workers int } // LevelQueue implements a disk library queue @@ -32,6 +34,7 @@ type LevelQueue struct { batchLength int closed chan struct{} exemplar interface{} + workers int } // NewLevelQueue creates a ledis local queue @@ -53,6 +56,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) batchLength: config.BatchLength, exemplar: exemplar, closed: make(chan struct{}), + workers: config.Workers, }, nil } @@ -60,6 +64,19 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), l.Shutdown) atTerminate(context.Background(), l.Terminate) + + wg := sync.WaitGroup{} + for i := 0; i < l.workers; i++ { + wg.Add(1) + go func() { + l.worker() + wg.Done() + }() + } + wg.Wait() +} + +func (l *LevelQueue) worker() { var i int var datas = make([]Data, 0, l.batchLength) for { diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index b13f1b9603d..428e104fb5a 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -6,6 +6,7 @@ package queue import ( "context" + "sync" "time" ) @@ -19,6 +20,7 @@ type PersistableChannelQueueConfiguration struct { QueueLength int Timeout time.Duration MaxAttempts int + Workers int } // PersistableChannelQueue wraps a channel queue and level queue together @@ -40,14 +42,17 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, + Workers: config.Workers, }, exemplar) if err != nil { return nil, err } + // the level backend only needs one worker to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ DataDir: config.DataDir, BatchLength: config.BatchLength, + Workers: 1, } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) @@ -100,6 +105,19 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte // Just run the level queue - we shut it down later go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + + wg := sync.WaitGroup{} + for i := 0; i < p.workers; i++ { + wg.Add(1) + go func() { + p.worker() + wg.Done() + }() + } + wg.Wait() +} + +func (p *PersistableChannelQueue) worker() { delay := time.Millisecond * 300 var datas = make([]Data, 0, p.batchLength) loop: diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 66c90f3bc3f..5f6f614bd8c 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -35,6 +35,7 @@ func TestPersistableChannelQueue(t *testing.T) { DataDir: tmpDir, BatchLength: 2, QueueLength: 20, + Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -83,6 +84,7 @@ func TestPersistableChannelQueue(t *testing.T) { DataDir: tmpDir, BatchLength: 2, QueueLength: 20, + Workers: 1, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 9bc689b5f06..7033fc6a34a 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -29,6 +29,7 @@ func TestLevelQueue(t *testing.T) { queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ DataDir: "level-queue-test-data", BatchLength: 2, + Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -76,6 +77,7 @@ func TestLevelQueue(t *testing.T) { queue, err = NewLevelQueue(handle, LevelQueueConfiguration{ DataDir: "level-queue-test-data", BatchLength: 2, + Workers: 1, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index b785f0073f7..80ce67233c3 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -11,6 +11,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" "code.gitea.io/gitea/modules/log" @@ -36,6 +37,7 @@ type RedisQueue struct { batchLength int closed chan struct{} exemplar interface{} + workers int } // RedisQueueConfiguration is the configuration for the redis queue @@ -45,6 +47,7 @@ type RedisQueueConfiguration struct { DBIndex int BatchLength int QueueName string + Workers int } // NewRedisQueue creates single redis or cluster redis queue @@ -62,6 +65,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) batchLength: config.BatchLength, exemplar: exemplar, closed: make(chan struct{}), + workers: config.Workers, } if len(dbs) == 0 { return nil, errors.New("no redis host found") @@ -86,6 +90,18 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), r.Shutdown) atTerminate(context.Background(), r.Terminate) + wg := sync.WaitGroup{} + for i := 0; i < r.workers; i++ { + wg.Add(1) + go func() { + r.worker() + wg.Done() + }() + } + wg.Wait() +} + +func (r *RedisQueue) worker() { var i int var datas = make([]Data, 0, r.batchLength) for { diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 4c80c79079e..4f7da32ce91 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -44,6 +44,7 @@ func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) qu opts["Password"] = q.Password opts["DBIndex"] = q.DBIndex opts["QueueName"] = name + opts["Workers"] = q.Workers cfg, err := json.Marshal(opts) if err != nil { @@ -117,6 +118,8 @@ func newQueueService() { Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) Queue.Workers = sec.Key("WORKER").MustInt(1) + + Cfg.Section("queue.notification").Key("WORKER").MustInt(5) } // ParseQueueConnStr parses a queue connection string