Make sure that CompactFiles does not run two parallel Level 0 compactions
Summary: Since level 0 files can overlap, two level 0 compactions cannot run in parallel. Compact files needs to check this before running a compaction. Test Plan: CompactFilesTest.L0ConflictsFiles Reviewers: igor, IslamAbdelRahman, anthony, sdong, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D50079
This commit is contained in:
parent
d781da8164
commit
2ae4d7d708
@ -12,6 +12,7 @@
|
|||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
#include "rocksdb/env.h"
|
#include "rocksdb/env.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
|
#include "util/sync_point.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
@ -53,6 +54,62 @@ class FlushedFileCollector : public EventListener {
|
|||||||
std::mutex mutex_;
|
std::mutex mutex_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TEST_F(CompactFilesTest, L0ConflictsFiles) {
|
||||||
|
Options options;
|
||||||
|
// to trigger compaction more easily
|
||||||
|
const int kWriteBufferSize = 10000;
|
||||||
|
const int kLevel0Trigger = 2;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.compaction_style = kCompactionStyleLevel;
|
||||||
|
// Small slowdown and stop trigger for experimental purpose.
|
||||||
|
options.level0_slowdown_writes_trigger = 20;
|
||||||
|
options.level0_stop_writes_trigger = 20;
|
||||||
|
options.level0_stop_writes_trigger = 20;
|
||||||
|
options.write_buffer_size = kWriteBufferSize;
|
||||||
|
options.level0_file_num_compaction_trigger = kLevel0Trigger;
|
||||||
|
options.compression = kNoCompression;
|
||||||
|
|
||||||
|
DB* db = nullptr;
|
||||||
|
DestroyDB(db_name_, options);
|
||||||
|
Status s = DB::Open(options, db_name_, &db);
|
||||||
|
assert(s.ok());
|
||||||
|
assert(db);
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||||
|
{"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
|
||||||
|
{"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
|
||||||
|
});
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
// create couple files
|
||||||
|
// Background compaction starts and waits in BackgroundCallCompaction:0
|
||||||
|
for (int i = 0; i < kLevel0Trigger * 4; ++i) {
|
||||||
|
db->Put(WriteOptions(), ToString(i), "");
|
||||||
|
db->Put(WriteOptions(), ToString(100 - i), "");
|
||||||
|
db->Flush(FlushOptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksdb::ColumnFamilyMetaData meta;
|
||||||
|
db->GetColumnFamilyMetaData(&meta);
|
||||||
|
std::string file1;
|
||||||
|
for (auto& file : meta.levels[0].files) {
|
||||||
|
ASSERT_EQ(0, meta.levels[0].level);
|
||||||
|
if (file1 == "") {
|
||||||
|
file1 = file.db_path + "/" + file.name;
|
||||||
|
} else {
|
||||||
|
std::string file2 = file.db_path + "/" + file.name;
|
||||||
|
// Another thread starts a compact files and creates an L0 compaction
|
||||||
|
// The background compaction then notices that there is an L0 compaction
|
||||||
|
// already in progress and doesn't do an L0 compaction
|
||||||
|
// Once the background compaction finishes, the compact files finishes
|
||||||
|
ASSERT_OK(
|
||||||
|
db->CompactFiles(rocksdb::CompactionOptions(), {file1, file2}, 0));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete db;
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(CompactFilesTest, ObsoleteFiles) {
|
TEST_F(CompactFilesTest, ObsoleteFiles) {
|
||||||
Options options;
|
Options options;
|
||||||
// to trigger compaction more easily
|
// to trigger compaction more easily
|
||||||
|
@ -255,16 +255,30 @@ Compaction* CompactionPicker::FormCompaction(
|
|||||||
const CompactionOptions& compact_options,
|
const CompactionOptions& compact_options,
|
||||||
const std::vector<CompactionInputFiles>& input_files, int output_level,
|
const std::vector<CompactionInputFiles>& input_files, int output_level,
|
||||||
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
|
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
|
||||||
uint32_t output_path_id) const {
|
uint32_t output_path_id) {
|
||||||
uint64_t max_grandparent_overlap_bytes =
|
uint64_t max_grandparent_overlap_bytes =
|
||||||
output_level + 1 < vstorage->num_levels() ?
|
output_level + 1 < vstorage->num_levels() ?
|
||||||
mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
|
mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
|
||||||
std::numeric_limits<uint64_t>::max();
|
std::numeric_limits<uint64_t>::max();
|
||||||
assert(input_files.size());
|
assert(input_files.size());
|
||||||
return new Compaction(
|
|
||||||
|
// TODO(rven ): we might be able to run concurrent level 0 compaction
|
||||||
|
// if the key ranges of the two compactions do not overlap, but for now
|
||||||
|
// we do not allow it.
|
||||||
|
if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
auto c = new Compaction(
|
||||||
vstorage, mutable_cf_options, input_files, output_level,
|
vstorage, mutable_cf_options, input_files, output_level,
|
||||||
compact_options.output_file_size_limit, max_grandparent_overlap_bytes,
|
compact_options.output_file_size_limit, max_grandparent_overlap_bytes,
|
||||||
output_path_id, compact_options.compression, /* grandparents */ {}, true);
|
output_path_id, compact_options.compression, /* grandparents */ {}, true);
|
||||||
|
|
||||||
|
// If it's level 0 compaction, make sure we don't execute any other level 0
|
||||||
|
// compactions in parallel
|
||||||
|
if ((c != nullptr) && (input_files[0].level == 0)) {
|
||||||
|
level0_compactions_in_progress_.insert(c);
|
||||||
|
}
|
||||||
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
|
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
|
||||||
|
@ -94,7 +94,7 @@ class CompactionPicker {
|
|||||||
const CompactionOptions& compact_options,
|
const CompactionOptions& compact_options,
|
||||||
const std::vector<CompactionInputFiles>& input_files, int output_level,
|
const std::vector<CompactionInputFiles>& input_files, int output_level,
|
||||||
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
|
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
|
||||||
uint32_t output_path_id) const;
|
uint32_t output_path_id);
|
||||||
|
|
||||||
// Converts a set of compaction input file numbers into
|
// Converts a set of compaction input file numbers into
|
||||||
// a list of CompactionInputFiles.
|
// a list of CompactionInputFiles.
|
||||||
|
@ -1772,7 +1772,9 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
c.reset(cfd->compaction_picker()->FormCompaction(
|
c.reset(cfd->compaction_picker()->FormCompaction(
|
||||||
compact_options, input_files, output_level, version->storage_info(),
|
compact_options, input_files, output_level, version->storage_info(),
|
||||||
*cfd->GetLatestMutableCFOptions(), output_path_id));
|
*cfd->GetLatestMutableCFOptions(), output_path_id));
|
||||||
assert(c);
|
if (!c) {
|
||||||
|
return Status::Aborted("Another Level 0 compaction is running");
|
||||||
|
}
|
||||||
c->SetInputVersion(version);
|
c->SetInputVersion(version);
|
||||||
// deletion compaction currently not allowed in CompactFiles.
|
// deletion compaction currently not allowed in CompactFiles.
|
||||||
assert(!c->deletion_compaction());
|
assert(!c->deletion_compaction());
|
||||||
@ -1801,6 +1803,8 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
compaction_job.Prepare();
|
compaction_job.Prepare();
|
||||||
|
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
|
TEST_SYNC_POINT("CompactFilesImpl:0");
|
||||||
|
TEST_SYNC_POINT("CompactFilesImpl:1");
|
||||||
compaction_job.Run();
|
compaction_job.Run();
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
|
|
||||||
@ -2559,7 +2563,7 @@ void DBImpl::BackgroundCallFlush() {
|
|||||||
void DBImpl::BackgroundCallCompaction() {
|
void DBImpl::BackgroundCallCompaction() {
|
||||||
bool made_progress = false;
|
bool made_progress = false;
|
||||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||||
|
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
||||||
MaybeDumpStats();
|
MaybeDumpStats();
|
||||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||||
{
|
{
|
||||||
@ -2571,6 +2575,7 @@ void DBImpl::BackgroundCallCompaction() {
|
|||||||
|
|
||||||
assert(bg_compaction_scheduled_);
|
assert(bg_compaction_scheduled_);
|
||||||
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer);
|
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer);
|
||||||
|
TEST_SYNC_POINT("BackgroundCallCompaction:1");
|
||||||
if (!s.ok() && !s.IsShutdownInProgress()) {
|
if (!s.ok() && !s.IsShutdownInProgress()) {
|
||||||
// Wait a little bit before retrying background compaction in
|
// Wait a little bit before retrying background compaction in
|
||||||
// case this is an environmental problem and we do not want to
|
// case this is an environmental problem and we do not want to
|
||||||
|
Loading…
x
Reference in New Issue
Block a user