diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 120f18ed8..897e2711c 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -54,7 +54,7 @@ Status OptimisticTransactionImpl::Commit() { } Status s = db_impl->WriteWithCallback( - write_options_, write_batch_->GetWriteBatch(), &callback); + write_options_, GetWriteBatch()->GetWriteBatch(), &callback); if (s.ok()) { Clear(); @@ -77,7 +77,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, SequenceNumber seq; if (snapshot_) { - seq = snapshot_->snapshot()->GetSequenceNumber(); + seq = snapshot_->GetSequenceNumber(); } else { seq = db_->GetLatestSequenceNumber(); } diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 5f3e97e9b..aeea21e73 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -21,14 +21,14 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db, : db_(db), write_options_(write_options), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), - write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), - start_time_(db_->GetEnv()->NowMicros()) {} + start_time_(db_->GetEnv()->NowMicros()), + write_batch_(cmp_, 0, true) {} TransactionBaseImpl::~TransactionBaseImpl() {} void TransactionBaseImpl::Clear() { save_points_.reset(nullptr); - write_batch_->Clear(); + write_batch_.Clear(); tracked_keys_.clear(); num_puts_ = 0; num_deletes_ = 0; @@ -40,7 +40,11 @@ void TransactionBaseImpl::SetSnapshot() { auto db_impl = reinterpret_cast(db_); const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary(); - snapshot_.reset(new ManagedSnapshot(db_, snapshot)); + + // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to + // be released, not deleted when it is no longer referenced. + snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot, + this, std::placeholders::_1, db_)); snapshot_needed_ = false; snapshot_notifier_ = nullptr; } @@ -84,7 +88,7 @@ void TransactionBaseImpl::SetSavePoint() { } save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, num_puts_, num_deletes_, num_merges_); - write_batch_->SetSavePoint(); + write_batch_.SetSavePoint(); } Status TransactionBaseImpl::RollbackToSavePoint() { @@ -99,7 +103,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { num_merges_ = save_point.num_merges_; // Rollback batch - Status s = write_batch_->RollbackToSavePoint(); + Status s = write_batch_.RollbackToSavePoint(); assert(s.ok()); // Rollback any keys that were tracked since the last savepoint @@ -119,7 +123,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { return s; } else { - assert(write_batch_->RollbackToSavePoint().IsNotFound()); + assert(write_batch_.RollbackToSavePoint().IsNotFound()); return Status::NotFound(); } } @@ -127,8 +131,8 @@ Status TransactionBaseImpl::RollbackToSavePoint() { Status TransactionBaseImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { - return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, - value); + return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, + value); } Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, @@ -189,7 +193,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { Iterator* db_iter = db_->NewIterator(read_options); assert(db_iter); - return write_batch_->NewIteratorWithBase(db_iter); + return write_batch_.NewIteratorWithBase(db_iter); } Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, @@ -197,7 +201,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, Iterator* db_iter = db_->NewIterator(read_options, column_family); assert(db_iter); - return write_batch_->NewIteratorWithBase(column_family, db_iter); + return write_batch_.NewIteratorWithBase(column_family, db_iter); } Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, @@ -353,11 +357,11 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, } void TransactionBaseImpl::PutLogData(const Slice& blob) { - write_batch_->PutLogData(blob); + write_batch_.PutLogData(blob); } WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() { - return write_batch_.get(); + return &write_batch_; } uint64_t TransactionBaseImpl::GetElapsedTime() const { @@ -413,13 +417,17 @@ const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { if (indexing_enabled_) { // Use WriteBatchWithIndex - return write_batch_.get(); + return &write_batch_; } else { // Don't use WriteBatchWithIndex. Return base WriteBatch. - return write_batch_->GetWriteBatch(); + return write_batch_.GetWriteBatch(); } } +void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) { + db->ReleaseSnapshot(snapshot); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 4515bfaf5..3fe3513b9 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -165,7 +165,7 @@ class TransactionBaseImpl : public Transaction { } const Snapshot* GetSnapshot() const override { - return snapshot_ ? snapshot_->snapshot() : nullptr; + return snapshot_ ? snapshot_.get() : nullptr; } void SetSnapshot() override; @@ -202,6 +202,9 @@ class TransactionBaseImpl : public Transaction { write_options_ = write_options; } + // Used for memory management for snapshot_ + void ReleaseSnapshot(const Snapshot* snapshot, DB* db); + protected: // Add a key to the list of tracked keys. // seqno is the earliest seqno this key was involved with this transaction. @@ -218,15 +221,12 @@ class TransactionBaseImpl : public Transaction { const Comparator* cmp_; - // Records writes pending in this transaction - std::unique_ptr write_batch_; - // Stores that time the txn was constructed, in microseconds. const uint64_t start_time_; // Stores the current snapshot that was was set by SetSnapshot or null if // no snapshot is currently set. - std::shared_ptr snapshot_; + std::shared_ptr snapshot_; // Count of various operations pending in this transaction uint64_t num_puts_ = 0; @@ -234,7 +234,7 @@ class TransactionBaseImpl : public Transaction { uint64_t num_merges_ = 0; struct SavePoint { - std::shared_ptr snapshot_; + std::shared_ptr snapshot_; bool snapshot_needed_; std::shared_ptr snapshot_notifier_; uint64_t num_puts_; @@ -244,7 +244,7 @@ class TransactionBaseImpl : public Transaction { // Record all keys tracked since the last savepoint TransactionKeyMap new_keys_; - SavePoint(std::shared_ptr snapshot, bool snapshot_needed, + SavePoint(std::shared_ptr snapshot, bool snapshot_needed, std::shared_ptr snapshot_notifier, uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) : snapshot_(snapshot), @@ -256,6 +256,9 @@ class TransactionBaseImpl : public Transaction { }; private: + // Records writes pending in this transaction + WriteBatchWithIndex write_batch_; + // Stack of the Snapshot saved at each save point. Saved snapshots may be // nullptr if there was no snapshot at the time SetSavePoint() was called. std::unique_ptr> save_points_; diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 7480ce6dd..3f25ff77d 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -92,7 +92,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) { } Status TransactionImpl::Commit() { - Status s = DoCommit(write_batch_->GetWriteBatch()); + Status s = DoCommit(GetWriteBatch()->GetWriteBatch()); Clear(); @@ -295,7 +295,7 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, SequenceNumber* new_seqno) { assert(snapshot_); - SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber(); + SequenceNumber seq = snapshot_->GetSequenceNumber(); if (prev_seqno <= seq) { // If the key has been previous validated at a sequence number earlier // than the curent snapshot's sequence number, we already know it has not @@ -311,9 +311,9 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, ColumnFamilyHandle* cfh = column_family ? column_family : db_impl->DefaultColumnFamily(); - return TransactionUtil::CheckKeyForConflicts( - db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber(), - false /* cache_only */); + return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(), + snapshot_->GetSequenceNumber(), + false /* cache_only */); } } // namespace rocksdb