Fix bug causing incorrect data returned by snapshot read (#9648)

Summary:
This bug affects use cases that meet the following conditions
- (has only the default column family or disables WAL) and
- has at least one event listener
- atomic flush is NOT affected.

If the above conditions meet, then RocksDB can release the db mutex before picking all the
existing memtables to flush. In the meantime, a snapshot can be created and db's sequence
number can still be incremented. The upcoming flush will ignore this snapshot.
A later read using this snapshot can return incorrect result.

To fix this issue, we call the listeners callbacks after picking the memtables so that we avoid
creating snapshots during this interval.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9648

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D34555456

Pulled By: riversand963

fbshipit-source-id: 1438981e9f069a5916686b1a0ad7627f734cf0ee
This commit is contained in:
Yanqin Jin 2022-03-02 21:03:14 -08:00 committed by Andrew Kryczka
parent a6fd3332e5
commit 596c606739
3 changed files with 89 additions and 8 deletions

View File

@ -2,9 +2,8 @@
## Unreleased ## Unreleased
### Bug Fixes ### Bug Fixes
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread. * Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
### Bug Fixes * Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
* * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
## 7.0.0 (02/20/2022) ## 7.0.0 (02/20/2022)
### Bug Fixes ### Bug Fixes

View File

@ -676,6 +676,7 @@ class TestFlushListener : public EventListener {
~TestFlushListener() override { ~TestFlushListener() override {
prev_fc_info_.status.PermitUncheckedError(); // Ignore the status prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
} }
void OnTableFileCreated(const TableFileCreationInfo& info) override { void OnTableFileCreated(const TableFileCreationInfo& info) override {
// remember the info for later checking the FlushJobInfo. // remember the info for later checking the FlushJobInfo.
prev_fc_info_ = info; prev_fc_info_ = info;
@ -1999,6 +2000,61 @@ TEST_P(DBFlushTestBlobError, FlushError) {
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
class SimpleTestFlushListener : public EventListener {
public:
explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
~SimpleTestFlushListener() override {}
void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);
ASSERT_OK(db->Delete(WriteOptions(), "foo"));
snapshot_ = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
auto* dbimpl = static_cast_with_check<DBImpl>(db);
assert(dbimpl);
ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
assert(cfhi);
ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
}
DBFlushTest* test_ = nullptr;
const Snapshot* snapshot_ = nullptr;
};
Options options = CurrentOptions();
options.create_if_missing = true;
auto* listener = new SimpleTestFlushListener(this);
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));
ManagedSnapshot snapshot_guard(db_);
ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
ASSERT_OK(db_->Flush(FlushOptions(), default_cf));
const Snapshot* snapshot = listener->snapshot_;
assert(snapshot);
ReadOptions read_opts;
read_opts.snapshot = snapshot;
// Using snapshot should not see "foo".
{
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
ASSERT_TRUE(s.IsNotFound());
}
db_->ReleaseSnapshot(snapshot);
}
TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;

View File

@ -170,6 +170,7 @@ Status DBImpl::FlushMemTableToOutputFile(
const bool needs_to_sync_closed_wals = const bool needs_to_sync_closed_wals =
logfile_number_ > 0 && logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1; versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;
// If needs_to_sync_closed_wals is true, we need to record the current // If needs_to_sync_closed_wals is true, we need to record the current
// maximum memtable ID of this column family so that a later PickMemtables() // maximum memtable ID of this column family so that a later PickMemtables()
// call will not pick memtables whose IDs are higher. This is due to the fact // call will not pick memtables whose IDs are higher. This is due to the fact
@ -177,9 +178,33 @@ Status DBImpl::FlushMemTableToOutputFile(
// happen for this column family in the meantime. The newly created memtables // happen for this column family in the meantime. The newly created memtables
// have their data backed by unsynced WALs, thus they cannot be included in // have their data backed by unsynced WALs, thus they cannot be included in
// this flush job. // this flush job.
// Another reason why we must record the current maximum memtable ID of this
// column family: SyncClosedLogs() may release db mutex, thus it's possible
// for application to continue to insert into memtables increasing db's
// sequence number. The application may take a snapshot, but this snapshot is
// not included in `snapshot_seqs` which will be passed to flush job because
// `snapshot_seqs` has already been computed before this function starts.
// Recording the max memtable ID ensures that the flush job does not flush
// a memtable without knowing such snapshot(s).
uint64_t max_memtable_id = needs_to_sync_closed_wals uint64_t max_memtable_id = needs_to_sync_closed_wals
? cfd->imm()->GetLatestMemTableID() ? cfd->imm()->GetLatestMemTableID()
: port::kMaxUint64; : port::kMaxUint64;
// If needs_to_sync_closed_wals is false, then the flush job will pick ALL
// existing memtables of the column family when PickMemTable() is called
// later. Although we won't call SyncClosedLogs() in this case, we may still
// call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also
// releases and re-acquires the db mutex. In the meantime, the application
// can still insert into the memtables and increase the db's sequence number.
// The application can take a snapshot, hoping that the latest visible state
// to this snapshto is preserved. This is hard to guarantee since db mutex
// not held. This newly-created snapshot is not included in `snapshot_seqs`
// and the flush job is unaware of its presence. Consequently, the flush job
// may drop certain keys when generating the L0, causing incorrect data to be
// returned for snapshot read using this snapshot.
// To address this, we make sure NotifyOnFlushBegin() executes after memtable
// picking so that no new snapshot can be taken between the two functions.
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
@ -192,11 +217,6 @@ Status DBImpl::FlushMemTableToOutputFile(
&blob_callback_); &blob_callback_);
FileMetaData file_meta; FileMetaData file_meta;
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE
Status s; Status s;
bool need_cancel = false; bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK(); IOStatus log_io_s = IOStatus::OK();
@ -221,6 +241,12 @@ Status DBImpl::FlushMemTableToOutputFile(
} }
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job); "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE
bool switched_to_mempurge = false; bool switched_to_mempurge = false;
// Within flush_job.Run, rocksdb may call event listener to notify // Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion. // file creation and deletion.