diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 62589ba1d..93a590ced 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -511,6 +511,19 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { WaitForBackgroundWork(); } +Status DBImpl::CheckIfAllSnapshotsReleased() { + size_t num_snapshots = 0; + ReleaseSharedSnapshotsOlderThan(std::numeric_limits::max(), + &num_snapshots); + + // If there is unreleased snapshot, fail the close call + if (num_snapshots > 0) { + return Status::Aborted("Cannot close DB with unreleased snapshot."); + } + + return Status::OK(); +} + Status DBImpl::CloseHelper() { // Guarantee that there is no background error recovery in progress before // continuing with the shutdown @@ -727,11 +740,19 @@ Status DBImpl::CloseImpl() { return CloseHelper(); } DBImpl::~DBImpl() { InstrumentedMutexLock closing_lock_guard(&closing_mutex_); - if (!closed_) { - closed_ = true; - closing_status_ = CloseHelper(); - closing_status_.PermitUncheckedError(); + if (closed_) { + return; } + + closed_ = true; + + { + const Status s = CheckIfAllSnapshotsReleased(); + s.PermitUncheckedError(); + } + + closing_status_ = CloseImpl(); + closing_status_.PermitUncheckedError(); } void DBImpl::MaybeIgnoreError(Status* s) const { @@ -1795,11 +1816,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, // data for the snapshot, so the reader would see neither data that was be // visible to the snapshot before compaction nor the newer data inserted // afterwards. - if (last_seq_same_as_publish_seq_) { - snapshot = versions_->LastSequence(); - } else { - snapshot = versions_->LastPublishedSequence(); - } + snapshot = GetLastPublishedSequence(); if (get_impl_options.callback) { // The unprep_seqs are not published for write unprepared, so it could be // that max_visible_seq is larger. Seek to the std::max of the two. @@ -2192,11 +2209,7 @@ bool DBImpl::MultiCFSnapshot( // version because a flush happening in between may compact away data for // the snapshot, but the snapshot is earlier than the data overwriting it, // so users may see wrong results. - if (last_seq_same_as_publish_seq_) { - *snapshot = versions_->LastSequence(); - } else { - *snapshot = versions_->LastPublishedSequence(); - } + *snapshot = GetLastPublishedSequence(); } } else { // If we end up with the same issue of memtable geting sealed during 2 @@ -2227,11 +2240,7 @@ bool DBImpl::MultiCFSnapshot( // acquire the lock so we're sure to succeed mutex_.Lock(); } - if (last_seq_same_as_publish_seq_) { - *snapshot = versions_->LastSequence(); - } else { - *snapshot = versions_->LastPublishedSequence(); - } + *snapshot = GetLastPublishedSequence(); } else { *snapshot = static_cast_with_check(read_options.snapshot) @@ -3156,6 +3165,50 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { } #endif // ROCKSDB_LITE +std::shared_ptr DBImpl::CreateSharedSnapshot( + SequenceNumber snapshot_seq, uint64_t ts) { + assert(ts != std::numeric_limits::max()); + + auto ret = CreateSharedSnapshotImpl(snapshot_seq, ts, /*lock=*/true); + return ret; +} + +std::shared_ptr DBImpl::GetSharedSnapshot(uint64_t ts) const { + InstrumentedMutexLock lock_guard(&mutex_); + return shared_snapshots_.GetSnapshot(ts); +} + +void DBImpl::ReleaseSharedSnapshotsOlderThan(uint64_t ts, + size_t* remaining_total_ss) { + std::vector> snapshots_to_release; + { + InstrumentedMutexLock lock_guard(&mutex_); + shared_snapshots_.ReleaseSnapshotsOlderThan(ts, snapshots_to_release); + } + snapshots_to_release.clear(); + + if (remaining_total_ss) { + InstrumentedMutexLock lock_guard(&mutex_); + *remaining_total_ss = static_cast(snapshots_.count()); + } +} + +Status DBImpl::GetSharedSnapshots( + uint64_t ts_lb, uint64_t ts_ub, + std::vector>* shared_snapshots) const { + if (!shared_snapshots) { + return Status::InvalidArgument("shared_snapshots must not be null"); + } else if (ts_lb >= ts_ub) { + return Status::InvalidArgument( + "timestamp lower bound must be smaller than upper bound"); + } + assert(shared_snapshots); + shared_snapshots->clear(); + InstrumentedMutexLock lock_guard(&mutex_); + shared_snapshots_.GetSnapshots(ts_lb, ts_ub, shared_snapshots); + return Status::OK(); +} + SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, bool lock) { int64_t unix_time = 0; @@ -3165,6 +3218,8 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, if (lock) { mutex_.Lock(); + } else { + mutex_.AssertHeld(); } // returns null if the underlying memtable does not support snapshot. if (!is_snapshot_supported_) { @@ -3174,9 +3229,7 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, delete s; return nullptr; } - auto snapshot_seq = last_seq_same_as_publish_seq_ - ? versions_->LastSequence() - : versions_->LastPublishedSequence(); + auto snapshot_seq = GetLastPublishedSequence(); SnapshotImpl* snapshot = snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); if (lock) { @@ -3185,6 +3238,63 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, return snapshot; } +std::shared_ptr DBImpl::CreateSharedSnapshotImpl( + SequenceNumber snapshot_seq, uint64_t ts, bool lock) { + int64_t unix_time = 0; + immutable_db_options_.clock->GetCurrentTime(&unix_time) + .PermitUncheckedError(); // Ignore error + SnapshotImpl* s = new SnapshotImpl; + + const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber); + + if (lock) { + mutex_.Lock(); + } else { + mutex_.AssertHeld(); + } + // returns null if the underlying memtable does not support snapshot. + if (!is_snapshot_supported_) { + if (lock) { + mutex_.Unlock(); + } + delete s; + return nullptr; + } + + // Caller is not write thread, thus didn't provide a valid snapshot_seq. + // Obtain seq from db. + if (!need_update_seq) { + snapshot_seq = GetLastPublishedSequence(); + } + + SnapshotImpl* snapshot = + snapshots_.New(s, snapshot_seq, unix_time, + /*is_write_conflict_boundary=*/true, ts); + + std::shared_ptr ret( + snapshot, + std::bind(&DBImpl::ReleaseSnapshot, this, std::placeholders::_1)); + shared_snapshots_.AddSnapshot(ret); + + // Caller is from write thread, and we need to update database's sequence + // number. + if (need_update_seq) { + assert(versions_); + if (last_seq_same_as_publish_seq_) { + versions_->SetLastSequence(snapshot_seq); + } else { + // TODO: support write-prepared/write-unprepared transactions with two + // write queues. + assert(false); + } + } + + if (lock) { + mutex_.Unlock(); + } + return ret; +} + namespace { using CfdList = autovector; bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) { @@ -3210,11 +3320,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { snapshots_.Delete(casted_s); uint64_t oldest_snapshot; if (snapshots_.empty()) { - if (last_seq_same_as_publish_seq_) { - oldest_snapshot = versions_->LastSequence(); - } else { - oldest_snapshot = versions_->LastPublishedSequence(); - } + oldest_snapshot = GetLastPublishedSequence(); } else { oldest_snapshot = snapshots_.oldest()->number_; } @@ -4105,13 +4211,14 @@ Status DBImpl::Close() { if (closed_) { return closing_status_; } + { - InstrumentedMutexLock l(&mutex_); - // If there is unreleased snapshot, fail the close call - if (!snapshots_.empty()) { - return Status::Aborted("Cannot close DB with unreleased snapshot."); + const Status s = CheckIfAllSnapshotsReleased(); + if (!s.ok()) { + return s; } } + closing_status_ = CloseImpl(); closed_ = true; return closing_status_; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a4aded732..466811a12 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -284,6 +284,19 @@ class DBImpl : public DB { virtual const Snapshot* GetSnapshot() override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override; + // Create a shared snapshot. This snapshot can be shared by multiple readers. + // If any of them uses it for write conflict checking, then + // is_write_conflict_boundary is true. For simplicity, set it to true by + // default. + std::shared_ptr CreateSharedSnapshot( + SequenceNumber snapshot_seq, uint64_t ts); + std::shared_ptr GetSharedSnapshot(uint64_t ts) const; + void ReleaseSharedSnapshotsOlderThan(uint64_t ts, + size_t* remaining_total_ss = nullptr); + Status GetSharedSnapshots( + uint64_t ts_lb, uint64_t ts_ub, + std::vector>* shared_snapshots) const; + using DB::GetProperty; virtual bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) override; @@ -1921,10 +1934,23 @@ class DBImpl : public DB { SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, bool lock = true); + // If snapshot_seq != kMaxSequenceNumber, then this function can only be + // called from the write thread that publishes sequence numbers to readers. + // For 1) write-committed, or 2) write-prepared + one-write-queue, this will + // be the write thread performing memtable writes. For write-prepared with + // two write queues, this will be the write thread writing commit marker to + // the WAL. + // If snapshot_seq == kMaxSequenceNumber, this function is called by a caller + // ensuring no writes to the database. + std::shared_ptr CreateSharedSnapshotImpl( + SequenceNumber snapshot_seq, uint64_t ts, bool lock = true); + uint64_t GetMaxTotalWalSize() const; FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; + Status CheckIfAllSnapshotsReleased(); + Status CloseHelper(); void WaitForBackgroundWork(); @@ -2189,6 +2215,8 @@ class DBImpl : public DB { SnapshotList snapshots_; + SharedSnapshotList shared_snapshots_; + // For each background job, pending_outputs_ keeps the current file number at // the time that background job started. // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has diff --git a/db/db_test.cc b/db/db_test.cc index c8e394394..105993980 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2859,6 +2859,12 @@ class ModelDB : public DB { assert(false); return 0; } + + uint64_t GetTimestamp() const override { + // no need to call this + assert(false); + return 0; + } }; explicit ModelDB(const Options& options) : options_(options) {} diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 9961bdd58..d73d310f0 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -29,6 +29,8 @@ class SnapshotImpl : public Snapshot { virtual SequenceNumber GetSequenceNumber() const override { return number_; } + uint64_t GetTimestamp() const override { return timestamp_; } + private: friend class SnapshotList; @@ -40,6 +42,8 @@ class SnapshotImpl : public Snapshot { int64_t unix_time_; + uint64_t timestamp_; + // Will this snapshot be used by a Transaction to do write-conflict checking? bool is_write_conflict_boundary_; }; @@ -53,6 +57,7 @@ class SnapshotList { // Set all the variables to make UBSAN happy. list_.list_ = nullptr; list_.unix_time_ = 0; + list_.timestamp_ = 0; list_.is_write_conflict_boundary_ = false; count_ = 0; } @@ -60,14 +65,19 @@ class SnapshotList { // No copy-construct. SnapshotList(const SnapshotList&) = delete; - bool empty() const { return list_.next_ == &list_; } + bool empty() const { + assert(list_.next_ != &list_ || 0 == count_); + return list_.next_ == &list_; + } SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, - bool is_write_conflict_boundary) { + bool is_write_conflict_boundary, + uint64_t ts = std::numeric_limits::max()) { s->number_ = seq; s->unix_time_ = unix_time; + s->timestamp_ = ts; s->is_write_conflict_boundary_ = is_write_conflict_boundary; s->list_ = this; s->next_ = &list_; @@ -165,4 +175,58 @@ class SnapshotList { uint64_t count_; }; +// All operations on SharedSnapshotList must be synchronized. +class SharedSnapshotList { + public: + explicit SharedSnapshotList() = default; + + std::shared_ptr GetSnapshot(uint64_t ts) const { + if (ts == std::numeric_limits::max() && !snapshots_.empty()) { + auto it = snapshots_.rbegin(); + assert(it != snapshots_.rend()); + return it->second; + } + auto it = snapshots_.find(ts); + if (it == snapshots_.end()) { + return std::shared_ptr(); + } + return it->second; + } + + void GetSnapshots( + uint64_t ts_lb, uint64_t ts_ub, + std::vector>* snapshots) const { + assert(ts_lb < ts_ub); + assert(snapshots); + auto it_low = snapshots_.lower_bound(ts_lb); + auto it_high = snapshots_.lower_bound(ts_ub); + assert(it_low != it_high); + for (auto it = it_low; it != it_high; ++it) { + snapshots->emplace_back(it->second); + } + } + + void AddSnapshot(const std::shared_ptr& snapshot) { + assert(snapshot); + snapshots_.try_emplace(snapshot->GetTimestamp(), snapshot); + } + + // snapshots_to_release: the container to where the shared snapshots will be + // moved so that it retains the last reference to the snapshots and the + // snapshots won't be actually released which requires db mutex. The + // snapshots will be released by caller of ReleaseSnapshotsOlderThan(). + void ReleaseSnapshotsOlderThan( + uint64_t ts, + std::vector>& snapshots_to_release) { + auto ub = snapshots_.upper_bound(ts); + for (auto it = snapshots_.begin(); it != ub; ++it) { + snapshots_to_release.emplace_back(it->second); + } + snapshots_.erase(snapshots_.begin(), ub); + } + + private: + std::map> snapshots_; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/snapshot.h b/include/rocksdb/snapshot.h index 6a7212d60..2eb0a1632 100644 --- a/include/rocksdb/snapshot.h +++ b/include/rocksdb/snapshot.h @@ -22,6 +22,8 @@ class Snapshot { // returns Snapshot's sequence number virtual SequenceNumber GetSequenceNumber() const = 0; + virtual uint64_t GetTimestamp() const = 0; + protected: virtual ~Snapshot(); }; diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index eaf2bc128..2a9b396f2 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -458,6 +458,37 @@ class TransactionDB : public StackableDB { virtual std::vector GetDeadlockInfoBuffer() = 0; virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0; + // Caller must ensure there are no active writes when this API is called. + // Create a shared snapshot and assign ts to it. + virtual std::shared_ptr CreateSharedSnapshot( + TxnTimestamp ts) = 0; + + // Return the latest shared snapshot if present. + std::shared_ptr GetLatestSharedSnapshot() const { + return GetSharedSnapshot(kMaxTxnTimestamp); + } + // Return the shared snapshot correponding to given timestamp. If ts is + // kMaxTxnTimestamp, then we return the latest shared snapshot if present. + // Othersise, we return the snapshot whose timestamp is equal to `ts`. If no + // such snapshot exists, then we return null. + virtual std::shared_ptr GetSharedSnapshot( + TxnTimestamp ts) const = 0; + // Release shared snapshots whose timestamps are less than or equal to ts. + virtual void ReleaseSharedSnapshotsOlderThan(TxnTimestamp ts) = 0; + + // Get all shared snapshots which will be stored in shared_snapshots. + Status GetAllSharedSnapshots( + std::vector>* shared_snapshots) const { + return GetSharedSnapshots(/*ts_lb=*/0, /*ts_ub=*/kMaxTxnTimestamp, + shared_snapshots); + } + + // Get all shared snapshots whose timestamps fall within [ts_lb, ts_ub). + // shared_snapshots will be cleared and contain returned snapshots. + virtual Status GetSharedSnapshots( + TxnTimestamp ts_lb, TxnTimestamp ts_ub, + std::vector>* shared_snapshots) const = 0; + protected: // To Create an TransactionDB, call Open() // The ownership of db is transferred to the base StackableDB diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index c1e3a2ab2..a9c5df38b 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -656,5 +656,29 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { transactions_.erase(it); } +std::shared_ptr PessimisticTransactionDB::CreateSharedSnapshot( + TxnTimestamp ts) { + assert(db_impl_); + return db_impl_->CreateSharedSnapshot(kMaxSequenceNumber, ts); +} + +std::shared_ptr PessimisticTransactionDB::GetSharedSnapshot( + TxnTimestamp ts) const { + assert(db_impl_); + return db_impl_->GetSharedSnapshot(ts); +} + +void PessimisticTransactionDB::ReleaseSharedSnapshotsOlderThan( + TxnTimestamp ts) { + assert(db_impl_); + db_impl_->ReleaseSharedSnapshotsOlderThan(ts); +} + +Status PessimisticTransactionDB::GetSharedSnapshots( + TxnTimestamp ts_lb, TxnTimestamp ts_ub, + std::vector>* shared_snapshots) const { + assert(db_impl_); + return db_impl_->GetSharedSnapshots(ts_lb, ts_ub, shared_snapshots); +} } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index c0a4b9736..c3ef3a4f9 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -150,6 +150,18 @@ class PessimisticTransactionDB : public TransactionDB { return lock_manager_->GetLockTrackerFactory(); } + std::shared_ptr CreateSharedSnapshot( + TxnTimestamp ts) override; + + std::shared_ptr GetSharedSnapshot( + TxnTimestamp ts) const override; + + void ReleaseSharedSnapshotsOlderThan(TxnTimestamp ts) override; + + Status GetSharedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub, + std::vector>* + shared_snapshots) const override; + protected: DBImpl* db_impl_; std::shared_ptr info_log_;