diff --git a/db/db_impl.cc b/db/db_impl.cc index 0d621b639..628b00e6e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -90,10 +90,10 @@ #include "util/log_buffer.h" #include "util/logging.h" #include "util/mutexlock.h" -#include "util/sst_file_manager_impl.h" #include "util/options_helper.h" #include "util/options_parser.h" #include "util/perf_context_imp.h" +#include "util/sst_file_manager_impl.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/sync_point.h" @@ -614,6 +614,78 @@ void DBImpl::MaybeDumpStats() { } } +uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { + uint64_t min_log = 0; + + // we must look through the memtables for two phase transactions + // that have been committed but not yet flushed + for (auto loop_cfd : *versions_->GetColumnFamilySet()) { + if (loop_cfd->IsDropped()) { + continue; + } + + auto log = loop_cfd->imm()->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + + log = loop_cfd->mem()->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + +void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { + assert(log != 0); + std::lock_guard lock(prep_heap_mutex_); + auto it = prepared_section_completed_.find(log); + assert(it != prepared_section_completed_.end()); + it->second += 1; +} + +void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { + assert(log != 0); + std::lock_guard lock(prep_heap_mutex_); + min_log_with_prep_.push(log); + auto it = prepared_section_completed_.find(log); + if (it == prepared_section_completed_.end()) { + prepared_section_completed_[log] = 0; + } +} + +uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { + uint64_t min_log = 0; + + // first we look in the prepared heap where we keep + // track of transactions that have been prepared (written to WAL) + // but not yet committed. + while (!min_log_with_prep_.empty()) { + min_log = min_log_with_prep_.top(); + + auto it = prepared_section_completed_.find(min_log); + + // value was marked as 'deleted' from heap + if (it != prepared_section_completed_.end() && it->second > 0) { + it->second -= 1; + min_log_with_prep_.pop(); + + // back to squere one... + min_log = 0; + continue; + } else { + // found a valid value + break; + } + } + + return min_log; +} + // * Returns the list of live files in 'sst_live' // If it's doing full scan: // * Returns the list of all files in the filesystem in @@ -671,6 +743,32 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->pending_manifest_file_number = versions_->pending_manifest_file_number(); job_context->log_number = versions_->MinLogNumber(); + + if (allow_2pc()) { + // if are 2pc we must consider logs containing prepared + // sections of outstanding transactions. + // + // We must check min logs with outstanding prep before we check + // logs referneces by memtables because a log referenced by the + // first data structure could transition to the second under us. + // + // TODO(horuff): iterating over all column families under db mutex. + // should find more optimial solution + auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep(); + + if (min_log_in_prep_heap != 0 && + min_log_in_prep_heap < job_context->log_number) { + job_context->log_number = min_log_in_prep_heap; + } + + auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(); + + if (min_log_refed_by_mem != 0 && + min_log_refed_by_mem < job_context->log_number) { + job_context->log_number = min_log_refed_by_mem; + } + } + job_context->prev_log_number = versions_->prev_log_number(); versions_->AddLiveFiles(&job_context->sst_live); @@ -708,7 +806,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } if (!alive_log_files_.empty()) { - uint64_t min_log_number = versions_->MinLogNumber(); + uint64_t min_log_number = job_context->log_number; // find newly obsoleted log files while (alive_log_files_.begin()->number < min_log_number) { auto& earliest = *alive_log_files_.begin(); @@ -1378,9 +1476,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // insert. We don't want to fail the whole write batch in that case -- // we just ignore the update. // That's why we set ignore missing column families to true - status = - WriteBatchInternal::InsertInto(&batch, column_family_memtables_.get(), - &flush_scheduler_, true, log_number); + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), &flush_scheduler_, true, + log_number, this); MaybeIgnoreError(&status); if (!status.ok()) { @@ -4258,19 +4356,21 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options, } Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { - return WriteImpl(write_options, my_batch, nullptr); + return WriteImpl(write_options, my_batch, nullptr, nullptr); } #ifndef ROCKSDB_LITE Status DBImpl::WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback) { - return WriteImpl(write_options, my_batch, callback); + return WriteImpl(write_options, my_batch, callback, nullptr); } #endif // ROCKSDB_LITE Status DBImpl::WriteImpl(const WriteOptions& write_options, - WriteBatch* my_batch, WriteCallback* callback) { + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref, + bool disable_memtable) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } @@ -4295,8 +4395,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.batch = my_batch; w.sync = write_options.sync; w.disableWAL = write_options.disableWAL; + w.disable_memtable = disable_memtable; w.in_batch_group = false; w.callback = callback; + w.log_ref = log_ref; if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); @@ -4309,12 +4411,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // we are a non-leader in a parallel group PERF_TIMER_GUARD(write_memtable_time); - if (!w.CallbackFailed()) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + + if (w.ShouldWriteToMemtable()) { ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); WriteBatchInternal::SetSequence(w.batch, w.sequence); w.status = WriteBatchInternal::InsertInto( - w.batch, &column_family_memtables, &flush_scheduler_, + &w, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); } @@ -4332,6 +4438,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = w.FinalStatus(); } if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } // write is complete and leader has updated sequence RecordTick(stats_, WRITE_DONE_BY_OTHER); return w.FinalStatus(); @@ -4489,10 +4598,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t total_byte_size = 0; for (auto writer : write_group) { if (writer->CheckCallback(this)) { - total_count += WriteBatchInternal::Count(writer->batch); - total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); - parallel = parallel && !writer->batch->HasMerge(); + if (writer->ShouldWriteToMemtable()) { + total_count += WriteBatchInternal::Count(writer->batch); + parallel = parallel && !writer->batch->HasMerge(); + } + + if (writer->ShouldWriteToWAL()) { + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } } } @@ -4514,22 +4628,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_wal_time); WriteBatch* merged_batch = nullptr; - if (write_group.size() == 1 && !write_group[0]->CallbackFailed()) { + if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL()) { merged_batch = write_group[0]->batch; + write_group[0]->log_used = logfile_number_; } else { // WAL needs all of the batches flattened into a single batch. // We could avoid copying here with an iov-like AddRecord // interface merged_batch = &tmp_batch_; for (auto writer : write_group) { - if (!writer->CallbackFailed()) { + if (writer->ShouldWriteToWAL()) { WriteBatchInternal::Append(merged_batch, writer->batch); } + writer->log_used = logfile_number_; } } - WriteBatchInternal::SetSequence(merged_batch, current_sequence); - assert(WriteBatchInternal::Count(merged_batch) == total_count); + if (log_used != nullptr) { + *log_used = logfile_number_; + } + + WriteBatchInternal::SetSequence(merged_batch, current_sequence); Slice log_entry = WriteBatchInternal::Contents(merged_batch); status = logs_.back().writer->AddRecord(log_entry); @@ -4615,14 +4734,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, std::memory_order_relaxed); write_thread_.LaunchParallelFollowers(&pg, current_sequence); - if (!w.CallbackFailed()) { + if (w.ShouldWriteToMemtable()) { // do leader write ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); assert(w.sequence == current_sequence); WriteBatchInternal::SetSequence(w.batch, w.sequence); w.status = WriteBatchInternal::InsertInto( - w.batch, &column_family_memtables, &flush_scheduler_, + &w, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); diff --git a/db/db_impl.h b/db/db_impl.h index 98373a3a6..57f7fd8a8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,8 +10,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -296,7 +298,8 @@ class DBImpl : public DB { bool disallow_trivial_move = false); // Force current memtable contents to be flushed. - Status TEST_FlushMemTable(bool wait = true); + Status TEST_FlushMemTable(bool wait = true, + ColumnFamilyHandle* cfh = nullptr); // Wait for memtable compaction Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); @@ -345,6 +348,9 @@ class DBImpl : public DB { WriteController& TEST_write_controler() { return write_controller_; } + uint64_t TEST_FindMinLogContainingOutstandingPrep(); + uint64_t TEST_FindMinPrepLogReferencedByMemTable(); + #endif // NDEBUG // Return maximum background compaction alowed to be scheduled based on @@ -421,12 +427,57 @@ class DBImpl : public DB { return num_running_compactions_; } + // hollow transactions shell used for recovery. + // these will then be passed to TransactionDB so that + // locks can be reacquired before writing can resume. + struct RecoveredTransaction { + uint64_t log_number_; + std::string name_; + WriteBatch* batch_; + explicit RecoveredTransaction(const uint64_t log, const std::string& name, + WriteBatch* batch) + : log_number_(log), name_(name), batch_(batch) {} + + ~RecoveredTransaction() { delete batch_; } + }; + + bool allow_2pc() const { return db_options_.allow_2pc; } + + RecoveredTransaction* GetRecoveredTransaction(const std::string& name) { + auto it = recovered_transactions_.find(name); + if (it == recovered_transactions_.end()) { + return nullptr; + } else { + return it->second; + } + } + + void InsertRecoveredTransaction(const uint64_t log, const std::string& name, + WriteBatch* batch) { + recovered_transactions_[name] = new RecoveredTransaction(log, name, batch); + MarkLogAsContainingPrepSection(log); + } + + void DeleteRecoveredTransaction(const std::string& name) { + auto it = recovered_transactions_.find(name); + assert(it != recovered_transactions_.end()); + auto* trx = it->second; + recovered_transactions_.erase(it); + MarkLogAsHavingPrepSectionFlushed(trx->log_number_); + delete trx; + } + + void MarkLogAsHavingPrepSectionFlushed(uint64_t log); + void MarkLogAsContainingPrepSection(uint64_t log); + protected: Env* const env_; const std::string dbname_; unique_ptr versions_; const DBOptions db_options_; Statistics* stats_; + std::unordered_map + recovered_transactions_; InternalIterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, @@ -460,7 +511,12 @@ class DBImpl : public DB { void EraseThreadStatusDbInfo() const; Status WriteImpl(const WriteOptions& options, WriteBatch* updates, - WriteCallback* callback); + WriteCallback* callback = nullptr, + uint64_t* log_used = nullptr, uint64_t log_ref = 0, + bool disable_memtable = false); + + uint64_t FindMinLogContainingOutstandingPrep(); + uint64_t FindMinPrepLogReferencedByMemTable(); private: friend class DB; @@ -854,6 +910,28 @@ class DBImpl : public DB { // Indicate DB was opened successfully bool opened_successfully_; + // minmum log number still containing prepared data. + // this is used by FindObsoleteFiles to determine which + // flushed logs we must keep around because they still + // contain prepared data which has not been flushed or rolled back + std::priority_queue, std::greater> + min_log_with_prep_; + + // to be used in conjunction with min_log_with_prep_. + // once a transaction with data in log L is committed or rolled back + // rather than removing the value from the heap we add that value + // to prepared_section_completed_ which maps LOG -> instance_count + // since a log could contain multiple prepared sections + // + // when trying to determine the minmum log still active we first + // consult min_log_with_prep_. while that root value maps to + // a value > 0 in prepared_section_completed_ we decrement the + // instance_count for that log and pop the root value in + // min_log_with_prep_. This will work the same as a min_heap + // where we are deleteing arbitrary elements and the up heaping. + std::unordered_map prepared_section_completed_; + std::mutex prep_heap_mutex_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index af4553f89..a62aa7546 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -74,10 +74,17 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, disallow_trivial_move); } -Status DBImpl::TEST_FlushMemTable(bool wait) { +Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { FlushOptions fo; fo.wait = wait; - return FlushMemTable(default_cf_handle_->cfd(), fo); + ColumnFamilyData* cfd; + if (cfh == nullptr) { + cfd = default_cf_handle_->cfd(); + } else { + auto cfhi = reinterpret_cast(cfh); + cfd = cfhi->cfd(); + } + return FlushMemTable(cfd, fo); } Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { @@ -154,5 +161,12 @@ Status DBImpl::TEST_GetAllImmutableCFOptions( return Status::OK(); } +uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { + return FindMinLogContainingOutstandingPrep(); +} + +uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { + return FindMinPrepLogReferencedByMemTable(); +} } // namespace rocksdb #endif // NDEBUG diff --git a/db/memtable.cc b/db/memtable.cc index 32aa8d76c..e899974b3 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -75,6 +75,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, first_seqno_(0), earliest_seqno_(earliest_seq), mem_next_logfile_number_(0), + min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support ? moptions_.inplace_update_num_locks : 0), @@ -800,4 +801,17 @@ void MemTableRep::Get(const LookupKey& k, void* callback_args, } } +void MemTable::RefLogContainingPrepSection(uint64_t log) { + assert(log > 0); + auto cur = min_prep_log_referenced_.load(); + while ((log < cur || cur == 0) && + !min_prep_log_referenced_.compare_exchange_strong(cur, log)) { + cur = min_prep_log_referenced_.load(); + } +} + +uint64_t MemTable::GetMinLogContainingPrepSection() { + return min_prep_log_referenced_.load(); +} + } // namespace rocksdb diff --git a/db/memtable.h b/db/memtable.h index a01a598f0..2585ac0c7 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -271,6 +271,13 @@ class MemTable { // operations on the same MemTable. void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } + // if this memtable contains data from a committed + // two phase transaction we must take note of the + // log which contains that data so we can know + // when to relese that log + void RefLogContainingPrepSection(uint64_t log); + uint64_t GetMinLogContainingPrepSection(); + // Notify the underlying storage that no more items will be added. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable. @@ -342,6 +349,10 @@ class MemTable { // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_; + // the earliest log containing a prepared section + // which has been inserted into this memtable. + std::atomic min_prep_log_referenced_; + // rw locks for inplace updates std::vector locks_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 9c1d3632b..685d34b83 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -392,4 +392,24 @@ void MemTableList::InstallNewVersion() { } } +uint64_t MemTableList::GetMinLogContainingPrepSection() { + uint64_t min_log = 0; + + for (auto& m : current_->memlist_) { + // this mem has been flushed it no longer + // needs to hold on the its prep section + if (m->flush_completed_) { + continue; + } + + auto log = m->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + } // namespace rocksdb diff --git a/db/memtable_list.h b/db/memtable_list.h index 37f5c7784..7f633f026 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -215,6 +215,8 @@ class MemTableList { size_t* current_memory_usage() { return ¤t_memory_usage_; } + uint64_t GetMinLogContainingPrepSection(); + private: // DB mutex held void InstallNewVersion(); diff --git a/db/write_batch.cc b/db/write_batch.cc index 9dcd4c26f..75b8ae319 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -681,38 +681,46 @@ Status WriteBatch::RollbackToSavePoint() { return Status::OK(); } -namespace { class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; ColumnFamilyMemTables* const cf_mems_; FlushScheduler* const flush_scheduler_; const bool ignore_missing_column_families_; - const uint64_t log_number_; + const uint64_t recovering_log_number_; + // log number that all Memtables inserted into should reference + uint64_t log_number_ref_; DBImpl* db_; const bool dont_filter_deletes_; const bool concurrent_memtable_writes_; + // current recovered transaction we are rebuilding (recovery) + WriteBatch* rebuilding_trx_; // cf_mems should not be shared with concurrent inserters MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, uint64_t log_number, - DB* db, const bool dont_filter_deletes, + bool ignore_missing_column_families, + uint64_t recovering_log_number, DB* db, + const bool dont_filter_deletes, bool concurrent_memtable_writes) : sequence_(sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), ignore_missing_column_families_(ignore_missing_column_families), - log_number_(log_number), + recovering_log_number_(recovering_log_number), + log_number_ref_(0), db_(reinterpret_cast(db)), dont_filter_deletes_(dont_filter_deletes), - concurrent_memtable_writes_(concurrent_memtable_writes) { + concurrent_memtable_writes_(concurrent_memtable_writes), + rebuilding_trx_(nullptr) { assert(cf_mems_); if (!dont_filter_deletes_) { assert(db_); } } + void set_log_number_ref(uint64_t log) { log_number_ref_ = log; } + bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { // If we are in a concurrent mode, it is the caller's responsibility // to clone the original ColumnFamilyMemTables so that each thread @@ -728,16 +736,24 @@ class MemTableInserter : public WriteBatch::Handler { } return false; } - if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) { - // This is true only in recovery environment (log_number_ is always 0 in + if (recovering_log_number_ != 0 && + recovering_log_number_ < cf_mems_->GetLogNumber()) { + // This is true only in recovery environment (recovering_log_number_ is + // always 0 in // non-recovery, regular write code-path) - // * If log_number_ < cf_mems_->GetLogNumber(), this means that column + // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that + // column // family already contains updates from this log. We can't apply updates // twice because of update-in-place or merge workloads -- ignore the // update *s = Status::OK(); return false; } + + if (log_number_ref_ > 0) { + cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_); + } + return true; } @@ -748,6 +764,12 @@ class MemTableInserter : public WriteBatch::Handler { ++sequence_; return seek_status; } + + if (rebuilding_trx_ != nullptr) { + rebuilding_trx_->Put(cf_mems_->GetColumnFamilyHandle(), key, value); + return Status::OK(); + } + MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { @@ -801,11 +823,6 @@ class MemTableInserter : public WriteBatch::Handler { Status DeleteImpl(uint32_t column_family_id, const Slice& key, ValueType delete_type) { - Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; - return seek_status; - } MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!dont_filter_deletes_ && moptions->filter_deletes) { @@ -832,11 +849,33 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; + } + + if (rebuilding_trx_ != nullptr) { + rebuilding_trx_->Delete(cf_mems_->GetColumnFamilyHandle(), key); + return Status::OK(); + } + return DeleteImpl(column_family_id, key, kTypeDeletion); } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; + } + + if (rebuilding_trx_ != nullptr) { + rebuilding_trx_->SingleDelete(cf_mems_->GetColumnFamilyHandle(), key); + return Status::OK(); + } + return DeleteImpl(column_family_id, key, kTypeSingleDeletion); } @@ -848,6 +887,10 @@ class MemTableInserter : public WriteBatch::Handler { ++sequence_; return seek_status; } + if (rebuilding_trx_ != nullptr) { + rebuilding_trx_->Merge(cf_mems_->GetColumnFamilyHandle(), key, value); + return Status::OK(); + } MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); bool perform_merge = false; @@ -933,8 +976,102 @@ class MemTableInserter : public WriteBatch::Handler { } } } + + Status MarkBeginPrepare() override { + assert(rebuilding_trx_ == nullptr); + assert(db_); + + if (recovering_log_number_ != 0) { + // during recovery we rebuild a hollow transaction + // from all encountered prepare sections of the wal + if (db_->allow_2pc() == false) { + return Status::NotSupported( + "WAL contains prepared transactions. Open with " + "TransactionDB::Open()."); + } + + // we are now iterating through a prepared section + rebuilding_trx_ = new WriteBatch(); + } else { + // in non-recovery we ignore prepare markers + // and insert the values directly. making sure we have a + // log for each insertion to reference. + assert(log_number_ref_ > 0); + } + + return Status::OK(); + } + + Status MarkEndPrepare(const Slice& name) override { + assert(db_); + assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0)); + + if (recovering_log_number_ != 0) { + assert(db_->allow_2pc()); + db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), + rebuilding_trx_); + rebuilding_trx_ = nullptr; + } else { + assert(rebuilding_trx_ == nullptr); + assert(log_number_ref_ > 0); + } + + return Status::OK(); + } + + Status MarkCommit(const Slice& name) override { + assert(db_); + + Status s; + + if (recovering_log_number_ != 0) { + // in recovery when we encounter a commit marker + // we lookup this transaction in our set of rebuilt transactions + // and commit. + auto trx = db_->GetRecoveredTransaction(name.ToString()); + + // the log contaiting the prepared section may have + // been released in the last incarnation because the + // data was flushed to L0 + if (trx != nullptr) { + // at this point individual CF lognumbers will prevent + // duplicate re-insertion of values. + assert(log_number_ref_ == 0); + // all insertes must refernce this trx log number + log_number_ref_ = trx->log_number_; + s = trx->batch_->Iterate(this); + log_number_ref_ = 0; + + if (s.ok()) { + db_->DeleteRecoveredTransaction(name.ToString()); + } + } + } else { + // in non recovery we simply ignore this tag + } + + return s; + } + + Status MarkRollback(const Slice& name) override { + assert(db_); + + if (recovering_log_number_ != 0) { + auto trx = db_->GetRecoveredTransaction(name.ToString()); + + // the log containing the transactions prep section + // may have been released in the previous incarnation + // because we knew it had been rolled back + if (trx != nullptr) { + db_->DeleteRecoveredTransaction(name.ToString()); + } + } else { + // in non recovery we simply ignore this tag + } + + return Status::OK(); + } }; -} // namespace // This function can only be called in these conditions: // 1) During Recovery() @@ -949,18 +1086,36 @@ Status WriteBatchInternal::InsertInto( MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, log_number, db, dont_filter_deletes, concurrent_memtable_writes); - for (size_t i = 0; i < writers.size(); i++) { - if (!writers[i]->CallbackFailed()) { - writers[i]->status = writers[i]->batch->Iterate(&inserter); - if (!writers[i]->status.ok()) { - return writers[i]->status; - } + auto w = writers[i]; + if (!w->ShouldWriteToMemtable()) { + continue; + } + inserter.set_log_number_ref(w->log_ref); + w->status = w->batch->Iterate(&inserter); + if (!w->status.ok()) { + return w->status; } } return Status::OK(); } +Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, + ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, + uint64_t log_number, DB* db, + const bool dont_filter_deletes, + bool concurrent_memtable_writes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch), + memtables, flush_scheduler, + ignore_missing_column_families, log_number, db, + dont_filter_deletes, concurrent_memtable_writes); + assert(writer->ShouldWriteToMemtable()); + inserter.set_log_number_ref(writer->log_ref); + return writer->batch->Iterate(&inserter); +} + Status WriteBatchInternal::InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 80e08d1d0..90203809d 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -164,6 +164,13 @@ class WriteBatchInternal { uint64_t log_number = 0, DB* db = nullptr, const bool dont_filter_deletes = true, bool concurrent_memtable_writes = false); + static Status InsertInto(WriteThread::Writer* writer, + ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families = false, + uint64_t log_number = 0, DB* db = nullptr, + const bool dont_filter_deletes = true, + bool concurrent_memtable_writes = false); static void Append(WriteBatch* dst, const WriteBatch* src); diff --git a/db/write_thread.h b/db/write_thread.h index c3cb5cc0e..3e3a2f337 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -11,11 +11,12 @@ #include #include #include -#include #include +#include + #include "db/write_callback.h" -#include "rocksdb/types.h" #include "rocksdb/status.h" +#include "rocksdb/types.h" #include "rocksdb/write_batch.h" #include "util/autovector.h" #include "util/instrumented_mutex.h" @@ -79,6 +80,9 @@ class WriteThread { WriteBatch* batch; bool sync; bool disableWAL; + bool disable_memtable; + uint64_t log_used; // log number that this batch was inserted into + uint64_t log_ref; // log number that memtable insert should reference bool in_batch_group; WriteCallback* callback; bool made_waitable; // records lazy construction of mutex and cv @@ -96,6 +100,9 @@ class WriteThread { : batch(nullptr), sync(false), disableWAL(false), + disable_memtable(false), + log_used(0), + log_ref(0), in_batch_group(false), callback(nullptr), made_waitable(false), @@ -153,6 +160,12 @@ class WriteThread { return (callback != nullptr) && !callback_status.ok(); } + bool ShouldWriteToMemtable() { + return !CallbackFailed() && !disable_memtable; + } + + bool ShouldWriteToWAL() { return !CallbackFailed() && !disableWAL; } + // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order std::mutex& StateMutex() { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index fb5eab5d4..538671824 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1313,6 +1313,10 @@ struct DBOptions { // Default: kPointInTimeRecovery WALRecoveryMode wal_recovery_mode; + // if set to false then recovery will fail when a prepared + // transaction is encountered in the WAL + bool allow_2pc = false; + // A global cache for table-level rows. // Default: nullptr (disabled) // Not supported in ROCKSDB_LITE mode! diff --git a/util/options_helper.h b/util/options_helper.h index 909cbc9ec..2a72ac071 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -167,6 +167,9 @@ static std::unordered_map db_options_type_info = { {"allow_mmap_writes", {offsetof(struct DBOptions, allow_mmap_writes), OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"allow_2pc", + {offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean, + OptionVerificationType::kNormal}}, {"allow_os_buffer", {offsetof(struct DBOptions, allow_os_buffer), OptionType::kBoolean, OptionVerificationType::kNormal}}, diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index b4c0fd4cf..c05e3b803 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -279,7 +279,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "write_thread_max_yield_usec=1000;" "access_hint_on_compaction_start=NONE;" "info_log_level=DEBUG_LEVEL;" - "dump_malloc_stats=false;", + "dump_malloc_stats=false;" + "allow_2pc=false;", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),