Consolidating WAL creation which currently has duplicate logic in db_impl_write.cc and db_impl_open.cc (#5188)

Summary:
Right now, two separate pieces of code are used to create WAL files: one in the DBImpl::Open function of db_impl_open.cc and one in the DBImpl::SwitchMemtable function of db_impl_write.cc. This change introduces a single function, DBImpl::CreateWAL, in db_impl_open.cc, which replaces the existing WAL creation logic in both DBImpl::Open and DBImpl::SwitchMemtable.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5188

Differential Revision: D14942832

Pulled By: vjnadimpalli

fbshipit-source-id: d49230e04c36176015c8c1b422575872f92157fb
This commit is contained in:
Vijay Nadimpalli 2019-04-15 18:47:24 -07:00 committed by Facebook Github Bot
parent 3e63e553b4
commit 71a82a0abe
3 changed files with 63 additions and 70 deletions

View File

@ -1678,6 +1678,9 @@ class DBImpl : public DB {
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
// Creates the WAL file for log number `log_file_num` and, on success, stores
// a newly allocated log::Writer for it in `*new_log`.
//
// log_file_num:           number used to derive the new WAL file name.
// recycle_log_number:     when non-zero, the WAL file with this number is
//                         reused for the new log instead of creating a fresh
//                         file; pass 0 to always create a new file.
// preallocate_block_size: preallocation block size applied to the new file.
// new_log:                out-parameter; written only when the returned
//                         Status is ok.
//
// Returns the Status of the underlying file creation / reuse call.
Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size, log::Writer** new_log);
// When set, we use a separate queue for writes that dont write to memtable. // When set, we use a separate queue for writes that dont write to memtable.
// In 2PC these are the writes at Prepare phase. // In 2PC these are the writes at Prepare phase.
const bool two_write_queues_; const bool two_write_queues_;

View File

@ -1106,6 +1106,45 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
!kSeqPerBatch, kBatchPerTxn); !kSeqPerBatch, kBatchPerTxn);
} }
// Opens the write-ahead-log file numbered `log_file_num` and wraps it in a
// log::Writer.
//
// A non-zero `recycle_log_number` means the WAL file with that number is
// handed to Env::ReuseWritableFile so it can back the new log; zero means a
// brand new file is created through NewWritableFile. In both cases the file
// is opened with EnvOptions tuned by Env::OptimizeForLogWrite.
//
// On success the file receives the WAL write-lifetime hint and the requested
// preallocation block size, and `*new_log` is set to a log::Writer allocated
// with `new` (what happens to that pointer afterwards is up to the caller).
// On failure `*new_log` is not written and the failing Status is returned.
Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
                         size_t preallocate_block_size, log::Writer** new_log) {
  DBOptions current_db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  EnvOptions wal_env_options =
      env_->OptimizeForLogWrite(env_options_, current_db_options);
  const std::string wal_path =
      LogFileName(immutable_db_options_.wal_dir, log_file_num);

  std::unique_ptr<WritableFile> wal_file;
  Status status;
  if (recycle_log_number == 0) {
    // Nothing to recycle: create a fresh file.
    status = NewWritableFile(env_, wal_path, &wal_file, wal_env_options);
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    const std::string recycled_path =
        LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
    status = env_->ReuseWritableFile(wal_path, recycled_path, &wal_file,
                                     wal_env_options);
  }
  if (!status.ok()) {
    // Leave *new_log untouched; the caller only inspects it on success.
    return status;
  }

  wal_file->SetWriteLifeTimeHint(CalculateWALWriteHint());
  wal_file->SetPreallocationBlockSize(preallocate_block_size);

  const bool recycling_enabled =
      immutable_db_options_.recycle_log_file_num > 0;
  std::unique_ptr<WritableFileWriter> wal_file_writer(new WritableFileWriter(
      std::move(wal_file), wal_path, wal_env_options, env_,
      nullptr /* stats */, immutable_db_options_.listeners));
  *new_log = new log::Writer(std::move(wal_file_writer), log_file_num,
                             recycling_enabled,
                             immutable_db_options_.manual_wal_flush);
  return status;
}
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families, const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
@ -1166,40 +1205,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
return s; return s;
} }
impl->mutex_.Lock(); impl->mutex_.Lock();
auto write_hint = impl->CalculateWALWriteHint();
// Handles create_if_missing, error_if_exists // Handles create_if_missing, error_if_exists
s = impl->Recover(column_families); s = impl->Recover(column_families);
if (s.ok()) { if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();
std::unique_ptr<WritableFile> lfile; log::Writer* new_log = nullptr;
EnvOptions soptions(db_options); const size_t preallocate_block_size =
EnvOptions opt_env_options = impl->GetWalPreallocateBlockSize(max_write_buffer_size);
impl->immutable_db_options_.env->OptimizeForLogWrite( s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
soptions, BuildDBOptions(impl->immutable_db_options_, preallocate_block_size, &new_log);
impl->mutable_db_options_));
std::string log_fname =
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number);
s = NewWritableFile(impl->immutable_db_options_.env, log_fname, &lfile,
opt_env_options);
if (s.ok()) { if (s.ok()) {
lfile->SetWriteLifeTimeHint(write_hint);
lfile->SetPreallocationBlockSize(
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
{
InstrumentedMutexLock wl(&impl->log_write_mutex_); InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
const auto& listeners = impl->immutable_db_options_.listeners; assert(new_log != nullptr);
std::unique_ptr<WritableFileWriter> file_writer( impl->logs_.emplace_back(new_log_number, new_log);
new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
impl->env_, nullptr /* stats */, listeners));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(
std::move(file_writer), new_log_number,
impl->immutable_db_options_.recycle_log_file_num > 0,
impl->immutable_db_options_.manual_wal_flush));
} }
if (s.ok()) {
// set column family handles // set column family handles
for (auto cf : column_families) { for (auto cf : column_families) {
auto cfd = auto cfd =

View File

@ -1418,52 +1418,20 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// Log this later after lock release. It may be outdated, e.g., if background // Log this later after lock release. It may be outdated, e.g., if background
// flush happens before logging, but that should be ok. // flush happens before logging, but that should be ok.
int num_imm_unflushed = cfd->imm()->NumNotFlushed(); int num_imm_unflushed = cfd->imm()->NumNotFlushed();
DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_);
const auto preallocate_block_size = const auto preallocate_block_size =
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
auto write_hint = CalculateWALWriteHint();
mutex_.Unlock(); mutex_.Unlock();
{
std::string log_fname =
LogFileName(immutable_db_options_.wal_dir, new_log_number);
if (creating_new_log) { if (creating_new_log) {
EnvOptions opt_env_opt = // TODO: Write buffer size passed in should be max of all CF's instead
env_->OptimizeForLogWrite(env_options_, db_options); // of mutable_cf_options.write_buffer_size.
if (recycle_log_number) { s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
ROCKS_LOG_INFO(immutable_db_options_.info_log, &new_log);
"reusing log %" PRIu64 " from recycle list\n",
recycle_log_number);
std::string old_log_fname =
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile,
opt_env_opt);
} else {
s = NewWritableFile(env_, log_fname, &lfile, opt_env_opt);
} }
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
// use preallocate_block_size instead
// of calling GetWalPreallocateBlockSize()
lfile->SetPreallocationBlockSize(preallocate_block_size);
lfile->SetWriteLifeTimeHint(write_hint);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */,
immutable_db_options_.listeners));
new_log = new log::Writer(
std::move(file_writer), new_log_number,
immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
}
}
if (s.ok()) { if (s.ok()) {
SequenceNumber seq = versions_->LastSequence(); SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion(); context->superversion_context.NewSuperVersion();
} }
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "[%s] New memtable created with log file: #%" PRIu64
". Immutable memtables: %d.\n", ". Immutable memtables: %d.\n",