Add SharedSnapshot-related APIs

This commit is contained in:
Yanqin Jin 2022-04-16 07:42:40 -07:00
parent 83b6a07d3f
commit 45b2bc6124
8 changed files with 307 additions and 33 deletions

View File

@ -511,6 +511,19 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
WaitForBackgroundWork();
}
Status DBImpl::CheckIfAllSnapshotsReleased() {
size_t num_snapshots = 0;
ReleaseSharedSnapshotsOlderThan(std::numeric_limits<uint64_t>::max(),
&num_snapshots);
// If there is unreleased snapshot, fail the close call
if (num_snapshots > 0) {
return Status::Aborted("Cannot close DB with unreleased snapshot.");
}
return Status::OK();
}
Status DBImpl::CloseHelper() {
// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
@ -727,11 +740,19 @@ Status DBImpl::CloseImpl() { return CloseHelper(); }
DBImpl::~DBImpl() {
InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
if (!closed_) {
closed_ = true;
closing_status_ = CloseHelper();
closing_status_.PermitUncheckedError();
if (closed_) {
return;
}
closed_ = true;
{
const Status s = CheckIfAllSnapshotsReleased();
s.PermitUncheckedError();
}
closing_status_ = CloseImpl();
closing_status_.PermitUncheckedError();
}
void DBImpl::MaybeIgnoreError(Status* s) const {
@ -1795,11 +1816,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// data for the snapshot, so the reader would see neither data that was be
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
if (last_seq_same_as_publish_seq_) {
snapshot = versions_->LastSequence();
} else {
snapshot = versions_->LastPublishedSequence();
}
snapshot = GetLastPublishedSequence();
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
@ -2192,11 +2209,7 @@ bool DBImpl::MultiCFSnapshot(
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
if (last_seq_same_as_publish_seq_) {
*snapshot = versions_->LastSequence();
} else {
*snapshot = versions_->LastPublishedSequence();
}
*snapshot = GetLastPublishedSequence();
}
} else {
// If we end up with the same issue of memtable geting sealed during 2
@ -2227,11 +2240,7 @@ bool DBImpl::MultiCFSnapshot(
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
if (last_seq_same_as_publish_seq_) {
*snapshot = versions_->LastSequence();
} else {
*snapshot = versions_->LastPublishedSequence();
}
*snapshot = GetLastPublishedSequence();
} else {
*snapshot =
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
@ -3156,6 +3165,50 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
}
#endif // ROCKSDB_LITE
std::shared_ptr<const Snapshot> DBImpl::CreateSharedSnapshot(
SequenceNumber snapshot_seq, uint64_t ts) {
assert(ts != std::numeric_limits<uint64_t>::max());
auto ret = CreateSharedSnapshotImpl(snapshot_seq, ts, /*lock=*/true);
return ret;
}
std::shared_ptr<const Snapshot> DBImpl::GetSharedSnapshot(uint64_t ts) const {
InstrumentedMutexLock lock_guard(&mutex_);
return shared_snapshots_.GetSnapshot(ts);
}
void DBImpl::ReleaseSharedSnapshotsOlderThan(uint64_t ts,
size_t* remaining_total_ss) {
std::vector<std::shared_ptr<const Snapshot>> snapshots_to_release;
{
InstrumentedMutexLock lock_guard(&mutex_);
shared_snapshots_.ReleaseSnapshotsOlderThan(ts, snapshots_to_release);
}
snapshots_to_release.clear();
if (remaining_total_ss) {
InstrumentedMutexLock lock_guard(&mutex_);
*remaining_total_ss = static_cast<size_t>(snapshots_.count());
}
}
Status DBImpl::GetSharedSnapshots(
uint64_t ts_lb, uint64_t ts_ub,
std::vector<std::shared_ptr<const Snapshot>>* shared_snapshots) const {
if (!shared_snapshots) {
return Status::InvalidArgument("shared_snapshots must not be null");
} else if (ts_lb >= ts_ub) {
return Status::InvalidArgument(
"timestamp lower bound must be smaller than upper bound");
}
assert(shared_snapshots);
shared_snapshots->clear();
InstrumentedMutexLock lock_guard(&mutex_);
shared_snapshots_.GetSnapshots(ts_lb, ts_ub, shared_snapshots);
return Status::OK();
}
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
@ -3165,6 +3218,8 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
@ -3174,9 +3229,7 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
delete s;
return nullptr;
}
auto snapshot_seq = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
if (lock) {
@ -3185,6 +3238,63 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
return snapshot;
}
std::shared_ptr<const SnapshotImpl> DBImpl::CreateSharedSnapshotImpl(
SequenceNumber snapshot_seq, uint64_t ts, bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);
if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return nullptr;
}
// Caller is not write thread, thus didn't provide a valid snapshot_seq.
// Obtain seq from db.
if (!need_update_seq) {
snapshot_seq = GetLastPublishedSequence();
}
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time,
/*is_write_conflict_boundary=*/true, ts);
std::shared_ptr<const SnapshotImpl> ret(
snapshot,
std::bind(&DBImpl::ReleaseSnapshot, this, std::placeholders::_1));
shared_snapshots_.AddSnapshot(ret);
// Caller is from write thread, and we need to update database's sequence
// number.
if (need_update_seq) {
assert(versions_);
if (last_seq_same_as_publish_seq_) {
versions_->SetLastSequence(snapshot_seq);
} else {
// TODO: support write-prepared/write-unprepared transactions with two
// write queues.
assert(false);
}
}
if (lock) {
mutex_.Unlock();
}
return ret;
}
namespace {
using CfdList = autovector<ColumnFamilyData*, 2>;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
@ -3210,11 +3320,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
snapshots_.Delete(casted_s);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
if (last_seq_same_as_publish_seq_) {
oldest_snapshot = versions_->LastSequence();
} else {
oldest_snapshot = versions_->LastPublishedSequence();
}
oldest_snapshot = GetLastPublishedSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
@ -4105,13 +4211,14 @@ Status DBImpl::Close() {
if (closed_) {
return closing_status_;
}
{
InstrumentedMutexLock l(&mutex_);
// If there is unreleased snapshot, fail the close call
if (!snapshots_.empty()) {
return Status::Aborted("Cannot close DB with unreleased snapshot.");
const Status s = CheckIfAllSnapshotsReleased();
if (!s.ok()) {
return s;
}
}
closing_status_ = CloseImpl();
closed_ = true;
return closing_status_;

View File

@ -284,6 +284,19 @@ class DBImpl : public DB {
virtual const Snapshot* GetSnapshot() override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a shared snapshot. This snapshot can be shared by multiple readers.
// If any of them uses it for write conflict checking, then
// is_write_conflict_boundary is true. For simplicity, set it to true by
// default.
std::shared_ptr<const Snapshot> CreateSharedSnapshot(
SequenceNumber snapshot_seq, uint64_t ts);
std::shared_ptr<const Snapshot> GetSharedSnapshot(uint64_t ts) const;
void ReleaseSharedSnapshotsOlderThan(uint64_t ts,
size_t* remaining_total_ss = nullptr);
Status GetSharedSnapshots(
uint64_t ts_lb, uint64_t ts_ub,
std::vector<std::shared_ptr<const Snapshot>>* shared_snapshots) const;
using DB::GetProperty;
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) override;
@ -1921,10 +1934,23 @@ class DBImpl : public DB {
SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock = true);
// If snapshot_seq != kMaxSequenceNumber, then this function can only be
// called from the write thread that publishes sequence numbers to readers.
// For 1) write-committed, or 2) write-prepared + one-write-queue, this will
// be the write thread performing memtable writes. For write-prepared with
// two write queues, this will be the write thread writing commit marker to
// the WAL.
// If snapshot_seq == kMaxSequenceNumber, this function is called by a caller
// ensuring no writes to the database.
std::shared_ptr<const SnapshotImpl> CreateSharedSnapshotImpl(
SequenceNumber snapshot_seq, uint64_t ts, bool lock = true);
uint64_t GetMaxTotalWalSize() const;
FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
Status CheckIfAllSnapshotsReleased();
Status CloseHelper();
void WaitForBackgroundWork();
@ -2189,6 +2215,8 @@ class DBImpl : public DB {
SnapshotList snapshots_;
SharedSnapshotList shared_snapshots_;
// For each background job, pending_outputs_ keeps the current file number at
// the time that background job started.
// FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has

View File

@ -2859,6 +2859,12 @@ class ModelDB : public DB {
assert(false);
return 0;
}
uint64_t GetTimestamp() const override {
// no need to call this
assert(false);
return 0;
}
};
explicit ModelDB(const Options& options) : options_(options) {}

View File

@ -29,6 +29,8 @@ class SnapshotImpl : public Snapshot {
virtual SequenceNumber GetSequenceNumber() const override { return number_; }
uint64_t GetTimestamp() const override { return timestamp_; }
private:
friend class SnapshotList;
@ -40,6 +42,8 @@ class SnapshotImpl : public Snapshot {
int64_t unix_time_;
uint64_t timestamp_;
// Will this snapshot be used by a Transaction to do write-conflict checking?
bool is_write_conflict_boundary_;
};
@ -53,6 +57,7 @@ class SnapshotList {
// Set all the variables to make UBSAN happy.
list_.list_ = nullptr;
list_.unix_time_ = 0;
list_.timestamp_ = 0;
list_.is_write_conflict_boundary_ = false;
count_ = 0;
}
@ -60,14 +65,19 @@ class SnapshotList {
// No copy-construct.
SnapshotList(const SnapshotList&) = delete;
bool empty() const { return list_.next_ == &list_; }
bool empty() const {
assert(list_.next_ != &list_ || 0 == count_);
return list_.next_ == &list_;
}
SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
bool is_write_conflict_boundary) {
bool is_write_conflict_boundary,
uint64_t ts = std::numeric_limits<uint64_t>::max()) {
s->number_ = seq;
s->unix_time_ = unix_time;
s->timestamp_ = ts;
s->is_write_conflict_boundary_ = is_write_conflict_boundary;
s->list_ = this;
s->next_ = &list_;
@ -165,4 +175,58 @@ class SnapshotList {
uint64_t count_;
};
// All operations on SharedSnapshotList must be synchronized.
class SharedSnapshotList {
public:
explicit SharedSnapshotList() = default;
std::shared_ptr<const Snapshot> GetSnapshot(uint64_t ts) const {
if (ts == std::numeric_limits<uint64_t>::max() && !snapshots_.empty()) {
auto it = snapshots_.rbegin();
assert(it != snapshots_.rend());
return it->second;
}
auto it = snapshots_.find(ts);
if (it == snapshots_.end()) {
return std::shared_ptr<const Snapshot>();
}
return it->second;
}
void GetSnapshots(
uint64_t ts_lb, uint64_t ts_ub,
std::vector<std::shared_ptr<const Snapshot>>* snapshots) const {
assert(ts_lb < ts_ub);
assert(snapshots);
auto it_low = snapshots_.lower_bound(ts_lb);
auto it_high = snapshots_.lower_bound(ts_ub);
assert(it_low != it_high);
for (auto it = it_low; it != it_high; ++it) {
snapshots->emplace_back(it->second);
}
}
void AddSnapshot(const std::shared_ptr<const Snapshot>& snapshot) {
assert(snapshot);
snapshots_.try_emplace(snapshot->GetTimestamp(), snapshot);
}
// snapshots_to_release: the container to where the shared snapshots will be
// moved so that it retains the last reference to the snapshots and the
// snapshots won't be actually released which requires db mutex. The
// snapshots will be released by caller of ReleaseSnapshotsOlderThan().
void ReleaseSnapshotsOlderThan(
uint64_t ts,
std::vector<std::shared_ptr<const Snapshot>>& snapshots_to_release) {
auto ub = snapshots_.upper_bound(ts);
for (auto it = snapshots_.begin(); it != ub; ++it) {
snapshots_to_release.emplace_back(it->second);
}
snapshots_.erase(snapshots_.begin(), ub);
}
private:
std::map<uint64_t, std::shared_ptr<const Snapshot>> snapshots_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -22,6 +22,8 @@ class Snapshot {
// returns Snapshot's sequence number
virtual SequenceNumber GetSequenceNumber() const = 0;
virtual uint64_t GetTimestamp() const = 0;
protected:
virtual ~Snapshot();
};

View File

@ -458,6 +458,37 @@ class TransactionDB : public StackableDB {
virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0;
// Caller must ensure there are no active writes when this API is called.
// Create a shared snapshot and assign ts to it.
virtual std::shared_ptr<const Snapshot> CreateSharedSnapshot(
TxnTimestamp ts) = 0;
// Return the latest shared snapshot if present.
std::shared_ptr<const Snapshot> GetLatestSharedSnapshot() const {
return GetSharedSnapshot(kMaxTxnTimestamp);
}
// Return the shared snapshot correponding to given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest shared snapshot if present.
// Othersise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
virtual std::shared_ptr<const Snapshot> GetSharedSnapshot(
TxnTimestamp ts) const = 0;
// Release shared snapshots whose timestamps are less than or equal to ts.
virtual void ReleaseSharedSnapshotsOlderThan(TxnTimestamp ts) = 0;
// Get all shared snapshots which will be stored in shared_snapshots.
Status GetAllSharedSnapshots(
std::vector<std::shared_ptr<const Snapshot>>* shared_snapshots) const {
return GetSharedSnapshots(/*ts_lb=*/0, /*ts_ub=*/kMaxTxnTimestamp,
shared_snapshots);
}
// Get all shared snapshots whose timestamps fall within [ts_lb, ts_ub).
// shared_snapshots will be cleared and contain returned snapshots.
virtual Status GetSharedSnapshots(
TxnTimestamp ts_lb, TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>* shared_snapshots) const = 0;
protected:
// To Create an TransactionDB, call Open()
// The ownership of db is transferred to the base StackableDB

View File

@ -656,5 +656,29 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
transactions_.erase(it);
}
std::shared_ptr<const Snapshot> PessimisticTransactionDB::CreateSharedSnapshot(
TxnTimestamp ts) {
assert(db_impl_);
return db_impl_->CreateSharedSnapshot(kMaxSequenceNumber, ts);
}
std::shared_ptr<const Snapshot> PessimisticTransactionDB::GetSharedSnapshot(
TxnTimestamp ts) const {
assert(db_impl_);
return db_impl_->GetSharedSnapshot(ts);
}
void PessimisticTransactionDB::ReleaseSharedSnapshotsOlderThan(
TxnTimestamp ts) {
assert(db_impl_);
db_impl_->ReleaseSharedSnapshotsOlderThan(ts);
}
Status PessimisticTransactionDB::GetSharedSnapshots(
TxnTimestamp ts_lb, TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>* shared_snapshots) const {
assert(db_impl_);
return db_impl_->GetSharedSnapshots(ts_lb, ts_ub, shared_snapshots);
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -150,6 +150,18 @@ class PessimisticTransactionDB : public TransactionDB {
return lock_manager_->GetLockTrackerFactory();
}
std::shared_ptr<const Snapshot> CreateSharedSnapshot(
TxnTimestamp ts) override;
std::shared_ptr<const Snapshot> GetSharedSnapshot(
TxnTimestamp ts) const override;
void ReleaseSharedSnapshotsOlderThan(TxnTimestamp ts) override;
Status GetSharedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>*
shared_snapshots) const override;
protected:
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;