Dynamic max_total_wal_size option
Summary: Closes https://github.com/facebook/rocksdb/pull/1509 Differential Revision: D4176426 Pulled By: yiwu-arbug fbshipit-source-id: b57689d
This commit is contained in:
parent
ec2f64794b
commit
91300d01f6
@ -2,7 +2,7 @@
|
|||||||
## Unreleased
|
## Unreleased
|
||||||
### Public API Change
|
### Public API Change
|
||||||
* Options::max_bytes_for_level_multiplier is now a double along with all getters and setters.
|
* Options::max_bytes_for_level_multiplier is now a double along with all getters and setters.
|
||||||
* Suppor dynamically change `delayed_write_rate` option via SetDBOptions().
|
* Support dynamically change `delayed_write_rate` and `max_total_wal_size` options via SetDBOptions().
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable via SetDBOptions().
|
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable via SetDBOptions().
|
||||||
|
@ -2493,6 +2493,10 @@ Status DBImpl::SetDBOptions(
|
|||||||
|
|
||||||
mutable_db_options_ = new_options;
|
mutable_db_options_ = new_options;
|
||||||
|
|
||||||
|
if (total_log_size_ > GetMaxTotalWalSize()) {
|
||||||
|
FlushColumnFamilies();
|
||||||
|
}
|
||||||
|
|
||||||
persist_options_status = PersistOptions();
|
persist_options_status = PersistOptions();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3769,11 +3773,12 @@ bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
|
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
|
||||||
|
mutex_.AssertHeld();
|
||||||
size_t bsize = write_buffer_size / 10 + write_buffer_size;
|
size_t bsize = write_buffer_size / 10 + write_buffer_size;
|
||||||
// Some users might set very high write_buffer_size and rely on
|
// Some users might set very high write_buffer_size and rely on
|
||||||
// max_total_wal_size or other parameters to control the WAL size.
|
// max_total_wal_size or other parameters to control the WAL size.
|
||||||
if (immutable_db_options_.max_total_wal_size > 0) {
|
if (mutable_db_options_.max_total_wal_size > 0) {
|
||||||
bsize = std::min<size_t>(bsize, immutable_db_options_.max_total_wal_size);
|
bsize = std::min<size_t>(bsize, mutable_db_options_.max_total_wal_size);
|
||||||
}
|
}
|
||||||
if (immutable_db_options_.db_write_buffer_size > 0) {
|
if (immutable_db_options_.db_write_buffer_size > 0) {
|
||||||
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
|
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
|
||||||
@ -4676,34 +4681,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
assert(!single_column_family_mode_ ||
|
assert(!single_column_family_mode_ ||
|
||||||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
|
||||||
|
|
||||||
uint64_t max_total_wal_size = (immutable_db_options_.max_total_wal_size == 0)
|
|
||||||
? 4 * max_total_in_memory_state_
|
|
||||||
: immutable_db_options_.max_total_wal_size;
|
|
||||||
if (UNLIKELY(!single_column_family_mode_ &&
|
if (UNLIKELY(!single_column_family_mode_ &&
|
||||||
alive_log_files_.begin()->getting_flushed == false &&
|
!alive_log_files_.begin()->getting_flushed &&
|
||||||
total_log_size_ > max_total_wal_size)) {
|
total_log_size_ > GetMaxTotalWalSize())) {
|
||||||
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
|
FlushColumnFamilies();
|
||||||
alive_log_files_.begin()->getting_flushed = true;
|
|
||||||
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
|
|
||||||
"Flushing all column families with data in WAL number %" PRIu64
|
|
||||||
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
|
|
||||||
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
|
|
||||||
// no need to refcount because drop is happening in write thread, so can't
|
|
||||||
// happen while we're in the write thread
|
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
||||||
if (cfd->IsDropped()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
|
|
||||||
status = SwitchMemtable(cfd, &context);
|
|
||||||
if (!status.ok()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cfd->imm()->FlushRequested();
|
|
||||||
SchedulePendingFlush(cfd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
MaybeScheduleFlushOrCompaction();
|
|
||||||
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
|
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
|
||||||
// Before a new memtable is added in SwitchMemtable(),
|
// Before a new memtable is added in SwitchMemtable(),
|
||||||
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
|
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
|
||||||
@ -5020,6 +5001,46 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DBImpl::FlushColumnFamilies() {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
|
||||||
|
WriteContext context;
|
||||||
|
|
||||||
|
if (alive_log_files_.begin()->getting_flushed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
|
||||||
|
alive_log_files_.begin()->getting_flushed = true;
|
||||||
|
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
|
||||||
|
"Flushing all column families with data in WAL number %" PRIu64
|
||||||
|
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
|
||||||
|
flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
|
||||||
|
// no need to refcount because drop is happening in write thread, so can't
|
||||||
|
// happen while we're in the write thread
|
||||||
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
|
if (cfd->IsDropped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
|
||||||
|
auto status = SwitchMemtable(cfd, &context);
|
||||||
|
if (!status.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cfd->imm()->FlushRequested();
|
||||||
|
SchedulePendingFlush(cfd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MaybeScheduleFlushOrCompaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t DBImpl::GetMaxTotalWalSize() const {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
return mutable_db_options_.max_total_wal_size == 0
|
||||||
|
? 4 * max_total_in_memory_state_
|
||||||
|
: mutable_db_options_.max_total_wal_size;
|
||||||
|
}
|
||||||
|
|
||||||
// REQUIRES: mutex_ is held
|
// REQUIRES: mutex_ is held
|
||||||
// REQUIRES: this thread is currently at the front of the writer queue
|
// REQUIRES: this thread is currently at the front of the writer queue
|
||||||
Status DBImpl::DelayWrite(uint64_t num_bytes) {
|
Status DBImpl::DelayWrite(uint64_t num_bytes) {
|
||||||
@ -5119,6 +5140,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
int num_imm_unflushed = cfd->imm()->NumNotFlushed();
|
int num_imm_unflushed = cfd->imm()->NumNotFlushed();
|
||||||
DBOptions db_options =
|
DBOptions db_options =
|
||||||
BuildDBOptions(immutable_db_options_, mutable_db_options_);
|
BuildDBOptions(immutable_db_options_, mutable_db_options_);
|
||||||
|
const auto preallocate_block_size =
|
||||||
|
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
Status s;
|
Status s;
|
||||||
{
|
{
|
||||||
@ -5140,8 +5163,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// Our final size should be less than write_buffer_size
|
// Our final size should be less than write_buffer_size
|
||||||
// (compression, etc) but err on the side of caution.
|
// (compression, etc) but err on the side of caution.
|
||||||
lfile->SetPreallocationBlockSize(
|
|
||||||
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size));
|
// use preallocate_block_size instead
|
||||||
|
// of calling GetWalPreallocateBlockSize()
|
||||||
|
lfile->SetPreallocationBlockSize(preallocate_block_size);
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
||||||
new_log =
|
new_log =
|
||||||
|
@ -728,6 +728,10 @@ class DBImpl : public DB {
|
|||||||
// REQUIRES: mutex locked
|
// REQUIRES: mutex locked
|
||||||
Status PersistOptions();
|
Status PersistOptions();
|
||||||
|
|
||||||
|
void FlushColumnFamilies();
|
||||||
|
|
||||||
|
uint64_t GetMaxTotalWalSize() const;
|
||||||
|
|
||||||
// table_cache_ provides its own synchronization
|
// table_cache_ provides its own synchronization
|
||||||
std::shared_ptr<Cache> table_cache_;
|
std::shared_ptr<Cache> table_cache_;
|
||||||
|
|
||||||
|
@ -289,6 +289,33 @@ TEST_F(DBOptionsTest, SetDelayedWriteRateOption) {
|
|||||||
ASSERT_EQ(20000, dbfull()->TEST_write_controler().max_delayed_write_rate());
|
ASSERT_EQ(20000, dbfull()->TEST_write_controler().max_delayed_write_rate());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBOptionsTest, MaxTotalWalSizeChange) {
|
||||||
|
Random rnd(1044);
|
||||||
|
const auto value_size = size_t(1024);
|
||||||
|
std::string value;
|
||||||
|
test::RandomString(&rnd, value_size, &value);
|
||||||
|
|
||||||
|
Options options;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
CreateColumnFamilies({"1", "2", "3"}, options);
|
||||||
|
ReopenWithColumnFamilies({"default", "1", "2", "3"}, options);
|
||||||
|
|
||||||
|
WriteOptions write_options;
|
||||||
|
|
||||||
|
const int key_count = 100;
|
||||||
|
for (int i = 0; i < key_count; ++i) {
|
||||||
|
for (size_t cf = 0; cf < handles_.size(); ++cf) {
|
||||||
|
ASSERT_OK(Put(static_cast<int>(cf), Key(i), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_OK(dbfull()->SetDBOptions({{"max_total_wal_size", "10"}}));
|
||||||
|
|
||||||
|
for (size_t cf = 0; cf < handles_.size(); ++cf) {
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
|
||||||
|
ASSERT_EQ("1", FilesPerLevel(static_cast<int>(cf)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -33,7 +33,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
|
|||||||
info_log_level(options.info_log_level),
|
info_log_level(options.info_log_level),
|
||||||
max_open_files(options.max_open_files),
|
max_open_files(options.max_open_files),
|
||||||
max_file_opening_threads(options.max_file_opening_threads),
|
max_file_opening_threads(options.max_file_opening_threads),
|
||||||
max_total_wal_size(options.max_total_wal_size),
|
|
||||||
statistics(options.statistics),
|
statistics(options.statistics),
|
||||||
disable_data_sync(options.disableDataSync),
|
disable_data_sync(options.disableDataSync),
|
||||||
use_fsync(options.use_fsync),
|
use_fsync(options.use_fsync),
|
||||||
@ -105,8 +104,6 @@ void ImmutableDBOptions::Dump(Logger* log) const {
|
|||||||
max_open_files);
|
max_open_files);
|
||||||
Header(log, " Options.max_file_opening_threads: %d",
|
Header(log, " Options.max_file_opening_threads: %d",
|
||||||
max_file_opening_threads);
|
max_file_opening_threads);
|
||||||
Header(log, " Options.max_total_wal_size: %" PRIu64,
|
|
||||||
max_total_wal_size);
|
|
||||||
Header(log, " Options.disableDataSync: %d",
|
Header(log, " Options.disableDataSync: %d",
|
||||||
disable_data_sync);
|
disable_data_sync);
|
||||||
Header(log, " Options.use_fsync: %d", use_fsync);
|
Header(log, " Options.use_fsync: %d", use_fsync);
|
||||||
@ -224,13 +221,15 @@ MutableDBOptions::MutableDBOptions()
|
|||||||
: base_background_compactions(1),
|
: base_background_compactions(1),
|
||||||
max_background_compactions(1),
|
max_background_compactions(1),
|
||||||
avoid_flush_during_shutdown(false),
|
avoid_flush_during_shutdown(false),
|
||||||
delayed_write_rate(2 * 1024U * 1024U) {}
|
delayed_write_rate(2 * 1024U * 1024U),
|
||||||
|
max_total_wal_size(0) {}
|
||||||
|
|
||||||
MutableDBOptions::MutableDBOptions(const DBOptions& options)
|
MutableDBOptions::MutableDBOptions(const DBOptions& options)
|
||||||
: base_background_compactions(options.base_background_compactions),
|
: base_background_compactions(options.base_background_compactions),
|
||||||
max_background_compactions(options.max_background_compactions),
|
max_background_compactions(options.max_background_compactions),
|
||||||
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
|
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
|
||||||
delayed_write_rate(options.delayed_write_rate) {}
|
delayed_write_rate(options.delayed_write_rate),
|
||||||
|
max_total_wal_size(options.max_total_wal_size) {}
|
||||||
|
|
||||||
void MutableDBOptions::Dump(Logger* log) const {
|
void MutableDBOptions::Dump(Logger* log) const {
|
||||||
Header(log, " Options.base_background_compactions: %d",
|
Header(log, " Options.base_background_compactions: %d",
|
||||||
@ -241,6 +240,8 @@ void MutableDBOptions::Dump(Logger* log) const {
|
|||||||
avoid_flush_during_shutdown);
|
avoid_flush_during_shutdown);
|
||||||
Header(log, " Options.delayed_write_rate : %" PRIu64,
|
Header(log, " Options.delayed_write_rate : %" PRIu64,
|
||||||
delayed_write_rate);
|
delayed_write_rate);
|
||||||
|
Header(log, " Options.max_total_wal_size: %" PRIu64,
|
||||||
|
max_total_wal_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -29,7 +29,6 @@ struct ImmutableDBOptions {
|
|||||||
InfoLogLevel info_log_level;
|
InfoLogLevel info_log_level;
|
||||||
int max_open_files;
|
int max_open_files;
|
||||||
int max_file_opening_threads;
|
int max_file_opening_threads;
|
||||||
uint64_t max_total_wal_size;
|
|
||||||
std::shared_ptr<Statistics> statistics;
|
std::shared_ptr<Statistics> statistics;
|
||||||
bool disable_data_sync;
|
bool disable_data_sync;
|
||||||
bool use_fsync;
|
bool use_fsync;
|
||||||
@ -95,6 +94,7 @@ struct MutableDBOptions {
|
|||||||
int max_background_compactions;
|
int max_background_compactions;
|
||||||
bool avoid_flush_during_shutdown;
|
bool avoid_flush_during_shutdown;
|
||||||
uint64_t delayed_write_rate;
|
uint64_t delayed_write_rate;
|
||||||
|
uint64_t max_total_wal_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -45,7 +45,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
|
|||||||
options.max_open_files = immutable_db_options.max_open_files;
|
options.max_open_files = immutable_db_options.max_open_files;
|
||||||
options.max_file_opening_threads =
|
options.max_file_opening_threads =
|
||||||
immutable_db_options.max_file_opening_threads;
|
immutable_db_options.max_file_opening_threads;
|
||||||
options.max_total_wal_size = immutable_db_options.max_total_wal_size;
|
options.max_total_wal_size = mutable_db_options.max_total_wal_size;
|
||||||
options.statistics = immutable_db_options.statistics;
|
options.statistics = immutable_db_options.statistics;
|
||||||
options.disableDataSync = immutable_db_options.disable_data_sync;
|
options.disableDataSync = immutable_db_options.disable_data_sync;
|
||||||
options.use_fsync = immutable_db_options.use_fsync;
|
options.use_fsync = immutable_db_options.use_fsync;
|
||||||
|
@ -308,7 +308,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
|
|||||||
OptionVerificationType::kNormal, false, 0}},
|
OptionVerificationType::kNormal, false, 0}},
|
||||||
{"max_total_wal_size",
|
{"max_total_wal_size",
|
||||||
{offsetof(struct DBOptions, max_total_wal_size), OptionType::kUInt64T,
|
{offsetof(struct DBOptions, max_total_wal_size), OptionType::kUInt64T,
|
||||||
OptionVerificationType::kNormal, false, 0}},
|
OptionVerificationType::kNormal, true,
|
||||||
|
offsetof(struct MutableDBOptions, max_total_wal_size)}},
|
||||||
{"wal_bytes_per_sync",
|
{"wal_bytes_per_sync",
|
||||||
{offsetof(struct DBOptions, wal_bytes_per_sync), OptionType::kUInt64T,
|
{offsetof(struct DBOptions, wal_bytes_per_sync), OptionType::kUInt64T,
|
||||||
OptionVerificationType::kNormal, false, 0}},
|
OptionVerificationType::kNormal, false, 0}},
|
||||||
@ -710,4 +711,3 @@ extern const std::string kNullptrString;
|
|||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user