Fix for 2PC causing WAL to grow too large

Summary:
Consider the following single column family scenario:
prepare in log A
commit in log B
*WAL is too large, flush all CFs to releast log A*
*CFA is on log B so we do not see CFA is depending on log A so no flush is requested*

To fix this we must also consider the log containing the prepare section when determining what log a CF is dependent on.
Closes https://github.com/facebook/rocksdb/pull/1768

Differential Revision: D4403265

Pulled By: reidHoruff

fbshipit-source-id: ce800ff
This commit is contained in:
Reid Horuff 2017-01-19 15:21:07 -08:00 committed by Islam AbdelRahman
parent a5163cfa60
commit 7d8218912d
7 changed files with 207 additions and 12 deletions

View File

@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
column_family_set_(column_family_set), column_family_set_(column_family_set),
pending_flush_(false), pending_flush_(false),
pending_compaction_(false), pending_compaction_(false),
prev_compaction_needed_bytes_(0) { prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc) {
Ref(); Ref();
// Convert user defined table properties collector factories to internal ones. // Convert user defined table properties collector factories to internal ones.
@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
} }
uint64_t ColumnFamilyData::OldestLogToKeep() {
auto current_log = GetLogNumber();
if (allow_2pc_) {
auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) {
current_log = imm_prep_log;
}
if (mem_prep_log > 0 && mem_prep_log < current_log) {
current_log = mem_prep_log;
}
}
return current_log;
}
const double kIncSlowdownRatio = 0.8; const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio; const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6; const double kNearStopSlowdownRatio = 0.6;

View File

@ -239,6 +239,9 @@ class ColumnFamilyData {
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();
// See Memtable constructor for explanation of earliest_seq param. // See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq); SequenceNumber earliest_seq);
@ -404,6 +407,9 @@ class ColumnFamilyData {
bool pending_compaction_; bool pending_compaction_;
uint64_t prev_compaction_needed_bytes_; uint64_t prev_compaction_needed_bytes_;
// if the database was opened with 2pc enabled
bool allow_2pc_;
}; };
// ColumnFamilySet has interesting thread-safety requirements // ColumnFamilySet has interesting thread-safety requirements

View File

