Support marking snapshots for write-conflict checking
Summary: D50475 enables using SST files for transaction write-conflict checking. In order for this to work, we need to make sure not to compact out SingleDeletes when there is an earlier transaction snapshot(D50295). If there is a long-held snapshot, this could reduce the benefit of the SingleDelete optimization. This diff allows Transactions to mark snapshots as being used for write-conflict checking. Then, during compaction, we will be able to optimize SingleDeletes better in the future. This diff adds a flag to SnapshotImpl which is used by Transactions. This diff also passes the earliest write-conflict snapshot's sequence number to CompactionIterator. This diff does not actually change Compaction (after this diff is pushed, D50295 will be able to use this information). Test Plan: no behavior change, ran existing tests Reviewers: rven, kradhakrishnan, yhchiang, IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D51183
This commit is contained in:
parent
770dea9325
commit
ec704aafdc
@ -96,7 +96,8 @@ Status BuildTable(
|
|||||||
snapshots.empty() ? 0 : snapshots.back());
|
snapshots.empty() ? 0 : snapshots.back());
|
||||||
|
|
||||||
CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
|
CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
|
||||||
&merge, kMaxSequenceNumber, &snapshots, env,
|
&merge, kMaxSequenceNumber, &snapshots,
|
||||||
|
kMaxSequenceNumber, env,
|
||||||
true /* internal key corruption is not ok */);
|
true /* internal key corruption is not ok */);
|
||||||
c_iter.SeekToFirst();
|
c_iter.SeekToFirst();
|
||||||
for (; c_iter.Valid(); c_iter.Next()) {
|
for (; c_iter.Valid(); c_iter.Next()) {
|
||||||
|
@ -13,12 +13,14 @@ namespace rocksdb {
|
|||||||
CompactionIterator::CompactionIterator(
|
CompactionIterator::CompactionIterator(
|
||||||
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
||||||
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
|
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
|
||||||
Env* env, bool expect_valid_internal_key, Compaction* compaction,
|
SequenceNumber earliest_write_conflict_snapshot, Env* env,
|
||||||
|
bool expect_valid_internal_key, Compaction* compaction,
|
||||||
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
|
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
|
||||||
: input_(input),
|
: input_(input),
|
||||||
cmp_(cmp),
|
cmp_(cmp),
|
||||||
merge_helper_(merge_helper),
|
merge_helper_(merge_helper),
|
||||||
snapshots_(snapshots),
|
snapshots_(snapshots),
|
||||||
|
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
|
||||||
env_(env),
|
env_(env),
|
||||||
expect_valid_internal_key_(expect_valid_internal_key),
|
expect_valid_internal_key_(expect_valid_internal_key),
|
||||||
compaction_(compaction),
|
compaction_(compaction),
|
||||||
@ -200,6 +202,11 @@ void CompactionIterator::NextFromInput() {
|
|||||||
ParsedInternalKey next_ikey;
|
ParsedInternalKey next_ikey;
|
||||||
input_->Next();
|
input_->Next();
|
||||||
|
|
||||||
|
if (earliest_write_conflict_snapshot_) {
|
||||||
|
// TODO(agiardullo): to be used in D50295
|
||||||
|
// adding this if statement to keep CLANG happy in the meantime
|
||||||
|
}
|
||||||
|
|
||||||
// Check whether the current key is valid, not corrupt and the same
|
// Check whether the current key is valid, not corrupt and the same
|
||||||
// as the single delete.
|
// as the single delete.
|
||||||
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
|
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
|
||||||
|
@ -39,7 +39,8 @@ class CompactionIterator {
|
|||||||
public:
|
public:
|
||||||
CompactionIterator(InternalIterator* input, const Comparator* cmp,
|
CompactionIterator(InternalIterator* input, const Comparator* cmp,
|
||||||
MergeHelper* merge_helper, SequenceNumber last_sequence,
|
MergeHelper* merge_helper, SequenceNumber last_sequence,
|
||||||
std::vector<SequenceNumber>* snapshots, Env* env,
|
std::vector<SequenceNumber>* snapshots,
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot, Env* env,
|
||||||
bool expect_valid_internal_key,
|
bool expect_valid_internal_key,
|
||||||
Compaction* compaction = nullptr,
|
Compaction* compaction = nullptr,
|
||||||
const CompactionFilter* compaction_filter = nullptr,
|
const CompactionFilter* compaction_filter = nullptr,
|
||||||
@ -88,6 +89,7 @@ class CompactionIterator {
|
|||||||
const Comparator* cmp_;
|
const Comparator* cmp_;
|
||||||
MergeHelper* merge_helper_;
|
MergeHelper* merge_helper_;
|
||||||
const std::vector<SequenceNumber>* snapshots_;
|
const std::vector<SequenceNumber>* snapshots_;
|
||||||
|
const SequenceNumber earliest_write_conflict_snapshot_;
|
||||||
Env* env_;
|
Env* env_;
|
||||||
bool expect_valid_internal_key_;
|
bool expect_valid_internal_key_;
|
||||||
Compaction* compaction_;
|
Compaction* compaction_;
|
||||||
|
@ -20,9 +20,9 @@ class CompactionIteratorTest : public testing::Test {
|
|||||||
nullptr, 0U, false, 0));
|
nullptr, 0U, false, 0));
|
||||||
iter_.reset(new test::VectorIterator(ks, vs));
|
iter_.reset(new test::VectorIterator(ks, vs));
|
||||||
iter_->SeekToFirst();
|
iter_->SeekToFirst();
|
||||||
c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(),
|
c_iter_.reset(new CompactionIterator(
|
||||||
last_sequence, &snapshots_,
|
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
|
||||||
Env::Default(), false));
|
kMaxSequenceNumber, Env::Default(), false));
|
||||||
}
|
}
|
||||||
|
|
||||||
const Comparator* cmp_;
|
const Comparator* cmp_;
|
||||||
|
@ -212,6 +212,7 @@ CompactionJob::CompactionJob(
|
|||||||
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
|
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
|
||||||
Directory* db_directory, Directory* output_directory, Statistics* stats,
|
Directory* db_directory, Directory* output_directory, Statistics* stats,
|
||||||
std::vector<SequenceNumber> existing_snapshots,
|
std::vector<SequenceNumber> existing_snapshots,
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
|
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
|
||||||
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
|
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
|
||||||
CompactionJobStats* compaction_job_stats)
|
CompactionJobStats* compaction_job_stats)
|
||||||
@ -230,6 +231,7 @@ CompactionJob::CompactionJob(
|
|||||||
output_directory_(output_directory),
|
output_directory_(output_directory),
|
||||||
stats_(stats),
|
stats_(stats),
|
||||||
existing_snapshots_(std::move(existing_snapshots)),
|
existing_snapshots_(std::move(existing_snapshots)),
|
||||||
|
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
|
||||||
table_cache_(std::move(table_cache)),
|
table_cache_(std::move(table_cache)),
|
||||||
event_logger_(event_logger),
|
event_logger_(event_logger),
|
||||||
paranoid_file_checks_(paranoid_file_checks),
|
paranoid_file_checks_(paranoid_file_checks),
|
||||||
@ -638,8 +640,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
Status status;
|
Status status;
|
||||||
sub_compact->c_iter.reset(new CompactionIterator(
|
sub_compact->c_iter.reset(new CompactionIterator(
|
||||||
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
||||||
&existing_snapshots_, env_, false, sub_compact->compaction,
|
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
|
||||||
compaction_filter));
|
sub_compact->compaction, compaction_filter));
|
||||||
auto c_iter = sub_compact->c_iter.get();
|
auto c_iter = sub_compact->c_iter.get();
|
||||||
c_iter->SeekToFirst();
|
c_iter->SeekToFirst();
|
||||||
const auto& c_iter_stats = c_iter->iter_stats();
|
const auto& c_iter_stats = c_iter->iter_stats();
|
||||||
|
@ -58,6 +58,7 @@ class CompactionJob {
|
|||||||
Directory* db_directory, Directory* output_directory,
|
Directory* db_directory, Directory* output_directory,
|
||||||
Statistics* stats,
|
Statistics* stats,
|
||||||
std::vector<SequenceNumber> existing_snapshots,
|
std::vector<SequenceNumber> existing_snapshots,
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
|
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
|
||||||
bool paranoid_file_checks, bool measure_io_stats,
|
bool paranoid_file_checks, bool measure_io_stats,
|
||||||
const std::string& dbname,
|
const std::string& dbname,
|
||||||
@ -134,6 +135,12 @@ class CompactionJob {
|
|||||||
// entirely within s1 and s2, then the earlier version of k1 can be safely
|
// entirely within s1 and s2, then the earlier version of k1 can be safely
|
||||||
// deleted because that version is not visible in any snapshot.
|
// deleted because that version is not visible in any snapshot.
|
||||||
std::vector<SequenceNumber> existing_snapshots_;
|
std::vector<SequenceNumber> existing_snapshots_;
|
||||||
|
|
||||||
|
// This is the earliest snapshot that could be used for write-conflict
|
||||||
|
// checking by a transaction. For any user-key newer than this snapshot, we
|
||||||
|
// should make sure not to remove evidence that a write occured.
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot_;
|
||||||
|
|
||||||
std::shared_ptr<Cache> table_cache_;
|
std::shared_ptr<Cache> table_cache_;
|
||||||
|
|
||||||
EventLogger* event_logger_;
|
EventLogger* event_logger_;
|
||||||
|
@ -243,11 +243,11 @@ class CompactionJobTest : public testing::Test {
|
|||||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
EventLogger event_logger(db_options_.info_log.get());
|
EventLogger event_logger(db_options_.info_log.get());
|
||||||
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
|
CompactionJob compaction_job(
|
||||||
versions_.get(), &shutting_down_, &log_buffer,
|
0, &compaction, db_options_, env_options_, versions_.get(),
|
||||||
nullptr, nullptr, nullptr, snapshots,
|
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, snapshots,
|
||||||
table_cache_, &event_logger, false, false,
|
kMaxSequenceNumber, table_cache_, &event_logger, false, false, dbname_,
|
||||||
dbname_, &compaction_job_stats_);
|
&compaction_job_stats_);
|
||||||
|
|
||||||
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
|
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
|
||||||
|
|
||||||
|
@ -1779,12 +1779,16 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
// deletion compaction currently not allowed in CompactFiles.
|
// deletion compaction currently not allowed in CompactFiles.
|
||||||
assert(!c->deletion_compaction());
|
assert(!c->deletion_compaction());
|
||||||
|
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot;
|
||||||
|
std::vector<SequenceNumber> snapshot_seqs =
|
||||||
|
snapshots_.GetAll(&earliest_write_conflict_snapshot);
|
||||||
|
|
||||||
assert(is_snapshot_supported_ || snapshots_.empty());
|
assert(is_snapshot_supported_ || snapshots_.empty());
|
||||||
CompactionJob compaction_job(
|
CompactionJob compaction_job(
|
||||||
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
|
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
|
||||||
&shutting_down_, log_buffer, directories_.GetDbDir(),
|
&shutting_down_, log_buffer, directories_.GetDbDir(),
|
||||||
directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(),
|
directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
|
||||||
table_cache_, &event_logger_,
|
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
|
||||||
c->mutable_cf_options()->paranoid_file_checks,
|
c->mutable_cf_options()->paranoid_file_checks,
|
||||||
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
|
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
|
||||||
nullptr); // Here we pass a nullptr for CompactionJobStats because
|
nullptr); // Here we pass a nullptr for CompactionJobStats because
|
||||||
@ -2868,12 +2872,17 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
int output_level __attribute__((unused)) = c->output_level();
|
int output_level __attribute__((unused)) = c->output_level();
|
||||||
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
|
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
|
||||||
&output_level);
|
&output_level);
|
||||||
|
|
||||||
|
SequenceNumber earliest_write_conflict_snapshot;
|
||||||
|
std::vector<SequenceNumber> snapshot_seqs =
|
||||||
|
snapshots_.GetAll(&earliest_write_conflict_snapshot);
|
||||||
|
|
||||||
assert(is_snapshot_supported_ || snapshots_.empty());
|
assert(is_snapshot_supported_ || snapshots_.empty());
|
||||||
CompactionJob compaction_job(
|
CompactionJob compaction_job(
|
||||||
job_context->job_id, c.get(), db_options_, env_options_,
|
job_context->job_id, c.get(), db_options_, env_options_,
|
||||||
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
|
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
|
||||||
directories_.GetDataDir(c->output_path_id()), stats_,
|
directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
|
||||||
snapshots_.GetAll(), table_cache_, &event_logger_,
|
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
|
||||||
c->mutable_cf_options()->paranoid_file_checks,
|
c->mutable_cf_options()->paranoid_file_checks,
|
||||||
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
|
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
|
||||||
&compaction_job_stats);
|
&compaction_job_stats);
|
||||||
@ -3784,7 +3793,13 @@ Status DBImpl::NewIterators(
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
const Snapshot* DBImpl::GetSnapshot() {
|
const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
|
||||||
|
|
||||||
|
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
|
||||||
|
return GetSnapshotImpl(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
|
||||||
int64_t unix_time = 0;
|
int64_t unix_time = 0;
|
||||||
env_->GetCurrentTime(&unix_time); // Ignore error
|
env_->GetCurrentTime(&unix_time); // Ignore error
|
||||||
SnapshotImpl* s = new SnapshotImpl;
|
SnapshotImpl* s = new SnapshotImpl;
|
||||||
@ -3795,7 +3810,8 @@ const Snapshot* DBImpl::GetSnapshot() {
|
|||||||
delete s;
|
delete s;
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
return snapshots_.New(s, versions_->LastSequence(), unix_time);
|
return snapshots_.New(s, versions_->LastSequence(), unix_time,
|
||||||
|
is_write_conflict_boundary);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
|
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
|
||||||
|
@ -234,6 +234,12 @@ class DBImpl : public DB {
|
|||||||
Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key,
|
Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key,
|
||||||
SequenceNumber* seq);
|
SequenceNumber* seq);
|
||||||
|
|
||||||
|
// Similar to GetSnapshot(), but also lets the db know that this snapshot
|
||||||
|
// will be used for transaction write-conflict checking. The DB can then
|
||||||
|
// make sure not to compact any keys that would prevent a write-conflict from
|
||||||
|
// being detected.
|
||||||
|
const Snapshot* GetSnapshotForWriteConflictBoundary();
|
||||||
|
|
||||||
using DB::AddFile;
|
using DB::AddFile;
|
||||||
virtual Status AddFile(ColumnFamilyHandle* column_family,
|
virtual Status AddFile(ColumnFamilyHandle* column_family,
|
||||||
const ExternalSstFileInfo* file_info,
|
const ExternalSstFileInfo* file_info,
|
||||||
@ -560,6 +566,8 @@ class DBImpl : public DB {
|
|||||||
// helper function to call after some of the logs_ were synced
|
// helper function to call after some of the logs_ were synced
|
||||||
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
|
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
|
||||||
|
|
||||||
|
const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
|
||||||
|
|
||||||
// table_cache_ provides its own synchronization
|
// table_cache_ provides its own synchronization
|
||||||
std::shared_ptr<Cache> table_cache_;
|
std::shared_ptr<Cache> table_cache_;
|
||||||
|
|
||||||
|
@ -12,6 +12,9 @@ namespace rocksdb {
|
|||||||
ManagedSnapshot::ManagedSnapshot(DB* db) : db_(db),
|
ManagedSnapshot::ManagedSnapshot(DB* db) : db_(db),
|
||||||
snapshot_(db->GetSnapshot()) {}
|
snapshot_(db->GetSnapshot()) {}
|
||||||
|
|
||||||
|
ManagedSnapshot::ManagedSnapshot(DB* db, const Snapshot* _snapshot)
|
||||||
|
: db_(db), snapshot_(_snapshot) {}
|
||||||
|
|
||||||
ManagedSnapshot::~ManagedSnapshot() {
|
ManagedSnapshot::~ManagedSnapshot() {
|
||||||
if (snapshot_) {
|
if (snapshot_) {
|
||||||
db_->ReleaseSnapshot(snapshot_);
|
db_->ReleaseSnapshot(snapshot_);
|
||||||
|
@ -34,6 +34,9 @@ class SnapshotImpl : public Snapshot {
|
|||||||
SnapshotList* list_; // just for sanity checks
|
SnapshotList* list_; // just for sanity checks
|
||||||
|
|
||||||
int64_t unix_time_;
|
int64_t unix_time_;
|
||||||
|
|
||||||
|
// Will this snapshot be used by a Transaction to do write-conflict checking?
|
||||||
|
bool is_write_conflict_boundary_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SnapshotList {
|
class SnapshotList {
|
||||||
@ -50,9 +53,10 @@ class SnapshotList {
|
|||||||
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
|
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
|
||||||
|
|
||||||
const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq,
|
const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq,
|
||||||
uint64_t unix_time) {
|
uint64_t unix_time, bool is_write_conflict_boundary) {
|
||||||
s->number_ = seq;
|
s->number_ = seq;
|
||||||
s->unix_time_ = unix_time;
|
s->unix_time_ = unix_time;
|
||||||
|
s->is_write_conflict_boundary_ = is_write_conflict_boundary;
|
||||||
s->list_ = this;
|
s->list_ = this;
|
||||||
s->next_ = &list_;
|
s->next_ = &list_;
|
||||||
s->prev_ = list_.prev_;
|
s->prev_ = list_.prev_;
|
||||||
@ -71,14 +75,29 @@ class SnapshotList {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// retrieve all snapshot numbers. They are sorted in ascending order.
|
// retrieve all snapshot numbers. They are sorted in ascending order.
|
||||||
std::vector<SequenceNumber> GetAll() {
|
std::vector<SequenceNumber> GetAll(
|
||||||
|
SequenceNumber* oldest_write_conflict_snapshot = nullptr) {
|
||||||
std::vector<SequenceNumber> ret;
|
std::vector<SequenceNumber> ret;
|
||||||
|
|
||||||
|
if (oldest_write_conflict_snapshot != nullptr) {
|
||||||
|
*oldest_write_conflict_snapshot = kMaxSequenceNumber;
|
||||||
|
}
|
||||||
|
|
||||||
if (empty()) {
|
if (empty()) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
SnapshotImpl* s = &list_;
|
SnapshotImpl* s = &list_;
|
||||||
while (s->next_ != &list_) {
|
while (s->next_ != &list_) {
|
||||||
ret.push_back(s->next_->number_);
|
ret.push_back(s->next_->number_);
|
||||||
|
|
||||||
|
if (oldest_write_conflict_snapshot != nullptr &&
|
||||||
|
*oldest_write_conflict_snapshot != kMaxSequenceNumber &&
|
||||||
|
s->next_->is_write_conflict_boundary_) {
|
||||||
|
// If this is the first write-conflict boundary snapshot in the list,
|
||||||
|
// it is the oldest
|
||||||
|
*oldest_write_conflict_snapshot = s->next_->number_;
|
||||||
|
}
|
||||||
|
|
||||||
s = s->next_;
|
s = s->next_;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -33,6 +33,9 @@ class ManagedSnapshot {
|
|||||||
public:
|
public:
|
||||||
explicit ManagedSnapshot(DB* db);
|
explicit ManagedSnapshot(DB* db);
|
||||||
|
|
||||||
|
// Instead of creating a snapshot, take ownership of the input snapshot.
|
||||||
|
ManagedSnapshot(DB* db, const Snapshot* _snapshot);
|
||||||
|
|
||||||
~ManagedSnapshot();
|
~ManagedSnapshot();
|
||||||
|
|
||||||
const Snapshot* snapshot();
|
const Snapshot* snapshot();
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
|
|
||||||
#include "utilities/transactions/transaction_base.h"
|
#include "utilities/transactions/transaction_base.h"
|
||||||
|
|
||||||
|
#include "db/db_impl.h"
|
||||||
#include "db/column_family.h"
|
#include "db/column_family.h"
|
||||||
#include "rocksdb/comparator.h"
|
#include "rocksdb/comparator.h"
|
||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
@ -35,7 +36,11 @@ void TransactionBaseImpl::Clear() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void TransactionBaseImpl::SetSnapshot() {
|
void TransactionBaseImpl::SetSnapshot() {
|
||||||
snapshot_.reset(new ManagedSnapshot(db_));
|
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
|
||||||
|
auto db_impl = reinterpret_cast<DBImpl*>(db_);
|
||||||
|
|
||||||
|
const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary();
|
||||||
|
snapshot_.reset(new ManagedSnapshot(db_, snapshot));
|
||||||
snapshot_needed_ = false;
|
snapshot_needed_ = false;
|
||||||
snapshot_notifier_ = nullptr;
|
snapshot_notifier_ = nullptr;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user