1
0
mirror of https://github.com/go-gitea/gitea synced 2025-02-18 05:27:04 +01:00
gitea/modules/queue/queue_disk_channel_test.go
zeripath 382101ecc7
In disk_channel queues synchronously push to disk on shutdown () ()
Partial Backport of 

Instead of using an asynchronous goroutine to push to disk on shutdown
just close the datachan and immediately push to the disk.

Prevents messages of incompletely flushed queues.

Signed-off-by: Andrew Thornton <art27@cantab.net>

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
2022-02-22 20:08:35 +08:00

192 lines
4.2 KiB
Go

// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"os"
"sync"
"testing"
"code.gitea.io/gitea/modules/util"
"github.com/stretchr/testify/assert"
)
func TestPersistableChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
handle := func(data ...Data) {
for _, datum := range data {
if datum == nil {
continue
}
testDatum := datum.(*testData)
handleChan <- testDatum
}
}
lock := sync.Mutex{}
queueShutdown := []func(){}
queueTerminate := []func(){}
tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-test-data")
assert.NoError(t, err)
defer util.RemoveAll(tmpDir)
queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 2,
QueueLength: 20,
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "first",
}, &testData{})
assert.NoError(t, err)
readyForShutdown := make(chan struct{})
readyForTerminate := make(chan struct{})
go queue.Run(func(shutdown func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-readyForShutdown:
default:
close(readyForShutdown)
}
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-readyForTerminate:
default:
close(readyForTerminate)
}
queueTerminate = append(queueTerminate, terminate)
})
test1 := testData{"A", 1}
test2 := testData{"B", 2}
err = queue.Push(&test1)
assert.NoError(t, err)
go func() {
err := queue.Push(&test2)
assert.NoError(t, err)
}()
result1 := <-handleChan
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
result2 := <-handleChan
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
// test1 is a testData not a *testData so will be rejected
err = queue.Push(test1)
assert.Error(t, err)
<-readyForShutdown
// Now shutdown the queue
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
// Wait til it is closed
<-queue.(*PersistableChannelQueue).closed
err = queue.Push(&test1)
assert.NoError(t, err)
err = queue.Push(&test2)
assert.NoError(t, err)
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
default:
}
// terminate the queue
<-readyForTerminate
lock.Lock()
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
default:
}
// Reopen queue
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 2,
QueueLength: 20,
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "second",
}, &testData{})
assert.NoError(t, err)
readyForShutdown = make(chan struct{})
readyForTerminate = make(chan struct{})
go queue.Run(func(shutdown func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-readyForShutdown:
default:
close(readyForShutdown)
}
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-readyForTerminate:
default:
close(readyForTerminate)
}
queueTerminate = append(queueTerminate, terminate)
})
result3 := <-handleChan
assert.Equal(t, test1.TestString, result3.TestString)
assert.Equal(t, test1.TestInt, result3.TestInt)
result4 := <-handleChan
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
<-readyForShutdown
lock.Lock()
callbacks = make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
<-readyForTerminate
lock.Lock()
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}