@ -338,6 +338,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_stats_dump_time_microsec_(0), last_stats_dump_time_microsec_(0),
next_job_id_(1), next_job_id_(1),
has_unpersisted_data_(false), has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_ingest_file_(0), num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -654,6 +655,10 @@ void DBImpl::MaybeDumpStats() {
} }
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
if (!allow_2pc()) {
return 0;
}
uint64_t min_log = 0; uint64_t min_log = 0;
// we must look through the memtables for two phase transactions // we must look through the memtables for two phase transactions
@ -698,6 +703,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
} }
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
if (!allow_2pc()) {
return 0;
}
std::lock_guard<std::mutex> lock(prep_heap_mutex_); std::lock_guard<std::mutex> lock(prep_heap_mutex_);
uint64_t min_log = 0; uint64_t min_log = 0;
@ -2493,7 +2503,7 @@ Status DBImpl::SetDBOptions(
mutable_db_options_ = new_options; mutable_db_options_ = new_options;
if (total_log_size_ > GetMaxTotalWalSize()) { if (total_log_size_ > GetMaxTotalWalSize()) {
FlushColumnFamilies(); MaybeFlushColumnFamilies();
} }
persist_options_status = PersistOptions(); persist_options_status = PersistOptions();
@ -4686,9 +4696,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(!single_column_family_mode_ && if (UNLIKELY(!single_column_family_mode_ &&
!alive_log_files_.begin()->getting_flushed &&
total_log_size_ > GetMaxTotalWalSize())) { total_log_size_ > GetMaxTotalWalSize())) {
FlushColumnFamilies(); MaybeFlushColumnFamilies();
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) { } else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(), // Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another // write_buffer_manager_->ShouldFlush() will keep returning true. If another
@ -5006,28 +5015,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status; return status;
} }
void DBImpl::FlushColumnFamilies() { void DBImpl::MaybeFlushColumnFamilies() {
mutex_.AssertHeld(); mutex_.AssertHeld();
WriteContext context;
if (alive_log_files_.begin()->getting_flushed) { if (alive_log_files_.begin()->getting_flushed) {
return; return;
} }
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; auto oldest_alive_log = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true; auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
if (allow_2pc() &&
unable_to_flush_oldest_log_ &&
oldest_log_with_uncommited_prep > 0 &&
oldest_log_with_uncommited_prep <= oldest_alive_log) {
// we already attempted to flush all column families dependent on
// the oldest alive log but the log still contained uncommited transactions.
// the oldest alive log STILL contains uncommited transaction so there
// is still nothing that we can do.
return;
}
WriteContext context;
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64 "Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize()); oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't // no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread // happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
continue; continue;
} }
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { if (cfd->OldestLogToKeep() <= oldest_alive_log) {
auto status = SwitchMemtable(cfd, &context); auto status = SwitchMemtable(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
break; break;
@ -5037,6 +5058,26 @@ void DBImpl::FlushColumnFamilies() {
} }
} }
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
// we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepred
// transactions then we cannot flush this log until those transactions are commited.
unable_to_flush_oldest_log_ = false;
if (allow_2pc()) {
if (oldest_log_with_uncommited_prep == 0 ||
oldest_log_with_uncommited_prep > oldest_alive_log) {
// this log contains no outstanding prepared transactions
alive_log_files_.begin()->getting_flushed = true;
} else {
Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
"Unable to release oldest log due to uncommited transaction");
unable_to_flush_oldest_log_ = true;
}
} else {
alive_log_files_.begin()->getting_flushed = true;
}
} }
uint64_t DBImpl::GetMaxTotalWalSize() const { uint64_t DBImpl::GetMaxTotalWalSize() const {

View File

@ -308,6 +308,16 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
void TEST_MaybeFlushColumnFamilies();
bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_;
}
bool TEST_IsLogGettingFlushed() {
return alive_log_files_.begin()->getting_flushed;
}
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true, Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr); ColumnFamilyHandle* cfh = nullptr);
@ -732,7 +742,7 @@ class DBImpl : public DB {
// REQUIRES: mutex locked // REQUIRES: mutex locked
Status PersistOptions(); Status PersistOptions();
void FlushColumnFamilies(); void MaybeFlushColumnFamilies();
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;
@ -991,6 +1001,15 @@ class DBImpl : public DB {
// Used when disableWAL is true. // Used when disableWAL is true.
bool has_unpersisted_data_; bool has_unpersisted_data_;
// if an attempt was made to flush all column families that
// the oldest log depends on but uncommited data in the oldest
// log prevents the log from being released.
// We must attempt to free the dependent memtables again
// at a later time after the transaction in the oldest
// log is fully commited.
bool unable_to_flush_oldest_log_;
static const int KEEP_LOG_FILE_NUM = 1000; static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max() // MSVC version 1800 still does not have constexpr for ::max()
static const uint64_t kNoTimeOut = port::kMaxUint64; static const uint64_t kNoTimeOut = port::kMaxUint64;

View File

@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
} }
void DBImpl::TEST_MaybeFlushColumnFamilies() {
InstrumentedMutexLock l(&mutex_);
MaybeFlushColumnFamilies();
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;

View File

@ -221,6 +221,8 @@ class MemTableList {
// PickMemtablesToFlush() is called. // PickMemtablesToFlush() is called.
void FlushRequested() { flush_requested_ = true; } void FlushRequested() { flush_requested_ = true; }
bool HasFlushRequested() { return flush_requested_; }
// Copying allowed // Copying allowed
// MemTableList(const MemTableList&); // MemTableList(const MemTableList&);
// void operator=(const MemTableList&); // void operator=(const MemTableList&);

View File

@ -1178,6 +1178,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
delete cfb; delete cfb;
} }
TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Status s;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
WriteOptions wopts;
wopts.disableWAL = false;
wopts.sync = true;
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
TransactionOptions topts1;
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
s = txn1->SetName("xid1");
ASSERT_OK(s);
s = txn1->Put(cfa, "boys", "girls1");
ASSERT_OK(s);
Transaction* txn2 = db->BeginTransaction(wopts, topts1);
s = txn2->SetName("xid2");
ASSERT_OK(s);
s = txn2->Put(cfb, "up", "down1");
ASSERT_OK(s);
// prepre transaction in LOG A
s = txn1->Prepare();
ASSERT_OK(s);
// prepre transaction in LOG A
s = txn2->Prepare();
ASSERT_OK(s);
// regular put so that mem table can actually be flushed for log rolling
s = db->Put(wopts, "cats", "dogs1");
ASSERT_OK(s);
auto prepare_log_no = txn1->GetLogNumber();
// roll to LOG B
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
// now we pause background work so that
// imm()s are not flushed before we can check their status
s = db_impl->PauseBackgroundWork();
ASSERT_OK(s);
ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
// commit in LOG B
s = txn1->Commit();
ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// request a flush for all column families such that the earliest
// alive log file can be killed
db_impl->TEST_MaybeFlushColumnFamilies();
// log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
// assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
// cfb should not be flushed becuse it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
// cfb now has data from LOG A
s = txn2->Commit();
ASSERT_OK(s);
db_impl->TEST_MaybeFlushColumnFamilies();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// we should see that cfb now has a flush requested
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
// all data in LOG A resides in a memtable that has been
// requested for a flush
ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
delete txn1;
delete txn2;
delete cfa;
delete cfb;
}
/* /*
* 1) use prepare to keep first log around to determine starting sequence * 1) use prepare to keep first log around to determine starting sequence
* during recovery. * during recovery.