Introduce a ThreadGuard class and use it in ExternalSSTFileTest.PickedLevelBug (#8112)
Summary: The patch adds a resource management/RAII class called `ThreadGuard`, which can be used to ensure that the managed thread is joined when the `ThreadGuard` is destroyed, regardless of whether it is due to the object going out of scope, an early return, an exception etc. This is important because if an `std::thread` object is destroyed without having been joined (or detached) first, the process is aborted (via `std::terminate`). For now, `ThreadGuard` is only used in the test case `ExternalSSTFileTest.PickedLevelBug`; however, it could come in handy elsewhere in the codebase as well (both in test code and "real" code). Case in point: in the `PickedLevelBug` test case, with the earlier code we could end up in the above situation when the following assertion (which is before the threads are joined) is triggered: ``` ASSERT_FALSE(bg_compact_started.load()); ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/8112 Test Plan: ``` make check gtest-parallel --repeat=10000 ./external_sst_file_test --gtest_filter="*PickedLevelBug" ``` Reviewed By: riversand963 Differential Revision: D27343185 Pulled By: ltamasi fbshipit-source-id: 2a8c3aa68bc78cc03ec0dbae909fb25c2cd15c69
This commit is contained in:
parent
af80a78ba4
commit
303cb23a0f
@ -16,6 +16,7 @@
|
|||||||
#include "rocksdb/sst_file_writer.h"
|
#include "rocksdb/sst_file_writer.h"
|
||||||
#include "test_util/testutil.h"
|
#include "test_util/testutil.h"
|
||||||
#include "util/random.h"
|
#include "util/random.h"
|
||||||
|
#include "util/thread_guard.h"
|
||||||
#include "utilities/fault_injection_env.h"
|
#include "utilities/fault_injection_env.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
@ -1305,38 +1306,38 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
|
|||||||
|
|
||||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
// While writing the MANIFEST start a thread that will ask for compaction
|
|
||||||
Status bg_compact_status;
|
Status bg_compact_status;
|
||||||
ROCKSDB_NAMESPACE::port::Thread bg_compact([&]() {
|
|
||||||
bg_compact_status =
|
|
||||||
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
|
||||||
});
|
|
||||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
|
|
||||||
|
|
||||||
// Start a thread that will ingest a new file
|
|
||||||
Status bg_addfile_status;
|
Status bg_addfile_status;
|
||||||
ROCKSDB_NAMESPACE::port::Thread bg_addfile([&]() {
|
|
||||||
file_keys = {1, 2, 3};
|
|
||||||
bg_addfile_status = GenerateAndAddExternalFile(options, file_keys, 1);
|
|
||||||
});
|
|
||||||
|
|
||||||
// Wait for AddFile to start picking levels and writing MANIFEST
|
{
|
||||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
|
// While writing the MANIFEST start a thread that will ask for compaction
|
||||||
|
ThreadGuard bg_compact(port::Thread([&]() {
|
||||||
|
bg_compact_status =
|
||||||
|
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||||
|
}));
|
||||||
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
|
||||||
|
|
||||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
|
// Start a thread that will ingest a new file
|
||||||
|
ThreadGuard bg_addfile(port::Thread([&]() {
|
||||||
|
file_keys = {1, 2, 3};
|
||||||
|
bg_addfile_status = GenerateAndAddExternalFile(options, file_keys, 1);
|
||||||
|
}));
|
||||||
|
|
||||||
// We need to verify that no compactions can run while AddFile is
|
// Wait for AddFile to start picking levels and writing MANIFEST
|
||||||
// ingesting the files into the levels it find suitable. So we will
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
|
||||||
// wait for 2 seconds to give a chance for compactions to run during
|
|
||||||
// this period, and then make sure that no compactions where able to run
|
|
||||||
env_->SleepForMicroseconds(1000000 * 2);
|
|
||||||
ASSERT_FALSE(bg_compact_started.load());
|
|
||||||
|
|
||||||
// Hold AddFile from finishing writing the MANIFEST
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
|
||||||
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
|
|
||||||
|
|
||||||
bg_addfile.join();
|
// We need to verify that no compactions can run while AddFile is
|
||||||
bg_compact.join();
|
// ingesting the files into the levels it find suitable. So we will
|
||||||
|
// wait for 2 seconds to give a chance for compactions to run during
|
||||||
|
// this period, and then make sure that no compactions where able to run
|
||||||
|
env_->SleepForMicroseconds(1000000 * 2);
|
||||||
|
ASSERT_FALSE(bg_compact_started.load());
|
||||||
|
|
||||||
|
// Hold AddFile from finishing writing the MANIFEST
|
||||||
|
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT_OK(bg_addfile_status);
|
ASSERT_OK(bg_addfile_status);
|
||||||
ASSERT_OK(bg_compact_status);
|
ASSERT_OK(bg_compact_status);
|
||||||
|
41
util/thread_guard.h
Normal file
41
util/thread_guard.h
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "port/port.h"
|
||||||
|
#include "rocksdb/rocksdb_namespace.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
// Resource management object for threads that joins the thread upon
|
||||||
|
// destruction. Has unique ownership of the thread object, so copying it is not
|
||||||
|
// allowed, while moving it transfers ownership.
|
||||||
|
class ThreadGuard {
|
||||||
|
public:
|
||||||
|
ThreadGuard() = default;
|
||||||
|
|
||||||
|
explicit ThreadGuard(port::Thread&& thread) : thread_(std::move(thread)) {}
|
||||||
|
|
||||||
|
ThreadGuard(const ThreadGuard&) = delete;
|
||||||
|
ThreadGuard& operator=(const ThreadGuard&) = delete;
|
||||||
|
|
||||||
|
ThreadGuard(ThreadGuard&&) noexcept = default;
|
||||||
|
ThreadGuard& operator=(ThreadGuard&&) noexcept = default;
|
||||||
|
|
||||||
|
~ThreadGuard() {
|
||||||
|
if (thread_.joinable()) {
|
||||||
|
thread_.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const port::Thread& GetThread() const { return thread_; }
|
||||||
|
port::Thread& GetThread() { return thread_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
port::Thread thread_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
Loading…
Reference in New Issue
Block a user