When closing BlobDB, should first wait for all background tasks (#5005)
Summary: When closing a BlobDB, it only waits for background tasks to finish as the last thing, but the background task may access some variables that are destroyed. The fix is to introduce a shutdown function in the timer queue and call the function as the first thing when destorying BlobDB. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5005 Differential Revision: D14170342 Pulled By: siying fbshipit-source-id: 081e6a2d99b9765d5956cf6cdfc290c07270c233
This commit is contained in:
parent
c4f5d0aa15
commit
06f378d75e
@ -22,8 +22,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "port/port.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
@ -33,6 +31,9 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "port/port.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
// Allows execution of handlers at a specified time in the future
|
||||
// Guarantees:
|
||||
// - All handlers are executed ONCE, even if cancelled (aborted parameter will
|
||||
@ -48,7 +49,13 @@ class TimerQueue {
|
||||
public:
|
||||
TimerQueue() : m_th(&TimerQueue::run, this) {}
|
||||
|
||||
~TimerQueue() {
|
||||
~TimerQueue() { shutdown(); }
|
||||
|
||||
// This function is not thread-safe.
|
||||
void shutdown() {
|
||||
if (closed_) {
|
||||
return;
|
||||
}
|
||||
cancelAll();
|
||||
// Abusing the timer queue to trigger the shutdown.
|
||||
add(0, [this](bool) {
|
||||
@ -56,6 +63,7 @@ class TimerQueue {
|
||||
return std::make_pair(false, 0);
|
||||
});
|
||||
m_th.join();
|
||||
closed_ = true;
|
||||
}
|
||||
|
||||
// Adds a new timer
|
||||
@ -67,6 +75,7 @@ class TimerQueue {
|
||||
WorkItem item;
|
||||
Clock::time_point tp = Clock::now();
|
||||
item.end = tp + std::chrono::milliseconds(milliseconds);
|
||||
TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
|
||||
item.period = milliseconds;
|
||||
item.handler = std::move(handler);
|
||||
|
||||
@ -217,4 +226,5 @@ class TimerQueue {
|
||||
std::vector<WorkItem>& getContainer() { return this->c; }
|
||||
} m_items;
|
||||
rocksdb::port::Thread m_th;
|
||||
bool closed_ = false;
|
||||
};
|
||||
|
@ -94,6 +94,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
|
||||
}
|
||||
|
||||
BlobDBImpl::~BlobDBImpl() {
|
||||
tqueue_.shutdown();
|
||||
// CancelAllBackgroundWork(db_, true);
|
||||
Status s __attribute__((__unused__)) = Close();
|
||||
assert(s.ok());
|
||||
@ -1308,6 +1309,9 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
|
||||
return std::make_pair(false, -1);
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
|
||||
TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
|
||||
|
||||
std::vector<std::shared_ptr<BlobFile>> process_files;
|
||||
uint64_t now = EpochNow();
|
||||
{
|
||||
@ -1322,6 +1326,10 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
|
||||
TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
|
||||
TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
|
||||
|
||||
SequenceNumber seq = GetLatestSequenceNumber();
|
||||
{
|
||||
MutexLock l(&write_mutex_);
|
||||
|
@ -6,6 +6,7 @@
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
@ -72,6 +73,12 @@ class BlobDBTest : public testing::Test {
|
||||
Open(bdb_options, options);
|
||||
}
|
||||
|
||||
void Close() {
|
||||
assert(blob_db_ != nullptr);
|
||||
delete blob_db_;
|
||||
blob_db_ = nullptr;
|
||||
}
|
||||
|
||||
void Destroy() {
|
||||
if (blob_db_) {
|
||||
Options options = blob_db_->GetOptions();
|
||||
@ -1542,6 +1549,68 @@ TEST_F(BlobDBTest, DisableFileDeletions) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, ShutdownWait) {
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.ttl_range_secs = 100;
|
||||
bdb_options.min_blob_size = 0;
|
||||
bdb_options.disable_background_tasks = false;
|
||||
Options options;
|
||||
options.env = mock_env_.get();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency({
|
||||
{"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
|
||||
{"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
|
||||
{"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
|
||||
{"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
|
||||
});
|
||||
// Force all tasks to be scheduled immediately.
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"TimeQueue::Add:item.end", [&](void *arg) {
|
||||
std::chrono::steady_clock::time_point *tp =
|
||||
static_cast<std::chrono::steady_clock::time_point *>(arg);
|
||||
*tp =
|
||||
std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
|
||||
// Sleep 3 ms to increase the chance of data race.
|
||||
// We've synced up the code so that EvictExpiredFiles()
|
||||
// is called concurrently with ~BlobDBImpl().
|
||||
// ~BlobDBImpl() is supposed to wait for all background
|
||||
// task to shutdown before doing anything else. In order
|
||||
// to use the same test to reproduce a bug of the waiting
|
||||
// logic, we wait a little bit here, so that TSAN can
|
||||
// catch the data race.
|
||||
// We should improve the test if we find a better way.
|
||||
Env::Default()->SleepForMicroseconds(3000);
|
||||
});
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Open(bdb_options, options);
|
||||
mock_env_->set_current_time(50);
|
||||
std::map<std::string, std::string> data;
|
||||
ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
|
||||
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||
ASSERT_EQ(1, blob_files.size());
|
||||
auto blob_file = blob_files[0];
|
||||
ASSERT_FALSE(blob_file->Immutable());
|
||||
ASSERT_FALSE(blob_file->Obsolete());
|
||||
VerifyDB(data);
|
||||
|
||||
TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
|
||||
mock_env_->set_current_time(250);
|
||||
// The key should expired now.
|
||||
TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
|
||||
|
||||
TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
|
||||
TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
|
||||
Close();
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user