Summary: Change the `MemPurge` code to address a failure during a crash test reported in https://github.com/facebook/rocksdb/issues/8958. ### Details and results of the crash investigation: These failures happened in a specific scenario where the list of immutable tables was composed of 2 or more memtables, and the last memtable was the output of a previous `Mempurge` operation. Because the `PickMemtablesToFlush` function included a sorting of the memtables (previous PR related to the Mempurge project), and because the `VersionEdit` of the flush class is piggybacked onto a single one of these memtables, the `VersionEdit` was not properly selected and applied to the `VersionSet` of the DB. Since the `VersionSet` was not edited properly, the database was losing track of the SST file created during the flush process, which was subsequently deleted (and as you can expect, caused the tests to crash). The following command consistently failed, which was quite convenient to investigate the issue: `$ while rm -rf /dev/shm/single_stress && ./db_stress --clear_column_family_one_in=0 --column_families=1 --db=/dev/shm/single_stress --experimental_mempurge_threshold=5.493146827397074 --flush_one_in=10000 --reopen=0 --write_buffer_size=262144 --value_size_mult=33 --max_write_buffer_number=3 -ops_per_thread=10000; do : ; done` ### Solution proposed The memtables are no longer sorted based on their `memtableID` in the `PickMemtablesToFlush` function. Additionally, the `next_log_number` of the memtable created as an output of the `Mempurge` function now takes in the correct value (the log number of the first memtable being mempurged). Finally, the VersionEdit object of the flush class now takes the maximum `next_log_number` of the stack of memtables being flushed, which doesnt change anything when Mempurge is `off` but becomes necessary when Mempurge is `on`. ### Testing of the solution The following command no longer fails: ``$ while rm -rf /dev/shm/single_stress && ./db_stress --clear_column_family_one_in=0 --column_families=1 --db=/dev/shm/single_stress --experimental_mempurge_threshold=5.493146827397074 --flush_one_in=10000 --reopen=0 --write_buffer_size=262144 --value_size_mult=33 --max_write_buffer_number=3 -ops_per_thread=10000; do : ; done`` Additionally, I ran `db_crashtest` (`whitebox` and `blackbox`) for 2.5 hours with MemPurge on and did not observe any crash. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9671 Reviewed By: pdillinger Differential Revision: D34697424 Pulled By: bjlemaire fbshipit-source-id: d1ab675b361904351ac81a35c184030e52222874
This commit is contained in:
parent
062396af15
commit
7bed6595f3
@ -1374,6 +1374,140 @@ TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
|
||||
} while (ChangeWalOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
|
||||
// Before our bug fix, we noticed that when 2 memtables were
|
||||
// being flushed (with one memtable being the output of a
|
||||
// previous MemPurge and one memtable being a newly-sealed memtable),
|
||||
// the SST file created was not properly added to the DB version
|
||||
// (via the VersionEdit obj), leading to data loss (the SST file
|
||||
// was later being purged as an obsolete file).
|
||||
// Therefore, we reproduce this scenario to test our fix.
|
||||
Options options = CurrentOptions();
|
||||
|
||||
options.create_if_missing = true;
|
||||
options.compression = kNoCompression;
|
||||
options.inplace_update_support = false;
|
||||
options.allow_concurrent_memtable_write = true;
|
||||
|
||||
// Enforce size of a single MemTable to 1MB (64MB = 1048576 bytes).
|
||||
options.write_buffer_size = 1 << 20;
|
||||
// Activate the MemPurge prototype.
|
||||
options.experimental_mempurge_threshold = 1.0;
|
||||
|
||||
// Force to have more than one memtable to trigger a flush.
|
||||
// For some reason this option does not seem to be enforced,
|
||||
// so the following test is designed to make sure that we
|
||||
// are testing the correct test case.
|
||||
options.min_write_buffer_number_to_merge = 3;
|
||||
options.max_write_buffer_number = 5;
|
||||
options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
|
||||
options.disable_auto_compactions = true;
|
||||
ASSERT_OK(TryReopen(options));
|
||||
|
||||
std::atomic<uint32_t> mempurge_count{0};
|
||||
std::atomic<uint32_t> sst_count{0};
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::FlushJob:MemPurgeSuccessful",
|
||||
[&](void* /*arg*/) { mempurge_count++; });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Dummy variable used for the following callback function.
|
||||
uint64_t ZERO = 0;
|
||||
// We will first execute mempurge operations exclusively.
|
||||
// Therefore, when the first flush is triggered, we want to make
|
||||
// sure there is at least 2 memtables being flushed: one output
|
||||
// from a previous mempurge, and one newly sealed memtable.
|
||||
// This is when we observed in the past that some SST files created
|
||||
// were not properly added to the DB version (via the VersionEdit obj).
|
||||
std::atomic<uint64_t> num_memtable_at_first_flush(0);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
|
||||
uint64_t* mems_size = reinterpret_cast<uint64_t*>(arg);
|
||||
// atomic_compare_exchange_strong sometimes updates the value
|
||||
// of ZERO (the "expected" object), so we make sure ZERO is indeed...
|
||||
// zero.
|
||||
ZERO = 0;
|
||||
std::atomic_compare_exchange_strong(&num_memtable_at_first_flush, &ZERO,
|
||||
*mems_size);
|
||||
});
|
||||
|
||||
const std::vector<std::string> KEYS = {
|
||||
"ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
|
||||
"ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
|
||||
const std::string NOT_FOUND = "NOT_FOUND";
|
||||
|
||||
Random rnd(117);
|
||||
const uint64_t NUM_REPEAT_OVERWRITES = 100;
|
||||
const uint64_t NUM_RAND_INSERTS = 500;
|
||||
const uint64_t RAND_VALUES_LENGTH = 10240;
|
||||
|
||||
std::string key, value;
|
||||
std::vector<std::string> values(9, "");
|
||||
|
||||
// Keys used to check that no SST file disappeared.
|
||||
for (uint64_t k = 0; k < 5; k++) {
|
||||
values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
|
||||
ASSERT_OK(Put(KEYS[k], values[k]));
|
||||
}
|
||||
|
||||
// Insertion of of K-V pairs, multiple times.
|
||||
// Trigger at least one mempurge and no SST file creation.
|
||||
for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
|
||||
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
|
||||
for (uint64_t k = 5; k < values.size(); k++) {
|
||||
values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
|
||||
ASSERT_OK(Put(KEYS[k], values[k]));
|
||||
}
|
||||
// Check database consistency.
|
||||
for (uint64_t k = 0; k < values.size(); k++) {
|
||||
ASSERT_EQ(Get(KEYS[k]), values[k]);
|
||||
}
|
||||
}
|
||||
|
||||
// Check that there was at least one mempurge
|
||||
uint32_t expected_min_mempurge_count = 1;
|
||||
// Check that there was no SST files created during flush.
|
||||
uint32_t expected_sst_count = 0;
|
||||
EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
|
||||
EXPECT_EQ(sst_count.load(), expected_sst_count);
|
||||
|
||||
// Trigger an SST file creation and no mempurge.
|
||||
for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
|
||||
key = rnd.RandomString(RAND_VALUES_LENGTH);
|
||||
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
|
||||
value = rnd.RandomString(RAND_VALUES_LENGTH);
|
||||
ASSERT_OK(Put(key, value));
|
||||
// Check database consistency.
|
||||
for (uint64_t k = 0; k < values.size(); k++) {
|
||||
ASSERT_EQ(Get(KEYS[k]), values[k]);
|
||||
}
|
||||
ASSERT_EQ(Get(key), value);
|
||||
}
|
||||
|
||||
// Check that there was at least one SST files created during flush.
|
||||
expected_sst_count = 1;
|
||||
EXPECT_GE(sst_count.load(), expected_sst_count);
|
||||
|
||||
// Oddly enough, num_memtable_at_first_flush is not enforced to be
|
||||
// equal to min_write_buffer_number_to_merge. So by asserting that
|
||||
// the first SST file creation comes from one output memtable
|
||||
// from a previous mempurge, and one newly sealed memtable. This
|
||||
// is the scenario where we observed that some SST files created
|
||||
// were not properly added to the DB version before our bug fix.
|
||||
ASSERT_GE(num_memtable_at_first_flush.load(), 2);
|
||||
|
||||
// Check that no data was lost after SST file creation.
|
||||
for (uint64_t k = 0; k < values.size(); k++) {
|
||||
ASSERT_EQ(Get(KEYS[k]), values[k]);
|
||||
}
|
||||
// Extra check of database consistency.
|
||||
ASSERT_EQ(Get(key), value);
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_P(DBFlushDirectIOTest, DirectIO) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
|
@ -170,8 +170,20 @@ void FlushJob::PickMemTable() {
|
||||
db_mutex_->AssertHeld();
|
||||
assert(!pick_memtable_called);
|
||||
pick_memtable_called = true;
|
||||
|
||||
// Maximum "NextLogNumber" of the memtables to flush.
|
||||
// When mempurge feature is turned off, this variable is useless
|
||||
// because the memtables are implicitly sorted by increasing order of creation
|
||||
// time. Therefore mems_->back()->GetNextLogNumber() is already equal to
|
||||
// max_next_log_number. However when Mempurge is on, the memtables are no
|
||||
// longer sorted by increasing order of creation time. Therefore this variable
|
||||
// becomes necessary because mems_->back()->GetNextLogNumber() is no longer
|
||||
// necessarily equal to max_next_log_number.
|
||||
uint64_t max_next_log_number = 0;
|
||||
|
||||
// Save the contents of the earliest memtable as a new Table
|
||||
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
|
||||
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
|
||||
&max_next_log_number);
|
||||
if (mems_.empty()) {
|
||||
return;
|
||||
}
|
||||
@ -186,7 +198,7 @@ void FlushJob::PickMemTable() {
|
||||
edit_->SetPrevLogNumber(0);
|
||||
// SetLogNumber(log_num) indicates logs with number smaller than log_num
|
||||
// will no longer be picked up for recovery.
|
||||
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
|
||||
edit_->SetLogNumber(max_next_log_number);
|
||||
edit_->SetColumnFamily(cfd_->GetID());
|
||||
|
||||
// path 0 for level 0 file.
|
||||
@ -569,6 +581,7 @@ Status FlushJob::MemPurge() {
|
||||
uint64_t new_mem_id = mems_[0]->GetID();
|
||||
|
||||
new_mem->SetID(new_mem_id);
|
||||
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
|
||||
|
||||
// This addition will not trigger another flush, because
|
||||
// we do not call SchedulePendingFlush().
|
||||
@ -815,6 +828,12 @@ Status FlushJob::WriteLevel0Table() {
|
||||
uint64_t total_num_entries = 0, total_num_deletes = 0;
|
||||
uint64_t total_data_size = 0;
|
||||
size_t total_memory_usage = 0;
|
||||
// Used for testing:
|
||||
uint64_t mems_size = mems_.size();
|
||||
(void)mems_size; // avoids unused variable error when
|
||||
// TEST_SYNC_POINT_CALLBACK not used.
|
||||
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
|
||||
&mems_size);
|
||||
for (MemTable* m : mems_) {
|
||||
ROCKS_LOG_INFO(
|
||||
db_options_.info_log,
|
||||
|
@ -338,7 +338,8 @@ bool MemTableList::IsFlushPending() const {
|
||||
|
||||
// Returns the memtables that need to be flushed.
|
||||
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
|
||||
autovector<MemTable*>* ret) {
|
||||
autovector<MemTable*>* ret,
|
||||
uint64_t* max_next_log_number) {
|
||||
AutoThreadOperationStageUpdater stage_updater(
|
||||
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
|
||||
const auto& memlist = current_->memlist_;
|
||||
@ -349,8 +350,7 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
|
||||
// iterating through the memlist starting at the end, the vector<MemTable*>
|
||||
// ret is filled with memtables already sorted in increasing MemTable ID.
|
||||
// However, when the mempurge feature is activated, new memtables with older
|
||||
// IDs will be added to the memlist. Therefore we std::sort(ret) at the end to
|
||||
// return a vector of memtables sorted by increasing memtable ID.
|
||||
// IDs will be added to the memlist.
|
||||
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
|
||||
MemTable* m = *it;
|
||||
if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
|
||||
@ -366,21 +366,16 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
|
||||
imm_flush_needed.store(false, std::memory_order_release);
|
||||
}
|
||||
m->flush_in_progress_ = true; // flushing will start very soon
|
||||
if (max_next_log_number) {
|
||||
*max_next_log_number =
|
||||
std::max(m->GetNextLogNumber(), *max_next_log_number);
|
||||
}
|
||||
ret->push_back(m);
|
||||
}
|
||||
}
|
||||
if (!atomic_flush || num_flush_not_started_ == 0) {
|
||||
flush_requested_ = false; // start-flush request is complete
|
||||
}
|
||||
|
||||
// Sort the list of memtables by increasing memtable ID.
|
||||
// This is useful when the mempurge feature is activated
|
||||
// and the memtables are not guaranteed to be sorted in
|
||||
// the memlist vector.
|
||||
std::sort(ret->begin(), ret->end(),
|
||||
[](const MemTable* m1, const MemTable* m2) -> bool {
|
||||
return m1->GetID() < m2->GetID();
|
||||
});
|
||||
}
|
||||
|
||||
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||
|
@ -253,7 +253,8 @@ class MemTableList {
|
||||
// Returns the earliest memtables that needs to be flushed. The returned
|
||||
// memtables are guaranteed to be in the ascending order of created time.
|
||||
void PickMemtablesToFlush(uint64_t max_memtable_id,
|
||||
autovector<MemTable*>* mems);
|
||||
autovector<MemTable*>* mems,
|
||||
uint64_t* max_next_log_number = nullptr);
|
||||
|
||||
// Reset status of the given memtable list back to pending state so that
|
||||
// they can get picked up again on the next round of flush.
|
||||
|
Loading…
Reference in New Issue
Block a user