diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 56298a3e00b..73c57540be8 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -84,6 +84,8 @@ type ManagedPool interface { BoostWorkers() int // SetPoolSettings sets the user updatable settings for the pool SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + // Done returns a channel that will be closed when the Pool's baseCtx is closed + Done() <-chan struct{} } // ManagedQueueList implements the sort.Interface @@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error continue } } + if pool, ok := mq.Managed.(ManagedPool); ok { + // No point into flushing pools when their base's ctx is already done. + select { + case <-pool.Done(): + wg.Done() + continue + default: + } + } allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index fd56f782d4f..20108d35886 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo return pool } +// Done returns when this worker pool's base context has been cancelled +func (p *WorkerPool) Done() <-chan struct{} { + return p.baseCtx.Done() +} + // Push pushes the data to the internal channel func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1)