Queue: Make resizing worker pools

Andrew Thornton 2019-12-07 16:44:37 +00:00
parent 0edb70a099
commit e6ebb47299
11 changed files with 543 additions and 363 deletions

View File

@@ -236,7 +236,7 @@ relation to port exhaustion.
## Queue (`queue`)
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `batched-channel`, `channel`, `level`, `redis`, `dummy`
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues.
- `LENGTH`: **20**: Maximal queue size before channel queues block
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
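These keys feed the queue configuration structs reworked below. A hedged Go sketch of the mapping (the import path is assumed from the module layout, and the block/boost values mirror the defaults added in the modules/setting hunk at the end of this commit):

package main

import (
	"fmt"
	"time"

	"code.gitea.io/gitea/modules/queue" // import path assumed
)

func main() {
	// LENGTH maps to QueueLength and BATCH_LENGTH to BatchLength; the remaining
	// fields correspond to the WORKERS / BLOCK_TIMEOUT / BOOST_TIMEOUT /
	// BOOST_WORKERS settings introduced by this commit.
	cfg := queue.ChannelQueueConfiguration{
		QueueLength:  20,
		BatchLength:  20,
		Workers:      1,
		BlockTimeout: 1 * time.Second,
		BoostTimeout: 5 * time.Minute,
		BoostWorkers: 5,
	}
	fmt.Printf("%+v\n", cfg)
}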

View File

@@ -1,82 +0,0 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"time"
"code.gitea.io/gitea/modules/log"
)
// BatchedChannelQueueType is the type for batched channel queue
const BatchedChannelQueueType Type = "batched-channel"
// BatchedChannelQueueConfiguration is the configuration for a BatchedChannelQueue
type BatchedChannelQueueConfiguration struct {
QueueLength int
BatchLength int
Workers int
}
// BatchedChannelQueue implements
type BatchedChannelQueue struct {
*ChannelQueue
batchLength int
}
// NewBatchedChannelQueue create a memory channel queue
func NewBatchedChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
configInterface, err := toConfig(BatchedChannelQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(BatchedChannelQueueConfiguration)
return &BatchedChannelQueue{
&ChannelQueue{
queue: make(chan Data, config.QueueLength),
handle: handle,
exemplar: exemplar,
workers: config.Workers,
},
config.BatchLength,
}, nil
}
// Run starts to run the queue
func (c *BatchedChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), func() {
log.Warn("BatchedChannelQueue is not shutdownable!")
})
atTerminate(context.Background(), func() {
log.Warn("BatchedChannelQueue is not terminatable!")
})
for i := 0; i < c.workers; i++ {
go func() {
delay := time.Millisecond * 300
var datas = make([]Data, 0, c.batchLength)
for {
select {
case data := <-c.queue:
datas = append(datas, data)
if len(datas) >= c.batchLength {
c.handle(datas...)
datas = make([]Data, 0, c.batchLength)
}
case <-time.After(delay):
delay = time.Millisecond * 100
if len(datas) > 0 {
c.handle(datas...)
datas = make([]Data, 0, c.batchLength)
}
}
}
}()
}
}
func init() {
queuesMap[BatchedChannelQueueType] = NewBatchedChannelQueue
}
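The batching loop deleted here is not lost: accumulating items until the batch is full or a timer fires now lives in doWork of the new WorkerPool further down. A stdlib-only sketch of that select pattern, with illustrative names and a bounded loop so it terminates:

package main

import (
	"fmt"
	"time"
)

func main() {
	const batchLength = 2
	queue := make(chan string, 20)
	handle := func(batch ...string) { fmt.Println("handled", batch) }

	go func() {
		for _, s := range []string{"a", "b", "c"} {
			queue <- s
		}
	}()

	delay := 300 * time.Millisecond
	batch := make([]string, 0, batchLength)
	for i := 0; i < 10; i++ { // bounded only so the sketch exits
		select {
		case item := <-queue:
			batch = append(batch, item)
			if len(batch) >= batchLength { // full batch: hand it off
				handle(batch...)
				batch = make([]string, 0, batchLength)
			}
		case <-time.After(delay):
			delay = 100 * time.Millisecond // poll faster once warmed up
			if len(batch) > 0 { // timer fired: flush the partial batch
				handle(batch...)
				batch = make([]string, 0, batchLength)
			}
		}
	}
}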

View File

@@ -1,46 +0,0 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import "testing"
import "github.com/stretchr/testify/assert"
import "context"
func TestBatchedChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
handle := func(data ...Data) {
assert.True(t, len(data) == 2)
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum
}
}
nilFn := func(_ context.Context, _ func()) {}
queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2, Workers: 1}, &testData{})
assert.NoError(t, err)
go queue.Run(nilFn, nilFn)
test1 := testData{"A", 1}
test2 := testData{"B", 2}
queue.Push(&test1)
go queue.Push(&test2)
result1 := <-handleChan
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
result2 := <-handleChan
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
err = queue.Push(test1)
assert.Error(t, err)
}

View File

@@ -8,6 +8,7 @@ import (
"context"
"fmt"
"reflect"
"time"
"code.gitea.io/gitea/modules/log"
)
@@ -18,13 +19,16 @@ const ChannelQueueType Type = "channel"
// ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelQueueConfiguration struct {
QueueLength int
BatchLength int
Workers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
}
// ChannelQueue implements
type ChannelQueue struct {
queue chan Data
handle HandlerFunc
pool *WorkerPool
exemplar interface{}
workers int
}
@@ -36,9 +40,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
return nil, err
}
config := configInterface.(ChannelQueueConfiguration)
if config.BatchLength == 0 {
config.BatchLength = 1
}
dataChan := make(chan Data, config.QueueLength)
ctx, cancel := context.WithCancel(context.Background())
return &ChannelQueue{
queue: make(chan Data, config.QueueLength),
pool: &WorkerPool{
baseCtx: ctx,
cancel: cancel,
batchLength: config.BatchLength,
handle: handle,
dataChan: dataChan,
blockTimeout: config.BlockTimeout,
boostTimeout: config.BoostTimeout,
boostWorkers: config.BoostWorkers,
},
exemplar: exemplar,
workers: config.Workers,
}, nil
@@ -52,13 +70,7 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())
atTerminate(context.Background(), func() {
log.Warn("ChannelQueue is not terminatable!")
})
for i := 0; i < c.workers; i++ {
go func() {
for data := range c.queue {
c.handle(data)
}
}()
}
c.pool.addWorkers(c.pool.baseCtx, c.workers)
}
// Push will push the indexer data to queue
@@ -71,7 +83,7 @@ func (c *ChannelQueue) Push(data Data) error {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
}
}
c.queue <- data
c.pool.Push(data)
return nil
}
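Push still rejects data whose concrete type differs from the exemplar registered at construction; only the error message is visible in this hunk. The stdlib sketch below shows the kind of reflect-based check that message implies; treat its exact shape as an assumption rather than a quote of the Gitea code:

package main

import (
	"fmt"
	"reflect"
)

type testData struct {
	TestString string
	TestInt    int
}

// matchesExemplar is an illustrative stand-in for the type check guarding Push.
func matchesExemplar(data, exemplar interface{}) bool {
	if data == nil {
		return false
	}
	return reflect.TypeOf(data).AssignableTo(reflect.TypeOf(exemplar))
}

func main() {
	exemplar := &testData{}
	fmt.Println(matchesExemplar(&testData{"A", 1}, exemplar)) // true: same pointer type
	fmt.Println(matchesExemplar(testData{"A", 1}, exemplar))  // false: value, not pointer
}

This is also why the tests push a value (not a pointer) and expect an error.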

View File

@@ -7,6 +7,7 @@ package queue
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@@ -22,7 +23,14 @@ func TestChannelQueue(t *testing.T) {
nilFn := func(_ context.Context, _ func()) {}
queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20, Workers: 1}, &testData{})
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
QueueLength: 20,
Workers: 1,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}, &testData{})
assert.NoError(t, err)
go queue.Run(nilFn, nilFn)
@@ -36,3 +44,46 @@ func TestChannelQueue(t *testing.T) {
err = queue.Push(test1)
assert.Error(t, err)
}
func TestChannelQueue_Batch(t *testing.T) {
handleChan := make(chan *testData)
handle := func(data ...Data) {
assert.True(t, len(data) == 2)
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum
}
}
nilFn := func(_ context.Context, _ func()) {}
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
QueueLength: 20,
BatchLength: 2,
Workers: 1,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}, &testData{})
assert.NoError(t, err)
go queue.Run(nilFn, nilFn)
test1 := testData{"A", 1}
test2 := testData{"B", 2}
queue.Push(&test1)
go queue.Push(&test2)
result1 := <-handleChan
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
result2 := <-handleChan
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
err = queue.Push(test1)
assert.Error(t, err)
}
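Neither test exercises the headline feature, resizing at runtime. A hedged sketch of what such a test could look like; it would have to sit in this same file (package queue) because the pool field is unexported, and it reuses the imports already shown above. The initial sleep only gives Run time to start its first worker:

func TestChannelQueue_Resize_sketch(t *testing.T) {
	handle := func(data ...Data) {}
	nilFn := func(_ context.Context, _ func()) {}
	q, err := NewChannelQueue(handle,
		ChannelQueueConfiguration{
			QueueLength:  20,
			Workers:      1,
			BlockTimeout: 1 * time.Second,
			BoostTimeout: 5 * time.Minute,
			BoostWorkers: 5,
		}, &testData{})
	assert.NoError(t, err)
	go q.Run(nilFn, nilFn)
	time.Sleep(100 * time.Millisecond)

	pool := q.(*ChannelQueue).pool
	stop := pool.AddWorkers(2, 0) // grow the pool by two extra workers
	assert.True(t, pool.NumberOfWorkers() >= 3)
	stop() // cancelling their context lets the pool shrink back
}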

View File

@@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"reflect"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
@@ -23,16 +22,20 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
DataDir string
QueueLength int
BatchLength int
Workers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
}
// LevelQueue implements a disk library queue
type LevelQueue struct {
handle HandlerFunc
pool *WorkerPool
queue *levelqueue.Queue
batchLength int
closed chan struct{}
terminated chan struct{}
exemplar interface{}
workers int
}
@@ -50,12 +53,24 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err
}
dataChan := make(chan Data, config.QueueLength)
ctx, cancel := context.WithCancel(context.Background())
return &LevelQueue{
handle: handle,
queue: queue,
pool: &WorkerPool{
baseCtx: ctx,
cancel: cancel,
batchLength: config.BatchLength,
handle: handle,
dataChan: dataChan,
blockTimeout: config.BlockTimeout,
boostTimeout: config.BoostTimeout,
boostWorkers: config.BoostWorkers,
},
queue: queue,
exemplar: exemplar,
closed: make(chan struct{}),
terminated: make(chan struct{}),
workers: config.Workers,
}, nil
}
@@ -65,39 +80,31 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
atShutdown(context.Background(), l.Shutdown)
atTerminate(context.Background(), l.Terminate)
wg := sync.WaitGroup{}
for i := 0; i < l.workers; i++ {
wg.Add(1)
go func() {
l.worker()
wg.Done()
}()
}
wg.Wait()
go l.pool.addWorkers(l.pool.baseCtx, l.workers)
go l.readToChan()
log.Trace("Waiting til closed")
<-l.closed
log.Trace("Waiting til done")
l.pool.Wait()
// FIXME: graceful: Needs HammerContext
log.Trace("Waiting til cleaned")
l.pool.CleanUp(context.TODO())
log.Trace("cleaned")
}
func (l *LevelQueue) worker() {
var i int
var datas = make([]Data, 0, l.batchLength)
func (l *LevelQueue) readToChan() {
for {
select {
case <-l.closed:
if len(datas) > 0 {
log.Trace("Handling: %d data, %v", len(datas), datas)
l.handle(datas...)
}
// tell the pool to shutdown.
l.pool.cancel()
return
default:
}
i++
if len(datas) > l.batchLength || (len(datas) > 0 && i > 3) {
log.Trace("Handling: %d data, %v", len(datas), datas)
l.handle(datas...)
datas = make([]Data, 0, l.batchLength)
i = 0
continue
}
bs, err := l.queue.RPop()
if err != nil {
if err != levelqueue.ErrNotFound {
@@ -123,14 +130,16 @@ func (l *LevelQueue) worker() {
err = json.Unmarshal(bs, &data)
}
if err != nil {
log.Error("Unmarshal: %v", err)
log.Error("LevelQueue failed to unmarshal: %v", err)
time.Sleep(time.Millisecond * 10)
continue
}
log.Trace("LevelQueue: task found: %#v", data)
l.pool.Push(data)
time.Sleep(time.Millisecond * 10)
datas = append(datas, data)
}
}
}
@@ -163,6 +172,7 @@ func (l *LevelQueue) Shutdown() {
// Terminate this queue and close the queue
func (l *LevelQueue) Terminate() {
log.Trace("Terminating")
l.Shutdown()
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue: %v", err)

View File

@@ -6,8 +6,9 @@ package queue
import (
"context"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// PersistableChannelQueueType is the type for persistable queue
@@ -21,11 +22,14 @@ type PersistableChannelQueueConfiguration struct {
Timeout time.Duration
MaxAttempts int
Workers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
}
// PersistableChannelQueue wraps a channel queue and level queue together
type PersistableChannelQueue struct {
*BatchedChannelQueue
*ChannelQueue
delayedStarter
closed chan struct{}
}
@@ -39,26 +43,33 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
}
config := configInterface.(PersistableChannelQueueConfiguration)
batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{
channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
Workers: config.Workers,
BlockTimeout: config.BlockTimeout,
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
}, exemplar)
if err != nil {
return nil, err
}
// the level backend only needs one worker to catch up with the previously dropped work
// the level backend only needs temporary workers to catch up with the previously dropped work
levelCfg := LevelQueueConfiguration{
DataDir: config.DataDir,
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
Workers: 1,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
if err == nil {
return &PersistableChannelQueue{
BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue),
ChannelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
internal: levelQueue.(*LevelQueue),
},
@@ -71,7 +82,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
}
return &PersistableChannelQueue{
BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue),
ChannelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
cfg: levelCfg,
underlying: LevelQueueType,
@@ -88,7 +99,7 @@ func (p *PersistableChannelQueue) Push(data Data) error {
case <-p.closed:
return p.internal.Push(data)
default:
return p.BatchedChannelQueue.Push(data)
return p.ChannelQueue.Push(data)
}
}
@@ -96,7 +107,7 @@ func (p *PersistableChannelQueue) Push(data Data) error {
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
p.lock.Lock()
if p.internal == nil {
p.setInternal(atShutdown, p.handle, p.exemplar)
p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
} else {
p.lock.Unlock()
}
@@ -106,44 +117,16 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
// Just run the level queue - we shut it down later
go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
wg := sync.WaitGroup{}
for i := 0; i < p.workers; i++ {
wg.Add(1)
go func() {
p.worker()
wg.Done()
}()
}
wg.Wait()
}
go p.ChannelQueue.pool.addWorkers(p.ChannelQueue.pool.baseCtx, p.workers)
func (p *PersistableChannelQueue) worker() {
delay := time.Millisecond * 300
var datas = make([]Data, 0, p.batchLength)
loop:
for {
select {
case data := <-p.queue:
datas = append(datas, data)
if len(datas) >= p.batchLength {
p.handle(datas...)
datas = make([]Data, 0, p.batchLength)
}
case <-time.After(delay):
delay = time.Millisecond * 100
if len(datas) > 0 {
p.handle(datas...)
datas = make([]Data, 0, p.batchLength)
}
case <-p.closed:
if len(datas) > 0 {
p.handle(datas...)
}
break loop
}
}
<-p.closed
p.ChannelQueue.pool.cancel()
p.internal.(*LevelQueue).pool.cancel()
p.ChannelQueue.pool.Wait()
p.internal.(*LevelQueue).pool.Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
for data := range p.queue {
for data := range p.ChannelQueue.pool.dataChan {
_ = p.internal.Push(data)
}
}()
@@ -154,17 +137,18 @@ func (p *PersistableChannelQueue) Shutdown() {
select {
case <-p.closed:
default:
close(p.closed)
p.lock.Lock()
defer p.lock.Unlock()
if p.internal != nil {
p.internal.(*LevelQueue).Shutdown()
}
close(p.closed)
}
}
// Terminate this queue and close the queue
func (p *PersistableChannelQueue) Terminate() {
log.Trace("Terminating")
p.Shutdown()
p.lock.Lock()
defer p.lock.Unlock()
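Run now ends with an explicit hand-off: cancel both pools, wait for their workers, then push anything still buffered in the in-memory channel into the persistent level queue so it survives a restart. A stdlib-only sketch of that final redirect step, with illustrative names:

package main

import (
	"fmt"
)

func main() {
	// After both pools have been cancelled and waited on, whatever is still
	// buffered in the in-memory channel goes to the persistent backend.
	dataChan := make(chan string, 10)
	dataChan <- "left-over item A"
	dataChan <- "left-over item B"

	persist := func(item string) error { // stand-in for p.internal.Push
		fmt.Println("pushed to level queue:", item)
		return nil
	}

	// The real code ranges over the still-open channel in a goroutine; the
	// sketch closes it so the loop ends and the program exits.
	close(dataChan)
	for item := range dataChan {
		_ = persist(item)
	}
}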

View File

@@ -30,6 +30,10 @@ func TestLevelQueue(t *testing.T) {
DataDir: "level-queue-test-data",
BatchLength: 2,
Workers: 1,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}, &testData{})
assert.NoError(t, err)
@@ -78,6 +82,10 @@ func TestLevelQueue(t *testing.T) {
DataDir: "level-queue-test-data",
BatchLength: 2,
Workers: 1,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}, &testData{})
assert.NoError(t, err)

View File

@@ -11,7 +11,6 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
@@ -31,10 +30,9 @@ type redisClient interface {
// RedisQueue redis queue
type RedisQueue struct {
pool *WorkerPool
client redisClient
queueName string
handle HandlerFunc
batchLength int
closed chan struct{}
exemplar interface{}
workers int
@@ -46,8 +44,12 @@ type RedisQueueConfiguration struct {
Password string
DBIndex int
BatchLength int
QueueLength int
QueueName string
Workers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
}
// NewRedisQueue creates single redis or cluster redis queue
@@ -59,10 +61,22 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
config := configInterface.(RedisQueueConfiguration)
dbs := strings.Split(config.Addresses, ",")
dataChan := make(chan Data, config.QueueLength)
ctx, cancel := context.WithCancel(context.Background())
var queue = RedisQueue{
queueName: config.QueueName,
handle: handle,
pool: &WorkerPool{
baseCtx: ctx,
cancel: cancel,
batchLength: config.BatchLength,
handle: handle,
dataChan: dataChan,
blockTimeout: config.BlockTimeout,
boostTimeout: config.BoostTimeout,
boostWorkers: config.BoostWorkers,
},
queueName: config.QueueName,
exemplar: exemplar,
closed: make(chan struct{}),
workers: config.Workers,
@@ -90,30 +104,25 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), r.Shutdown)
atTerminate(context.Background(), r.Terminate)
wg := sync.WaitGroup{}
for i := 0; i < r.workers; i++ {
wg.Add(1)
go func() {
r.worker()
wg.Done()
}()
}
wg.Wait()
go r.pool.addWorkers(r.pool.baseCtx, r.workers)
go r.readToChan()
<-r.closed
r.pool.Wait()
// FIXME: graceful: Needs HammerContext
r.pool.CleanUp(context.TODO())
}
func (r *RedisQueue) worker() {
var i int
var datas = make([]Data, 0, r.batchLength)
func (r *RedisQueue) readToChan() {
for {
select {
case <-r.closed:
if len(datas) > 0 {
log.Trace("Handling: %d data, %v", len(datas), datas)
r.handle(datas...)
}
// tell the pool to shutdown
r.pool.cancel()
return
default:
}
bs, err := r.client.LPop(r.queueName).Bytes()
if err != nil && err != redis.Nil {
log.Error("LPop failed: %v", err)
@@ -121,14 +130,6 @@ func (r *RedisQueue) worker() {
continue
}
i++
if len(datas) > r.batchLength || (len(datas) > 0 && i > 3) {
log.Trace("Handling: %d data, %v", len(datas), datas)
r.handle(datas...)
datas = make([]Data, 0, r.batchLength)
i = 0
}
if len(bs) == 0 {
time.Sleep(time.Millisecond * 100)
continue
@@ -151,18 +152,9 @@ func (r *RedisQueue) worker() {
}
log.Trace("RedisQueue: task found: %#v", data)
datas = append(datas, data)
select {
case <-r.closed:
if len(datas) > 0 {
log.Trace("Handling: %d data, %v", len(datas), datas)
r.handle(datas...)
r.pool.Push(data)
time.Sleep(time.Millisecond * 10)
}
return
default:
}
time.Sleep(time.Millisecond * 100)
}
}

modules/queue/workerpool.go (new file, 239 lines)
View File

@@ -0,0 +1,239 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// WorkerPool takes a handler function and dispatches batches of queued Data to a resizable set of worker goroutines
type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
cancel context.CancelFunc
cond *sync.Cond
numberOfWorkers int
batchLength int
handle HandlerFunc
dataChan chan Data
blockTimeout time.Duration
boostTimeout time.Duration
boostWorkers int
}
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
p.lock.Lock()
if p.blockTimeout > 0 && p.boostTimeout > 0 {
p.lock.Unlock()
p.pushBoost(data)
} else {
p.lock.Unlock()
p.dataChan <- data
}
}
func (p *WorkerPool) pushBoost(data Data) {
select {
case p.dataChan <- data:
default:
p.lock.Lock()
if p.blockTimeout <= 0 {
p.lock.Unlock()
p.dataChan <- data
return
}
ourTimeout := p.blockTimeout
timer := time.NewTimer(p.blockTimeout)
p.lock.Unlock()
select {
case p.dataChan <- data:
if timer.Stop() {
select {
case <-timer.C:
default:
}
}
case <-timer.C:
p.lock.Lock()
if p.blockTimeout > ourTimeout {
p.lock.Unlock()
p.dataChan <- data
return
}
p.blockTimeout *= 2
log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
ctx, cancel := context.WithCancel(p.baseCtx)
go func() {
<-time.After(p.boostTimeout)
cancel()
p.lock.Lock()
p.blockTimeout /= 2
p.lock.Unlock()
}()
p.addWorkers(ctx, p.boostWorkers)
p.lock.Unlock()
p.dataChan <- data
}
}
}
// NumberOfWorkers returns the number of current workers in the pool
func (p *WorkerPool) NumberOfWorkers() int {
p.lock.Lock()
defer p.lock.Unlock()
return p.numberOfWorkers
}
// AddWorkers adds workers to the pool
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
} else {
ctx, cancel = context.WithCancel(p.baseCtx)
}
p.addWorkers(ctx, number)
return cancel
}
// addWorkers adds workers to the pool
func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
for i := 0; i < number; i++ {
p.lock.Lock()
if p.cond == nil {
p.cond = sync.NewCond(&p.lock)
}
p.numberOfWorkers++
p.lock.Unlock()
go func() {
p.doWork(ctx)
p.lock.Lock()
p.numberOfWorkers--
if p.numberOfWorkers <= 0 {
// numberOfWorkers can't go negative but...
p.numberOfWorkers = 0
p.cond.Broadcast()
}
p.lock.Unlock()
}()
}
}
// Wait for WorkerPool to finish
func (p *WorkerPool) Wait() {
p.lock.Lock()
defer p.lock.Unlock()
if p.cond == nil {
p.cond = sync.NewCond(&p.lock)
}
if p.numberOfWorkers <= 0 {
return
}
p.cond.Wait()
}
// CleanUp will drain the remaining contents of the channel
// This should be called after AddWorkers context is closed
func (p *WorkerPool) CleanUp(ctx context.Context) {
log.Trace("CleanUp")
close(p.dataChan)
for data := range p.dataChan {
p.handle(data)
select {
case <-ctx.Done():
log.Warn("Cleanup context closed before finishing clean-up")
return
default:
}
}
log.Trace("CleanUp done")
}
func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300
var data = make([]Data, 0, p.batchLength)
for {
select {
case <-ctx.Done():
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
}
log.Trace("Worker shutting down")
return
case datum, ok := <-p.dataChan:
if !ok {
// the dataChan has been closed - we should finish up:
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
}
log.Trace("Worker shutting down")
return
}
data = append(data, datum)
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
data = make([]Data, 0, p.batchLength)
}
default:
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
if timer.Stop() {
select {
case <-timer.C:
default:
}
}
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
}
log.Trace("Worker shutting down")
return
case datum, ok := <-p.dataChan:
if timer.Stop() {
select {
case <-timer.C:
default:
}
}
if !ok {
// the dataChan has been closed - we should finish up:
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
}
log.Trace("Worker shutting down")
return
}
data = append(data, datum)
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
data = make([]Data, 0, p.batchLength)
}
case <-timer.C:
delay = time.Millisecond * 100
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
data = make([]Data, 0, p.batchLength)
}
}
}
}
}
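Two details are worth spelling out. pushBoost is where the resizing happens: if the channel refuses an item for blockTimeout, the pool doubles blockTimeout, adds boostWorkers temporary workers on a context that is cancelled after boostTimeout, and halves the timeout again once the boost expires. And since every field is unexported, the pool is normally driven through the queue types above; the hedged sketch below builds one by hand the way NewChannelQueue does, so it would have to live inside package queue (for example in a test file importing context, fmt and time), and it assumes Data is an empty interface so plain ints can be pushed:

func ExampleWorkerPool_sketch() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool := &WorkerPool{
		baseCtx:      ctx,
		cancel:       cancel,
		batchLength:  2,
		handle:       func(data ...Data) { fmt.Printf("handled %d item(s)\n", len(data)) },
		dataChan:     make(chan Data, 20),
		blockTimeout: 1 * time.Second,
		boostTimeout: 5 * time.Minute,
		boostWorkers: 5,
	}

	stop := pool.AddWorkers(2, 0) // grow: two workers on their own cancellable context
	for i := 0; i < 5; i++ {
		pool.Push(i) // assumes Data is interface{}
	}

	time.Sleep(200 * time.Millisecond) // any partial batch left over is flushed on shutdown
	fmt.Println("workers:", pool.NumberOfWorkers())

	stop()                             // shrink: cancel the workers' context
	pool.Wait()                        // blocks until the worker count reaches zero
	pool.CleanUp(context.Background()) // drain anything still buffered in dataChan
}

Calling stop() is how temporary capacity is released: the workers see their context is done, flush any partial batch, and decrement the count that Wait watches.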

View File

@@ -28,6 +28,9 @@ type queueSettings struct {
MaxAttempts int
Timeout time.Duration
Workers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
}
// Queue settings
@@ -45,6 +48,9 @@ func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) qu
opts["DBIndex"] = q.DBIndex
opts["QueueName"] = name
opts["Workers"] = q.Workers
opts["BlockTimeout"] = q.BlockTimeout
opts["BoostTimeout"] = q.BoostTimeout
opts["BoostWorkers"] = q.BoostWorkers
cfg, err := json.Marshal(opts)
if err != nil {
@@ -96,7 +102,10 @@ func getQueueSettings(name string) queueSettings {
q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
q.Workers = sec.Key("WORKER").MustInt(Queue.Workers)
q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
return q
@@ -117,7 +126,10 @@ func newQueueService() {
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
Queue.Workers = sec.Key("WORKER").MustInt(1)
Queue.Workers = sec.Key("WORKERS").MustInt(1)
Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
hasWorkers := false
for _, key := range Cfg.Section("queue.notification").Keys() {