TransactionUtil::CheckKey() to skip unnecessary history (#4941)
Summary: If a memtable definitely covers a key, there isn't a need to check older memtables. We can skip them by checking the earliest sequence number. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4941 Differential Revision: D13932666 fbshipit-source-id: b9d52f234b8ad9dd3bf6547645cd457175a3ca9b
This commit is contained in:
parent
a94aef6596
commit
58c4aee42e
@ -3412,7 +3412,9 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
bool cache_only, SequenceNumber* seq,
|
||||
bool cache_only,
|
||||
SequenceNumber lower_bound_seq,
|
||||
SequenceNumber* seq,
|
||||
bool* found_record_for_key,
|
||||
bool* is_blob_index) {
|
||||
Status s;
|
||||
@ -3445,6 +3447,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
|
||||
if (lower_bound_in_mem != kMaxSequenceNumber &&
|
||||
lower_bound_in_mem < lower_bound_seq) {
|
||||
*found_record_for_key = false;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Check if there is a record for this key in the immutable memtables
|
||||
sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
|
||||
seq, read_options, nullptr /*read_callback*/, is_blob_index);
|
||||
@ -3464,6 +3473,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
|
||||
if (lower_bound_in_imm != kMaxSequenceNumber &&
|
||||
lower_bound_in_imm < lower_bound_seq) {
|
||||
*found_record_for_key = false;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Check if there is a record for this key in the immutable memtables
|
||||
sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
|
||||
&max_covering_tombstone_seq, seq, read_options,
|
||||
@ -3485,6 +3501,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
|
||||
// check here to skip the history if possible. But currently the caller
|
||||
// already does that. Maybe we should move the logic here later.
|
||||
|
||||
// TODO(agiardullo): possible optimization: consider checking cached
|
||||
// SST files if cache_only=true?
|
||||
if (!cache_only) {
|
||||
|
@ -413,11 +413,17 @@ class DBImpl : public DB {
|
||||
// snapshot, we know that no key could have existing after this snapshot
|
||||
// (since we do not compact keys that have an earlier snapshot).
|
||||
//
|
||||
// Only records newer than or at `lower_bound_seq` are guaranteed to be
|
||||
// returned. Memtables and files may not be checked if it only contains data
|
||||
// older than `lower_bound_seq`.
|
||||
//
|
||||
// Returns OK or NotFound on success,
|
||||
// other status on unexpected error.
|
||||
// TODO(andrewkr): this API need to be aware of range deletion operations
|
||||
Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
|
||||
bool cache_only, SequenceNumber* seq,
|
||||
bool cache_only,
|
||||
SequenceNumber lower_bound_seq,
|
||||
SequenceNumber* seq,
|
||||
bool* found_record_for_key,
|
||||
bool* is_blob_index = nullptr);
|
||||
|
||||
|
@ -1426,8 +1426,8 @@ class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
|
||||
bool found_record_for_key = false;
|
||||
bool is_blob_index = false;
|
||||
Status s = db_impl->GetLatestSequenceForKey(
|
||||
sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
|
||||
&is_blob_index);
|
||||
sv, key_, false /*cache_only*/, 0 /*lower_bound_seq*/, &latest_seq,
|
||||
&found_record_for_key, &is_blob_index);
|
||||
db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
|
||||
if (!s.ok() && !s.IsNotFound()) {
|
||||
// Error.
|
||||
|
@ -9,11 +9,15 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "logging/logging.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
#include "rocksdb/utilities/optimistic_transaction_db.h"
|
||||
#include "rocksdb/utilities/transaction.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "test_util/testharness.h"
|
||||
#include "test_util/transaction_test_util.h"
|
||||
#include "util/crc32c.h"
|
||||
@ -308,6 +312,120 @@ TEST_F(OptimisticTransactionTest, FlushTest2) {
|
||||
delete txn;
|
||||
}
|
||||
|
||||
// Trigger the condition where some old memtables are skipped when doing
|
||||
// TransactionUtil::CheckKey(), and make sure the result is still correct.
|
||||
TEST_F(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
|
||||
const int kAttemptHistoryMemtable = 0;
|
||||
const int kAttemptImmMemTable = 1;
|
||||
for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
|
||||
attempt++) {
|
||||
options.max_write_buffer_number_to_maintain = 3;
|
||||
Reopen();
|
||||
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
ReadOptions snapshot_read_options;
|
||||
ReadOptions snapshot_read_options2;
|
||||
string value;
|
||||
Status s;
|
||||
|
||||
ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
|
||||
ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
|
||||
|
||||
Transaction* txn = txn_db->BeginTransaction(write_options);
|
||||
ASSERT_TRUE(txn != nullptr);
|
||||
|
||||
Transaction* txn2 = txn_db->BeginTransaction(write_options);
|
||||
ASSERT_TRUE(txn2 != nullptr);
|
||||
|
||||
snapshot_read_options.snapshot = txn->GetSnapshot();
|
||||
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
|
||||
ASSERT_EQ(value, "bar");
|
||||
ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
|
||||
|
||||
snapshot_read_options2.snapshot = txn2->GetSnapshot();
|
||||
ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
|
||||
ASSERT_EQ(value, "bar");
|
||||
ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
|
||||
|
||||
// txn updates "foo" and txn2 updates "foo2", and now a write is
|
||||
// issued for "foo", which conflicts with txn but not txn2
|
||||
ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
|
||||
|
||||
if (attempt == kAttemptImmMemTable) {
|
||||
// For the second attempt, hold flush from beginning. The memtable
|
||||
// will be switched to immutable after calling TEST_SwitchMemtable()
|
||||
// while CheckKey() is called.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
|
||||
"FlushJob::Start"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
}
|
||||
|
||||
// force a memtable flush. The memtable should still be kept
|
||||
FlushOptions flush_ops;
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
ASSERT_OK(txn_db->Flush(flush_ops));
|
||||
} else {
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
|
||||
db_impl->TEST_SwitchMemtable();
|
||||
}
|
||||
uint64_t num_imm_mems;
|
||||
ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
|
||||
&num_imm_mems));
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
ASSERT_EQ(0, num_imm_mems);
|
||||
} else {
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
ASSERT_EQ(1, num_imm_mems);
|
||||
}
|
||||
|
||||
// Put something in active memtable
|
||||
ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
|
||||
|
||||
// Create txn3 after flushing, when this transaction is commited,
|
||||
// only need to check the active memtable
|
||||
Transaction* txn3 = txn_db->BeginTransaction(write_options);
|
||||
ASSERT_TRUE(txn3 != nullptr);
|
||||
|
||||
// Commit both of txn and txn2. txn will conflict but txn2 will
|
||||
// pass. In both ways, both memtables are queried.
|
||||
SetPerfLevel(PerfLevel::kEnableCount);
|
||||
|
||||
get_perf_context()->Reset();
|
||||
s = txn->Commit();
|
||||
// We should have checked two memtables
|
||||
ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
|
||||
// txn should fail because of conflict, even if the memtable
|
||||
// has flushed, because it is still preserved in history.
|
||||
ASSERT_TRUE(s.IsBusy());
|
||||
|
||||
get_perf_context()->Reset();
|
||||
s = txn2->Commit();
|
||||
// We should have checked two memtables
|
||||
ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
|
||||
ASSERT_TRUE(s.ok());
|
||||
|
||||
txn3->Put(Slice("foo2"), Slice("bar2"));
|
||||
get_perf_context()->Reset();
|
||||
s = txn3->Commit();
|
||||
// txn3 is created after the active memtable is created, so that is the only
|
||||
// memtable to check.
|
||||
ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
|
||||
ASSERT_TRUE(s.ok());
|
||||
|
||||
TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
SetPerfLevel(PerfLevel::kDisable);
|
||||
|
||||
delete txn;
|
||||
delete txn2;
|
||||
delete txn3;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
|
@ -52,6 +52,12 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
const std::string& key, bool cache_only,
|
||||
ReadCallback* snap_checker,
|
||||
SequenceNumber min_uncommitted) {
|
||||
// When `min_uncommitted` is provided, keys are not always committed
|
||||
// in sequence number order, and `snap_checker` is used to check whether
|
||||
// specific sequence number is in the database is visible to the transaction.
|
||||
// So `snap_checker` must be provided.
|
||||
assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);
|
||||
|
||||
Status result;
|
||||
bool need_to_read_sst = false;
|
||||
|
||||
@ -100,8 +106,19 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
SequenceNumber seq = kMaxSequenceNumber;
|
||||
bool found_record_for_key = false;
|
||||
|
||||
// When min_uncommitted == kMaxSequenceNumber, writes are committed in
|
||||
// sequence number order, so only keys larger than `snap_seq` can cause
|
||||
// conflict.
|
||||
// When min_uncommitted != kMaxSequenceNumber, keys lower than
|
||||
// min_uncommitted will not triggered conflicts, while keys larger than
|
||||
// min_uncommitted might create conflicts, so we need to read them out
|
||||
// from the DB, and call callback to snap_checker to determine. So only
|
||||
// keys lower than min_uncommitted can be skipped.
|
||||
SequenceNumber lower_bound_seq =
|
||||
(min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
|
||||
Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
|
||||
&seq, &found_record_for_key);
|
||||
lower_bound_seq, &seq,
|
||||
&found_record_for_key);
|
||||
|
||||
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
|
||||
result = s;
|
||||
|
@ -50,6 +50,9 @@ class TransactionUtil {
|
||||
// SST files. This will make it more likely this function will
|
||||
// return an error if it is unable to determine if there are any conflicts.
|
||||
//
|
||||
// See comment of CheckKey() for explanation of `snap_seq`, `snap_checker`
|
||||
// and `min_uncommitted`.
|
||||
//
|
||||
// Returns OK on success, BUSY if there is a conflicting write, or other error
|
||||
// status for any unexpected errors.
|
||||
static Status CheckKeyForConflicts(
|
||||
@ -72,6 +75,14 @@ class TransactionUtil {
|
||||
bool cache_only);
|
||||
|
||||
private:
|
||||
// If `snap_checker` == nullptr, writes are always commited in sequence number
|
||||
// order. All sequence number <= `snap_seq` will not conflict with any
|
||||
// write, and all keys > `snap_seq` of `key` will trigger conflict.
|
||||
// If `snap_checker` != nullptr, writes may not commit in sequence number
|
||||
// order. In this case `min_uncommitted` is a lower bound.
|
||||
// seq < `min_uncommitted`: no conflict
|
||||
// seq > `snap_seq`: applicable to conflict
|
||||
// `min_uncommitted` <= seq <= `snap_seq`: call `snap_checker` to determine.
|
||||
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
SequenceNumber earliest_seq, SequenceNumber snap_seq,
|
||||
const std::string& key, bool cache_only,
|
||||
|
@ -761,6 +761,147 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
|
||||
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
|
||||
}
|
||||
|
||||
// Trigger the condition where some old memtables are skipped when doing
|
||||
// TransactionUtil::CheckKey(), and make sure the result is still correct.
|
||||
TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
|
||||
const int kAttemptHistoryMemtable = 0;
|
||||
const int kAttemptImmMemTable = 1;
|
||||
for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
|
||||
attempt++) {
|
||||
options.max_write_buffer_number_to_maintain = 3;
|
||||
ReOpen();
|
||||
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
TransactionOptions txn_options;
|
||||
txn_options.set_snapshot = true;
|
||||
string value;
|
||||
Status s;
|
||||
|
||||
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
|
||||
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
|
||||
|
||||
Transaction* txn = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn != nullptr);
|
||||
ASSERT_OK(txn->SetName("txn"));
|
||||
|
||||
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn2 != nullptr);
|
||||
ASSERT_OK(txn2->SetName("txn2"));
|
||||
|
||||
// This transaction is created to cause potential conflict.
|
||||
Transaction* txn_x = db->BeginTransaction(write_options);
|
||||
ASSERT_OK(txn_x->SetName("txn_x"));
|
||||
ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
|
||||
ASSERT_OK(txn_x->Prepare());
|
||||
|
||||
// Create snapshots after the prepare, but there should still
|
||||
// be a conflict when trying to read "foo".
|
||||
|
||||
if (attempt == kAttemptImmMemTable) {
|
||||
// For the second attempt, hold flush from beginning. The memtable
|
||||
// will be switched to immutable after calling TEST_SwitchMemtable()
|
||||
// while CheckKey() is called.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
|
||||
"FlushJob::Start"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
}
|
||||
|
||||
// force a memtable flush. The memtable should still be kept
|
||||
FlushOptions flush_ops;
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
ASSERT_OK(db->Flush(flush_ops));
|
||||
} else {
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
|
||||
db_impl->TEST_SwitchMemtable();
|
||||
}
|
||||
uint64_t num_imm_mems;
|
||||
ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
|
||||
&num_imm_mems));
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
ASSERT_EQ(0, num_imm_mems);
|
||||
} else {
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
ASSERT_EQ(1, num_imm_mems);
|
||||
}
|
||||
|
||||
// Put something in active memtable
|
||||
ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
|
||||
|
||||
// Create txn3 after flushing, but this transaction also needs to
|
||||
// check all memtables because of they contains uncommitted data.
|
||||
Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn3 != nullptr);
|
||||
ASSERT_OK(txn3->SetName("txn3"));
|
||||
|
||||
// Commit the pending write
|
||||
ASSERT_OK(txn_x->Commit());
|
||||
|
||||
// Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
|
||||
// pass. In all cases, both memtables are queried.
|
||||
SetPerfLevel(PerfLevel::kEnableCount);
|
||||
get_perf_context()->Reset();
|
||||
ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
|
||||
// We should have checked two memtables, active and either immutable
|
||||
// or history memtable, depending on the test case.
|
||||
ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
|
||||
|
||||
get_perf_context()->Reset();
|
||||
ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
|
||||
// We should have checked two memtables, active and either immutable
|
||||
// or history memtable, depending on the test case.
|
||||
ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
|
||||
|
||||
get_perf_context()->Reset();
|
||||
ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
|
||||
ASSERT_EQ(value, "bar");
|
||||
// We should have checked two memtables, and since there is no
|
||||
// conflict, another Get() will be made and fetch the data from
|
||||
// DB. If it is in immutable memtable, two extra memtable reads
|
||||
// will be issued. If it is not (in history), only one will
|
||||
// be made, which is to the active memtable.
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
|
||||
} else {
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
|
||||
}
|
||||
|
||||
Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
|
||||
ASSERT_TRUE(txn4 != nullptr);
|
||||
ASSERT_OK(txn4->SetName("txn4"));
|
||||
get_perf_context()->Reset();
|
||||
ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
|
||||
if (attempt == kAttemptHistoryMemtable) {
|
||||
// Active memtable will be checked in snapshot validation and when
|
||||
// getting the value.
|
||||
ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
|
||||
} else {
|
||||
// Only active memtable will be checked in snapshot validation but
|
||||
// both of active and immutable snapshot will be queried when
|
||||
// getting the value.
|
||||
assert(attempt == kAttemptImmMemTable);
|
||||
ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
|
||||
}
|
||||
|
||||
ASSERT_OK(txn2->Commit());
|
||||
ASSERT_OK(txn4->Commit());
|
||||
|
||||
TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
SetPerfLevel(PerfLevel::kDisable);
|
||||
|
||||
delete txn;
|
||||
delete txn2;
|
||||
delete txn3;
|
||||
delete txn4;
|
||||
delete txn_x;
|
||||
}
|
||||
}
|
||||
|
||||
// Reproduce the bug with two snapshots with the same seuqence number and test
|
||||
// that the release of the first snapshot will not affect the reads by the other
|
||||
// snapshot
|
||||
|
Loading…
Reference in New Issue
Block a user