diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 2db543f5e63..6ffb43fcd89 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -236,7 +236,7 @@ relation to port exhaustion. ## Queue (`queue`) -- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `batched-channel`, `channel`, `level`, `redis`, `dummy` +- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. - `LENGTH`: **20**: Maximal queue size before channel queues block - `BATCH_LENGTH`: **20**: Batch data before passing to the handler diff --git a/modules/queue/queue_batch.go b/modules/queue/queue_batch.go deleted file mode 100644 index 2731ac5e23c..00000000000 --- a/modules/queue/queue_batch.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package queue - -import ( - "context" - "time" - - "code.gitea.io/gitea/modules/log" -) - -// BatchedChannelQueueType is the type for batched channel queue -const BatchedChannelQueueType Type = "batched-channel" - -// BatchedChannelQueueConfiguration is the configuration for a BatchedChannelQueue -type BatchedChannelQueueConfiguration struct { - QueueLength int - BatchLength int - Workers int -} - -// BatchedChannelQueue implements -type BatchedChannelQueue struct { - *ChannelQueue - batchLength int -} - -// NewBatchedChannelQueue create a memory channel queue -func NewBatchedChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { - configInterface, err := toConfig(BatchedChannelQueueConfiguration{}, cfg) - if err != nil { - return nil, err - } - config := configInterface.(BatchedChannelQueueConfiguration) - return &BatchedChannelQueue{ - &ChannelQueue{ - queue: make(chan Data, config.QueueLength), - handle: handle, - exemplar: exemplar, - workers: config.Workers, - }, - config.BatchLength, - }, nil -} - -// Run starts to run the queue -func (c *BatchedChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("BatchedChannelQueue is not shutdownable!") - }) - atTerminate(context.Background(), func() { - log.Warn("BatchedChannelQueue is not terminatable!") - }) - for i := 0; i < c.workers; i++ { - go func() { - delay := time.Millisecond * 300 - var datas = make([]Data, 0, c.batchLength) - for { - select { - case data := <-c.queue: - datas = append(datas, data) - if len(datas) >= c.batchLength { - c.handle(datas...) - datas = make([]Data, 0, c.batchLength) - } - case <-time.After(delay): - delay = time.Millisecond * 100 - if len(datas) > 0 { - c.handle(datas...) - datas = make([]Data, 0, c.batchLength) - } - } - } - }() - } -} - -func init() { - queuesMap[BatchedChannelQueueType] = NewBatchedChannelQueue -} diff --git a/modules/queue/queue_batch_test.go b/modules/queue/queue_batch_test.go deleted file mode 100644 index 13a85a0aada..00000000000 --- a/modules/queue/queue_batch_test.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package queue - -import "testing" - -import "github.com/stretchr/testify/assert" - -import "context" - -func TestBatchedChannelQueue(t *testing.T) { - handleChan := make(chan *testData) - handle := func(data ...Data) { - assert.True(t, len(data) == 2) - for _, datum := range data { - testDatum := datum.(*testData) - handleChan <- testDatum - } - } - - nilFn := func(_ context.Context, _ func()) {} - - queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2, Workers: 1}, &testData{}) - assert.NoError(t, err) - - go queue.Run(nilFn, nilFn) - - test1 := testData{"A", 1} - test2 := testData{"B", 2} - - queue.Push(&test1) - go queue.Push(&test2) - - result1 := <-handleChan - assert.Equal(t, test1.TestString, result1.TestString) - assert.Equal(t, test1.TestInt, result1.TestInt) - - result2 := <-handleChan - assert.Equal(t, test2.TestString, result2.TestString) - assert.Equal(t, test2.TestInt, result2.TestInt) - - err = queue.Push(test1) - assert.Error(t, err) -} diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 9d0ab11d21c..ebcf22ef793 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "reflect" + "time" "code.gitea.io/gitea/modules/log" ) @@ -17,14 +18,17 @@ const ChannelQueueType Type = "channel" // ChannelQueueConfiguration is the configuration for a ChannelQueue type ChannelQueueConfiguration struct { - QueueLength int - Workers int + QueueLength int + BatchLength int + Workers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int } // ChannelQueue implements type ChannelQueue struct { - queue chan Data - handle HandlerFunc + pool *WorkerPool exemplar interface{} workers int } @@ -36,9 +40,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro return nil, err } config := configInterface.(ChannelQueueConfiguration) + if config.BatchLength == 0 { + config.BatchLength = 1 + } + dataChan := make(chan Data, config.QueueLength) + + ctx, cancel := context.WithCancel(context.Background()) return &ChannelQueue{ - queue: make(chan Data, config.QueueLength), - handle: handle, + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + }, exemplar: exemplar, workers: config.Workers, }, nil @@ -52,13 +70,7 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue is not terminatable!") }) - for i := 0; i < c.workers; i++ { - go func() { - for data := range c.queue { - c.handle(data) - } - }() - } + c.pool.addWorkers(c.pool.baseCtx, c.workers) } // Push will push the indexer data to queue @@ -71,7 +83,7 @@ func (c *ChannelQueue) Push(data Data) error { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) } } - c.queue <- data + c.pool.Push(data) return nil } diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 9e72bed85d7..c04407aa243 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -7,6 +7,7 @@ package queue import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -22,7 +23,14 @@ func TestChannelQueue(t *testing.T) { nilFn := func(_ context.Context, _ func()) {} - queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20, Workers: 1}, &testData{}) + queue, err := NewChannelQueue(handle, + ChannelQueueConfiguration{ + QueueLength: 20, + Workers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, &testData{}) assert.NoError(t, err) go queue.Run(nilFn, nilFn) @@ -36,3 +44,46 @@ func TestChannelQueue(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) } + +func TestChannelQueue_Batch(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + nilFn := func(_ context.Context, _ func()) {} + + queue, err := NewChannelQueue(handle, + ChannelQueueConfiguration{ + QueueLength: 20, + BatchLength: 2, + Workers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 799bc98046f..50e49f3a29e 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "reflect" - "sync" "time" "code.gitea.io/gitea/modules/log" @@ -22,19 +21,23 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - DataDir string - BatchLength int - Workers int + DataDir string + QueueLength int + BatchLength int + Workers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int } // LevelQueue implements a disk library queue type LevelQueue struct { - handle HandlerFunc - queue *levelqueue.Queue - batchLength int - closed chan struct{} - exemplar interface{} - workers int + pool *WorkerPool + queue *levelqueue.Queue + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int } // NewLevelQueue creates a ledis local queue @@ -50,13 +53,25 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } + dataChan := make(chan Data, config.QueueLength) + ctx, cancel := context.WithCancel(context.Background()) + return &LevelQueue{ - handle: handle, - queue: queue, - batchLength: config.BatchLength, - exemplar: exemplar, - closed: make(chan struct{}), - workers: config.Workers, + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + }, + queue: queue, + exemplar: exemplar, + closed: make(chan struct{}), + terminated: make(chan struct{}), + workers: config.Workers, }, nil } @@ -65,72 +80,66 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) atShutdown(context.Background(), l.Shutdown) atTerminate(context.Background(), l.Terminate) - wg := sync.WaitGroup{} - for i := 0; i < l.workers; i++ { - wg.Add(1) - go func() { - l.worker() - wg.Done() - }() - } - wg.Wait() + go l.pool.addWorkers(l.pool.baseCtx, l.workers) + + go l.readToChan() + + log.Trace("Waiting til closed") + <-l.closed + + log.Trace("Waiting til done") + l.pool.Wait() + // FIXME: graceful: Needs HammerContext + log.Trace("Waiting til cleaned") + + l.pool.CleanUp(context.TODO()) + log.Trace("cleaned") + } -func (l *LevelQueue) worker() { - var i int - var datas = make([]Data, 0, l.batchLength) +func (l *LevelQueue) readToChan() { for { select { case <-l.closed: - if len(datas) > 0 { - log.Trace("Handling: %d data, %v", len(datas), datas) - l.handle(datas...) - } + // tell the pool to shutdown. + l.pool.cancel() return default: - } - i++ - if len(datas) > l.batchLength || (len(datas) > 0 && i > 3) { - log.Trace("Handling: %d data, %v", len(datas), datas) - l.handle(datas...) - datas = make([]Data, 0, l.batchLength) - i = 0 - continue - } - - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("RPop: %v", err) + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error("RPop: %v", err) + } + time.Sleep(time.Millisecond * 100) + continue } - time.Sleep(time.Millisecond * 100) - continue - } - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } - var data Data - if l.exemplar != nil { - t := reflect.TypeOf(l.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } - if err != nil { - log.Error("Unmarshal: %v", err) + var data Data + if l.exemplar != nil { + t := reflect.TypeOf(l.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("LevelQueue failed to unmarshal: %v", err) + time.Sleep(time.Millisecond * 10) + continue + } + + log.Trace("LevelQueue: task found: %#v", data) + l.pool.Push(data) time.Sleep(time.Millisecond * 10) - continue + } - - log.Trace("LevelQueue: task found: %#v", data) - - datas = append(datas, data) } } @@ -163,6 +172,7 @@ func (l *LevelQueue) Shutdown() { // Terminate this queue and close the queue func (l *LevelQueue) Terminate() { + log.Trace("Terminating") l.Shutdown() if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { log.Error("Error whilst closing internal queue: %v", err) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 428e104fb5a..f3278271527 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -6,8 +6,9 @@ package queue import ( "context" - "sync" "time" + + "code.gitea.io/gitea/modules/log" ) // PersistableChannelQueueType is the type for persistable queue @@ -15,17 +16,20 @@ const PersistableChannelQueueType Type = "persistable-channel" // PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue type PersistableChannelQueueConfiguration struct { - DataDir string - BatchLength int - QueueLength int - Timeout time.Duration - MaxAttempts int - Workers int + DataDir string + BatchLength int + QueueLength int + Timeout time.Duration + MaxAttempts int + Workers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int } // PersistableChannelQueue wraps a channel queue and level queue together type PersistableChannelQueue struct { - *BatchedChannelQueue + *ChannelQueue delayedStarter closed chan struct{} } @@ -39,26 +43,33 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } config := configInterface.(PersistableChannelQueueConfiguration) - batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{ - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - Workers: config.Workers, + channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + Workers: config.Workers, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, }, exemplar) if err != nil { return nil, err } - // the level backend only needs one worker to catch up with the previously dropped work + // the level backend only needs temporary workrers to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ - DataDir: config.DataDir, - BatchLength: config.BatchLength, - Workers: 1, + DataDir: config.DataDir, + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + Workers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) if err == nil { return &PersistableChannelQueue{ - BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue), + ChannelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ internal: levelQueue.(*LevelQueue), }, @@ -71,7 +82,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } return &PersistableChannelQueue{ - BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue), + ChannelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ cfg: levelCfg, underlying: LevelQueueType, @@ -88,7 +99,7 @@ func (p *PersistableChannelQueue) Push(data Data) error { case <-p.closed: return p.internal.Push(data) default: - return p.BatchedChannelQueue.Push(data) + return p.ChannelQueue.Push(data) } } @@ -96,7 +107,7 @@ func (p *PersistableChannelQueue) Push(data Data) error { func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { p.lock.Lock() if p.internal == nil { - p.setInternal(atShutdown, p.handle, p.exemplar) + p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) } else { p.lock.Unlock() } @@ -106,44 +117,16 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte // Just run the level queue - we shut it down later go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - wg := sync.WaitGroup{} - for i := 0; i < p.workers; i++ { - wg.Add(1) - go func() { - p.worker() - wg.Done() - }() - } - wg.Wait() -} + go p.ChannelQueue.pool.addWorkers(p.ChannelQueue.pool.baseCtx, p.workers) -func (p *PersistableChannelQueue) worker() { - delay := time.Millisecond * 300 - var datas = make([]Data, 0, p.batchLength) -loop: - for { - select { - case data := <-p.queue: - datas = append(datas, data) - if len(datas) >= p.batchLength { - p.handle(datas...) - datas = make([]Data, 0, p.batchLength) - } - case <-time.After(delay): - delay = time.Millisecond * 100 - if len(datas) > 0 { - p.handle(datas...) - datas = make([]Data, 0, p.batchLength) - } - case <-p.closed: - if len(datas) > 0 { - p.handle(datas...) - } - break loop - } - } + <-p.closed + p.ChannelQueue.pool.cancel() + p.internal.(*LevelQueue).pool.cancel() + p.ChannelQueue.pool.Wait() + p.internal.(*LevelQueue).pool.Wait() + // Redirect all remaining data in the chan to the internal channel go func() { - for data := range p.queue { + for data := range p.ChannelQueue.pool.dataChan { _ = p.internal.Push(data) } }() @@ -154,17 +137,18 @@ func (p *PersistableChannelQueue) Shutdown() { select { case <-p.closed: default: - close(p.closed) p.lock.Lock() defer p.lock.Unlock() if p.internal != nil { p.internal.(*LevelQueue).Shutdown() } + close(p.closed) } } // Terminate this queue and close the queue func (p *PersistableChannelQueue) Terminate() { + log.Trace("Terminating") p.Shutdown() p.lock.Lock() defer p.lock.Unlock() diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 7033fc6a34a..b9c6f278ef5 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -27,9 +27,13 @@ func TestLevelQueue(t *testing.T) { var queueTerminate func() queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ - DataDir: "level-queue-test-data", - BatchLength: 2, - Workers: 1, + DataDir: "level-queue-test-data", + BatchLength: 2, + Workers: 1, + QueueLength: 20, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, }, &testData{}) assert.NoError(t, err) @@ -75,9 +79,13 @@ func TestLevelQueue(t *testing.T) { // Reopen queue queue, err = NewLevelQueue(handle, LevelQueueConfiguration{ - DataDir: "level-queue-test-data", - BatchLength: 2, - Workers: 1, + DataDir: "level-queue-test-data", + BatchLength: 2, + Workers: 1, + QueueLength: 20, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 80ce67233c3..acc6feeb95e 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -11,7 +11,6 @@ import ( "fmt" "reflect" "strings" - "sync" "time" "code.gitea.io/gitea/modules/log" @@ -31,23 +30,26 @@ type redisClient interface { // RedisQueue redis queue type RedisQueue struct { - client redisClient - queueName string - handle HandlerFunc - batchLength int - closed chan struct{} - exemplar interface{} - workers int + pool *WorkerPool + client redisClient + queueName string + closed chan struct{} + exemplar interface{} + workers int } // RedisQueueConfiguration is the configuration for the redis queue type RedisQueueConfiguration struct { - Addresses string - Password string - DBIndex int - BatchLength int - QueueName string - Workers int + Addresses string + Password string + DBIndex int + BatchLength int + QueueLength int + QueueName string + Workers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int } // NewRedisQueue creates single redis or cluster redis queue @@ -59,13 +61,25 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) config := configInterface.(RedisQueueConfiguration) dbs := strings.Split(config.Addresses, ",") + + dataChan := make(chan Data, config.QueueLength) + ctx, cancel := context.WithCancel(context.Background()) + var queue = RedisQueue{ - queueName: config.QueueName, - handle: handle, - batchLength: config.BatchLength, - exemplar: exemplar, - closed: make(chan struct{}), - workers: config.Workers, + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + }, + queueName: config.QueueName, + exemplar: exemplar, + closed: make(chan struct{}), + workers: config.Workers, } if len(dbs) == 0 { return nil, errors.New("no redis host found") @@ -90,79 +104,57 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), r.Shutdown) atTerminate(context.Background(), r.Terminate) - wg := sync.WaitGroup{} - for i := 0; i < r.workers; i++ { - wg.Add(1) - go func() { - r.worker() - wg.Done() - }() - } - wg.Wait() + + go r.pool.addWorkers(r.pool.baseCtx, r.workers) + + go r.readToChan() + + <-r.closed + r.pool.Wait() + // FIXME: graceful: Needs HammerContext + r.pool.CleanUp(context.TODO()) } -func (r *RedisQueue) worker() { - var i int - var datas = make([]Data, 0, r.batchLength) +func (r *RedisQueue) readToChan() { for { select { case <-r.closed: - if len(datas) > 0 { - log.Trace("Handling: %d data, %v", len(datas), datas) - r.handle(datas...) - } + // tell the pool to shutdown + r.pool.cancel() return default: - } - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("LPop failed: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - i++ - if len(datas) > r.batchLength || (len(datas) > 0 && i > 3) { - log.Trace("Handling: %d data, %v", len(datas), datas) - r.handle(datas...) - datas = make([]Data, 0, r.batchLength) - i = 0 - } - - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - - var data Data - if r.exemplar != nil { - t := reflect.TypeOf(r.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } - if err != nil { - log.Error("Unmarshal: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: task found: %#v", data) - - datas = append(datas, data) - select { - case <-r.closed: - if len(datas) > 0 { - log.Trace("Handling: %d data, %v", len(datas), datas) - r.handle(datas...) + bs, err := r.client.LPop(r.queueName).Bytes() + if err != nil && err != redis.Nil { + log.Error("LPop failed: %v", err) + time.Sleep(time.Millisecond * 100) + continue } - return - default: + + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data Data + if r.exemplar != nil { + t := reflect.TypeOf(r.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("RedisQueue: task found: %#v", data) + r.pool.Push(data) + time.Sleep(time.Millisecond * 10) } - time.Sleep(time.Millisecond * 100) } } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go new file mode 100644 index 00000000000..02e053a427b --- /dev/null +++ b/modules/queue/workerpool.go @@ -0,0 +1,239 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// WorkerPool takes +type WorkerPool struct { + lock sync.Mutex + baseCtx context.Context + cancel context.CancelFunc + cond *sync.Cond + numberOfWorkers int + batchLength int + handle HandlerFunc + dataChan chan Data + blockTimeout time.Duration + boostTimeout time.Duration + boostWorkers int +} + +// Push pushes the data to the internal channel +func (p *WorkerPool) Push(data Data) { + p.lock.Lock() + if p.blockTimeout > 0 && p.boostTimeout > 0 { + p.lock.Unlock() + p.pushBoost(data) + } else { + p.lock.Unlock() + p.dataChan <- data + } +} + +func (p *WorkerPool) pushBoost(data Data) { + select { + case p.dataChan <- data: + default: + p.lock.Lock() + if p.blockTimeout <= 0 { + p.lock.Unlock() + p.dataChan <- data + return + } + ourTimeout := p.blockTimeout + timer := time.NewTimer(p.blockTimeout) + p.lock.Unlock() + select { + case p.dataChan <- data: + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + case <-timer.C: + p.lock.Lock() + if p.blockTimeout > ourTimeout { + p.lock.Unlock() + p.dataChan <- data + return + } + p.blockTimeout *= 2 + log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + ctx, cancel := context.WithCancel(p.baseCtx) + go func() { + <-time.After(p.boostTimeout) + cancel() + p.lock.Lock() + p.blockTimeout /= 2 + p.lock.Unlock() + }() + p.addWorkers(ctx, p.boostWorkers) + p.lock.Unlock() + p.dataChan <- data + } + } +} + +// NumberOfWorkers returns the number of current workers in the pool +func (p *WorkerPool) NumberOfWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.numberOfWorkers +} + +// AddWorkers adds workers to the pool +func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { + var ctx context.Context + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(p.baseCtx, timeout) + } else { + ctx, cancel = context.WithCancel(p.baseCtx) + } + + p.addWorkers(ctx, number) + return cancel +} + +// addWorkers adds workers to the pool +func (p *WorkerPool) addWorkers(ctx context.Context, number int) { + for i := 0; i < number; i++ { + p.lock.Lock() + if p.cond == nil { + p.cond = sync.NewCond(&p.lock) + } + p.numberOfWorkers++ + p.lock.Unlock() + go func() { + p.doWork(ctx) + + p.lock.Lock() + p.numberOfWorkers-- + if p.numberOfWorkers <= 0 { + // numberOfWorkers can't go negative but... + p.numberOfWorkers = 0 + p.cond.Broadcast() + } + p.lock.Unlock() + }() + } +} + +// Wait for WorkerPool to finish +func (p *WorkerPool) Wait() { + p.lock.Lock() + defer p.lock.Unlock() + if p.cond == nil { + p.cond = sync.NewCond(&p.lock) + } + if p.numberOfWorkers <= 0 { + return + } + p.cond.Wait() +} + +// CleanUp will drain the remaining contents of the channel +// This should be called after AddWorkers context is closed +func (p *WorkerPool) CleanUp(ctx context.Context) { + log.Trace("CleanUp") + close(p.dataChan) + for data := range p.dataChan { + p.handle(data) + select { + case <-ctx.Done(): + log.Warn("Cleanup context closed before finishing clean-up") + return + default: + } + } + log.Trace("CleanUp done") +} + +func (p *WorkerPool) doWork(ctx context.Context) { + delay := time.Millisecond * 300 + var data = make([]Data, 0, p.batchLength) + for { + select { + case <-ctx.Done(): + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + case datum, ok := <-p.dataChan: + if !ok { + // the dataChan has been closed - we should finish up: + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + } + data = append(data, datum) + if len(data) >= p.batchLength { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + default: + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + case datum, ok := <-p.dataChan: + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + if !ok { + // the dataChan has been closed - we should finish up: + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + } + data = append(data, datum) + if len(data) >= p.batchLength { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + case <-timer.C: + delay = time.Millisecond * 100 + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + + } + } + } +} diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 1a33e232c33..b619c9855a7 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -28,6 +28,9 @@ type queueSettings struct { MaxAttempts int Timeout time.Duration Workers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int } // Queue settings @@ -45,6 +48,9 @@ func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) qu opts["DBIndex"] = q.DBIndex opts["QueueName"] = name opts["Workers"] = q.Workers + opts["BlockTimeout"] = q.BlockTimeout + opts["BoostTimeout"] = q.BoostTimeout + opts["BoostWorkers"] = q.BoostWorkers cfg, err := json.Marshal(opts) if err != nil { @@ -96,7 +102,10 @@ func getQueueSettings(name string) queueSettings { q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary) q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) - q.Workers = sec.Key("WORKER").MustInt(Queue.Workers) + q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers) + q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout) + q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout) + q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) return q @@ -117,7 +126,10 @@ func newQueueService() { Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) - Queue.Workers = sec.Key("WORKER").MustInt(1) + Queue.Workers = sec.Key("WORKERS").MustInt(1) + Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second) + Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) + Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) hasWorkers := false for _, key := range Cfg.Section("queue.notification").Keys() {