Consider more factors when determining preallocation size of WAL files
Summary: Currently the WAL file preallocation size is 1.1 * write_buffer_size. However, this over-estimates the space needed when options.db_write_buffer_size or options.max_total_wal_size is set to a much smaller value. Test Plan: Add a unit test. Reviewers: andrewkr, yiwu Reviewed By: yiwu Subscribers: leveldb, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D63957
This commit is contained in:
parent
4c3f4496b5
commit
b666f85445
@ -3682,6 +3682,24 @@ bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
|
||||||
|
uint64_t bsize = write_buffer_size / 10 + write_buffer_size;
|
||||||
|
// Some users might set very high write_buffer_size and rely on
|
||||||
|
// max_total_wal_size or other parameters to control the WAL size.
|
||||||
|
if (db_options_.max_total_wal_size > 0) {
|
||||||
|
bsize = std::min(bsize, db_options_.max_total_wal_size);
|
||||||
|
}
|
||||||
|
if (db_options_.db_write_buffer_size > 0) {
|
||||||
|
bsize = std::min(bsize, db_options_.db_write_buffer_size);
|
||||||
|
}
|
||||||
|
if (db_options_.write_buffer_manager &&
|
||||||
|
db_options_.write_buffer_manager->enabled()) {
|
||||||
|
bsize = std::min(bsize, db_options_.write_buffer_manager->buffer_size());
|
||||||
|
}
|
||||||
|
|
||||||
|
return bsize;
|
||||||
|
}
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
struct IterState {
|
struct IterState {
|
||||||
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
|
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
|
||||||
@ -4995,8 +5013,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
// Our final size should be less than write_buffer_size
|
// Our final size should be less than write_buffer_size
|
||||||
// (compression, etc) but err on the side of caution.
|
// (compression, etc) but err on the side of caution.
|
||||||
lfile->SetPreallocationBlockSize(
|
lfile->SetPreallocationBlockSize(
|
||||||
mutable_cf_options.write_buffer_size / 10 +
|
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size));
|
||||||
mutable_cf_options.write_buffer_size);
|
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
||||||
new_log = new log::Writer(std::move(file_writer), new_log_number,
|
new_log = new log::Writer(std::move(file_writer), new_log_number,
|
||||||
@ -5747,7 +5764,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
LogFileName(impl->db_options_.wal_dir, new_log_number),
|
LogFileName(impl->db_options_.wal_dir, new_log_number),
|
||||||
&lfile, opt_env_options);
|
&lfile, opt_env_options);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
lfile->SetPreallocationBlockSize((max_write_buffer_size / 10) + max_write_buffer_size);
|
lfile->SetPreallocationBlockSize(
|
||||||
|
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
|
||||||
impl->logfile_number_ = new_log_number;
|
impl->logfile_number_ = new_log_number;
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(lfile), opt_env_options));
|
new WritableFileWriter(std::move(lfile), opt_env_options));
|
||||||
|
@ -1067,6 +1067,8 @@ class DBImpl : public DB {
|
|||||||
bool ShouldntRunManualCompaction(ManualCompaction* m);
|
bool ShouldntRunManualCompaction(ManualCompaction* m);
|
||||||
bool HaveManualCompaction(ColumnFamilyData* cfd);
|
bool HaveManualCompaction(ColumnFamilyData* cfd);
|
||||||
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
|
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
|
||||||
|
|
||||||
|
// Returns the preallocation block size to use for a WAL file: 110% of
// `write_buffer_size`, capped by max_total_wal_size, db_write_buffer_size
// and the write buffer manager's buffer size whenever those are configured.
uint64_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Sanitize db options. The caller should delete result.info_log if
|
// Sanitize db options. The caller should delete result.info_log if
|
||||||
|
@ -2065,7 +2065,6 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
|
|||||||
ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
|
ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
|
||||||
total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
|
total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -310,7 +310,18 @@ class SpecialEnv : public EnvWrapper {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
|
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
|
||||||
Status Close() override { return base_->Close(); }
|
Status Close() override {
// The sync-point hook is compiled out for Windows release builds, where
// SyncPoint is not supported.
#if !(defined NDEBUG) || !defined(OS_WIN)
  // Publish the preallocation block size recorded on this wrapper to any
  // registered test callback. The value is read here because the
  // preallocation size is never passed to the base file.
  size_t wal_prealloc_bytes = preallocation_block_size();
  TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
                           &wal_prealloc_bytes);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)

  // Closing itself is delegated to the wrapped file.
  return base_->Close();
}
|
||||||
Status Flush() override { return base_->Flush(); }
|
Status Flush() override { return base_->Flush(); }
|
||||||
Status Sync() override {
|
Status Sync() override {
|
||||||
++env_->sync_counter_;
|
++env_->sync_counter_;
|
||||||
|
@ -291,6 +291,95 @@ TEST_F(DBWALTest, RecoveryWithEmptyLog) {
|
|||||||
} while (ChangeOptions());
|
} while (ChangeOptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||||
|
// Verifies the preallocation block size that WAL files report when they are
// closed, under four successively tighter configurations:
//   1. only write_buffer_size set      -> 110% of write_buffer_size
//   2. max_total_wal_size set          -> max_total_wal_size
//   3. db_write_buffer_size set        -> db_write_buffer_size
//   4. a WriteBufferManager installed  -> the manager's buffer size
// In every phase the sync-point callback is expected to fire exactly twice
// (presumably once per WAL file closed; the phase writes, flushes, writes
// again and then closes the DB).
TEST_F(DBWALTest, PreallocateBlock) {
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1000 * 1000;
  options.max_total_wal_size = 0;

  size_t expected_preallocation_size = static_cast<size_t>(
      options.write_buffer_size + options.write_buffer_size / 10);
  std::atomic<int> called(0);

  // Runs one verification phase against the currently opened DB: resets the
  // hit counter, (re)registers the size check on the WAL-close sync point,
  // then writes, flushes, writes again and closes the DB with sync-point
  // processing enabled. The callback reads `expected_preallocation_size` by
  // reference, so each phase only needs to update that variable beforehand.
  // NOTE(review): the callback captures locals by reference and is left
  // registered when the test returns (as in the original code); confirm
  // nothing enables sync-point processing against it afterwards.
  auto write_and_close_with_check = [&]() {
    called.store(0);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
          ASSERT_TRUE(arg != nullptr);
          size_t preallocation_size = *(static_cast<size_t*>(arg));
          ASSERT_EQ(expected_preallocation_size, preallocation_size);
          called.fetch_add(1);
        });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    Put("", "");
    Flush();
    Put("", "");
    Close();
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  };

  // Phase 1: no DB-wide cap, so the size is 110% of write_buffer_size.
  DestroyAndReopen(options);
  write_and_close_with_check();
  ASSERT_EQ(2, called.load());

  // Phase 2: max_total_wal_size is smaller and becomes the cap.
  options.max_total_wal_size = 1000 * 1000;
  expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
  Reopen(options);
  write_and_close_with_check();
  ASSERT_EQ(2, called.load());

  // Phase 3: db_write_buffer_size is smaller still.
  options.db_write_buffer_size = 800 * 1000;
  expected_preallocation_size =
      static_cast<size_t>(options.db_write_buffer_size);
  Reopen(options);
  write_and_close_with_check();
  ASSERT_EQ(2, called.load());

  // Phase 4: the write buffer manager's buffer size is the smallest limit.
  expected_preallocation_size = 700 * 1000;
  std::shared_ptr<WriteBufferManager> write_buffer_manager =
      std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
  options.write_buffer_manager = write_buffer_manager;
  Reopen(options);
  write_and_close_with_check();
  ASSERT_EQ(2, called.load());
}
|
||||||
|
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
TEST_F(DBWALTest, GetSortedWalFiles) {
|
TEST_F(DBWALTest, GetSortedWalFiles) {
|
||||||
do {
|
do {
|
||||||
|
Loading…
Reference in New Issue
Block a user