diff --git a/db/db_impl.cc b/db/db_impl.cc
index f17d70290..27ffac50f 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -157,6 +157,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
       env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
           env_options_, immutable_db_options_)),
+      seq_per_batch_(seq_per_batch),
+      batch_per_txn_(batch_per_txn),
       db_lock_(nullptr),
       shutting_down_(false),
       bg_cv_(&mutex_),
@@ -202,8 +204,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       opened_successfully_(false),
       two_write_queues_(options.two_write_queues),
       manual_wal_flush_(options.manual_wal_flush),
-      seq_per_batch_(seq_per_batch),
-      batch_per_txn_(batch_per_txn),
       // last_sequencee_ is always maintained by the main queue that also writes
       // to the memtable. When two_write_queues_ is disabled last seq in
       // memtable is the same as last seq published to the readers. When it is
diff --git a/db/db_impl.h b/db/db_impl.h
index ce90ab25e..9bdb0abdc 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -803,6 +803,23 @@ class DBImpl : public DB {
   // Additonal options for compaction and flush
   EnvOptions env_options_for_compaction_;
 
+  std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
+
+  // Increase the sequence number after writing each batch, whether memtable is
+  // disabled for that or not. Otherwise the sequence number is increased after
+  // writing each key into memtable. This implies that when disable_memtable is
+  // set, the seq is not increased at all.
+  //
+  // Default: false
+  const bool seq_per_batch_;
+  // This determines during recovery whether we expect one writebatch per
+  // recovered transaction, or potentially multiple writebatches per
+  // transaction. For WriteUnprepared, this is set to false, since multiple
+  // batches can exist per transaction.
+  //
+  // Default: true
+  const bool batch_per_txn_;
+
   // Except in DB::Open(), WriteOptionsFile can only be called when:
   // Persist options to options file.
   // If need_mutex_lock = false, the method will lock DB mutex.
@@ -1036,8 +1053,8 @@
                          JobContext* job_context, LogBuffer* log_buffer,
                          Env::Priority thread_pri);
   // REQUIRES: log_numbers are sorted in ascending order
-  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
-                         SequenceNumber* next_sequence, bool read_only);
+  virtual Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                                 SequenceNumber* next_sequence, bool read_only);
 
   // The following two methods are used to flush a memtable to
   // storage. The first one is used at database RecoveryTime (when the
@@ -1294,7 +1311,6 @@ class DBImpl : public DB {
   // expesnive mutex_ lock during WAL write, which update log_empty_.
   bool log_empty_;
 
-  std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
   struct LogFileNumberSize {
     explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
     void AddSize(uint64_t new_size) { size += new_size; }
@@ -1689,20 +1705,7 @@ class DBImpl : public DB {
   // In 2PC these are the writes at Prepare phase.
   const bool two_write_queues_;
   const bool manual_wal_flush_;
-  // Increase the sequence number after writing each batch, whether memtable is
-  // disabled for that or not. Otherwise the sequence number is increased after
-  // writing each key into memtable. This implies that when disable_memtable is
-  // set, the seq is not increased at all.
-  //
-  // Default: false
-  const bool seq_per_batch_;
-  // This determines during recovery whether we expect one writebatch per
-  // recovered transaction, or potentially multiple writebatches per
-  // transaction. For WriteUnprepared, this is set to false, since multiple
-  // batches can exist per transaction.
-  //
-  // Default: true
-  const bool batch_per_txn_;
+
   // LastSequence also indicates last published sequence visibile to the
   // readers. Otherwise LastPublishedSequence should be used.
   const bool last_seq_same_as_publish_seq_;
diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc
index acc952524..90e979b4e 100644
--- a/db/db_impl_secondary.cc
+++ b/db/db_impl_secondary.cc
@@ -4,6 +4,12 @@
 // (found in the LICENSE.Apache file in the root directory).
 
 #include "db/db_impl_secondary.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#include <inttypes.h>
+
 #include "db/db_iter.h"
 #include "db/merge_context.h"
 #include "monitoring/perf_context_imp.h"
@@ -52,12 +58,174 @@ Status DBImplSecondary::Recover(
     default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
     single_column_family_mode_ =
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
+
+    // Recover from all newer log files than the ones named in the
+    // descriptor.
+    std::vector<std::string> filenames;
+    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
+    if (s.IsNotFound()) {
+      return Status::InvalidArgument("Failed to open wal_dir",
+                                     immutable_db_options_.wal_dir);
+    } else if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<uint64_t> logs;
+    // If log_readers_ is non-empty, it means we have applied all logs with log
+    // numbers smaller than the smallest log in log_readers_, so there is no
+    // need to pass these logs to RecoverLogFiles.
+    uint64_t log_number_min = 0;
+    if (log_readers_.size() > 0) {
+      log_number_min = log_readers_.begin()->first;
+    }
+    for (size_t i = 0; i < filenames.size(); i++) {
+      uint64_t number;
+      FileType type;
+      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
+          number >= log_number_min) {
+        logs.push_back(number);
+      }
+    }
+
+    if (!logs.empty()) {
+      // Recover in the order in which the logs were generated
+      std::sort(logs.begin(), logs.end());
+      SequenceNumber next_sequence(kMaxSequenceNumber);
+      s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
+    }
   }
-  // TODO: attempt to recover from WAL files.
+  // TODO: update options_file_number_ needed?
+
   return s;
 }
 
+// Try to find a log reader for log_number in the log_readers_ map; initialize
+// one if it does not exist.
+Status DBImplSecondary::MaybeInitLogReader(
+    uint64_t log_number, log::FragmentBufferedReader** log_reader) {
+  auto iter = log_readers_.find(log_number);
+  // make sure the log file is still present
+  if (iter == log_readers_.end() ||
+      iter->second->reader_->GetLogNumber() != log_number) {
+    // delete the obsolete log reader if the log numbers mismatch
+    if (iter != log_readers_.end()) {
+      log_readers_.erase(iter);
+    }
+    // initialize log reader from log_number
+    // TODO: min_log_number_to_keep_2pc check needed?
+    // Open the log file
+    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
+    ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                   "Recovering log #%" PRIu64 " mode %d", log_number,
+                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
+
+    std::unique_ptr<SequentialFileReader> file_reader;
+    {
+      std::unique_ptr<SequentialFile> file;
+      Status status = env_->NewSequentialFile(
+          fname, &file, env_->OptimizeForLogRead(env_options_));
+      if (!status.ok()) {
+        *log_reader = nullptr;
+        return status;
+      }
+      file_reader.reset(new SequentialFileReader(std::move(file), fname));
+    }
+
+    // Create the log reader.
+    LogReaderContainer* log_reader_container = new LogReaderContainer(
+        env_, immutable_db_options_.info_log, std::move(fname),
+        std::move(file_reader), log_number);
+    log_readers_.insert(std::make_pair(
+        log_number,
+        std::unique_ptr<LogReaderContainer>(log_reader_container)));
+  }
+  iter = log_readers_.find(log_number);
+  assert(iter != log_readers_.end());
+  *log_reader = iter->second->reader_;
+  return Status::OK();
+}
+
+// After manifest recovery, replay WALs and refresh log_readers_ if necessary
+// REQUIRES: log_numbers are sorted in ascending order
+Status DBImplSecondary::RecoverLogFiles(
+    const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
+    bool /*read_only*/) {
+  mutex_.AssertHeld();
+  Status status;
+  for (auto log_number : log_numbers) {
+    log::FragmentBufferedReader* reader = nullptr;
+    status = MaybeInitLogReader(log_number, &reader);
+    if (!status.ok()) {
+      return status;
+    }
+    assert(reader != nullptr);
+  }
+  for (auto log_number : log_numbers) {
+    auto it = log_readers_.find(log_number);
+    assert(it != log_readers_.end());
+    log::FragmentBufferedReader* reader = it->second->reader_;
+    // Manually update the file number allocation counter in VersionSet.
+    versions_->MarkFileNumberUsed(log_number);
+
+    // Determine whether to tolerate incomplete records at the tail end of the
+    // log, then read all the records and add them to a memtable.
+    std::string scratch;
+    Slice record;
+    WriteBatch batch;
+
+    while (reader->ReadRecord(&record, &scratch,
+                              immutable_db_options_.wal_recovery_mode) &&
+           status.ok()) {
+      if (record.size() < WriteBatchInternal::kHeader) {
+        reader->GetReporter()->Corruption(
+            record.size(), Status::Corruption("log record too small"));
+        continue;
+      }
+      WriteBatchInternal::SetContents(&batch, record);
+      // do not check sequence number because user may toggle disableWAL
+      // between writes which breaks sequence number continuity guarantee
+
+      // If column family was not found, it might mean that the WAL write
+      // batch references a column family that was dropped after the
+      // insert. We don't want to fail the whole write batch in that case --
+      // we just ignore the update.
+      // That's why we set ignore missing column families to true.
+      // Passing a null flush_scheduler disables memtable flushing, which is
+      // what secondary instances need.
+      bool has_valid_writes = false;
+      status = WriteBatchInternal::InsertInto(
+          &batch, column_family_memtables_.get(), nullptr /* flush_scheduler */,
+          true, log_number, this, false /* concurrent_memtable_writes */,
+          next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
+      if (!status.ok()) {
+        // We are treating this as a failure while reading since we read valid
+        // blocks that do not form coherent data
+        reader->GetReporter()->Corruption(record.size(), status);
+        continue;
+      }
+    }
+
+    if (!status.ok()) {
+      return status;
+    }
+
+    auto last_sequence = *next_sequence - 1;
+    if ((*next_sequence != kMaxSequenceNumber) &&
+        (versions_->LastSequence() <= last_sequence)) {
+      versions_->SetLastAllocatedSequence(last_sequence);
+      versions_->SetLastPublishedSequence(last_sequence);
+      versions_->SetLastSequence(last_sequence);
+    }
+  }
+  // After successfully recovering the WAL, remove all log readers from the
+  // map except the one for the latest log, which is kept for continued
+  // tailing.
+  if (log_readers_.size() > 1) {
+    auto eraseIter = log_readers_.begin();
+    std::advance(eraseIter, log_readers_.size() - 1);
+    log_readers_.erase(log_readers_.begin(), eraseIter);
+  }
+  return status;
+}
+
 // Implementation of the DB interface
 Status DBImplSecondary::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family, const Slice& key,
diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h
index 1b6746f7e..64c814328 100644
--- a/db/db_impl_secondary.h
+++ b/db/db_impl_secondary.h
@@ -13,6 +13,55 @@
 
 namespace rocksdb {
 
+class LogReaderContainer {
+ public:
+  LogReaderContainer()
+      : reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
+  LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
+                     std::string fname,
+                     std::unique_ptr<SequentialFileReader>&& file_reader,
+                     uint64_t log_number) {
+    LogReporter* reporter = new LogReporter();
+    status_ = new Status();
+    reporter->env = env;
+    reporter->info_log = info_log.get();
+    reporter->fname = std::move(fname);
+    reporter->status = status_;
+    reporter_ = reporter;
+    // We intentionally make log::Reader do checksumming even if
+    // paranoid_checks==false so that corruptions cause entire commits
+    // to be skipped instead of propagating bad information (like overly
+    // large sequence numbers).
+    reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
+                                              reporter, true /*checksum*/,
+                                              log_number);
+  }
+  log::FragmentBufferedReader* reader_;
+  log::Reader::Reporter* reporter_;
+  Status* status_;
+  ~LogReaderContainer() {
+    delete reader_;
+    delete reporter_;
+    delete status_;
+  }
+
+ private:
+  struct LogReporter : public log::Reader::Reporter {
+    Env* env;
+    Logger* info_log;
+    std::string fname;
+    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
+    void Corruption(size_t bytes, const Status& s) override {
+      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
+                     (this->status == nullptr ? "(ignoring error) " : ""),
+                     fname.c_str(), static_cast<int>(bytes),
+                     s.ToString().c_str());
+      if (this->status != nullptr && this->status->ok()) {
+        *this->status = s;
+      }
+    }
+  };
+};
+
 class DBImplSecondary : public DBImpl {
  public:
   DBImplSecondary(const DBOptions& options, const std::string& dbname);
@@ -133,6 +182,9 @@ class DBImplSecondary : public DBImpl {
   // method can take long time due to all the I/O and CPU costs.
   Status TryCatchUpWithPrimary() override;
 
+  Status MaybeInitLogReader(uint64_t log_number,
+                            log::FragmentBufferedReader** log_reader);
+
  private:
   friend class DB;
 
@@ -142,10 +194,19 @@ class DBImplSecondary : public DBImpl {
 
   using DBImpl::Recover;
 
+  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                         SequenceNumber* next_sequence,
+                         bool read_only) override;
+
   std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
   std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
   std::unique_ptr<Status> manifest_reader_status_;
+
+  // Cache a log reader for each log number; these are used to continue WAL
+  // replay after recovery.
+  std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
 };
+
 }  // namespace rocksdb
 
 #endif  // !ROCKSDB_LITE
diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc
index 478a7cec9..47daf9fd8 100644
--- a/db/db_secondary_test.cc
+++ b/db/db_secondary_test.cc
@@ -195,6 +195,50 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) {
   verify_db_func("new_foo_value", "new_bar_value");
 }
 
+TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
+  Options options;
+  options.env = env_;
+  options.level0_file_num_compaction_trigger = 4;
+  Reopen(options);
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
+    ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
+  }
+  Options options1;
+  options1.env = env_;
+  options1.max_open_files = -1;
+  OpenSecondary(options1);
+
+  ReadOptions ropts;
+  ropts.verify_checksums = true;
+  const auto verify_db_func = [&](const std::string& foo_val,
+                                  const std::string& bar_val) {
+    std::string value;
+    ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
+    ASSERT_EQ(foo_val, value);
+    ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
+    ASSERT_EQ(bar_val, value);
+    Iterator* iter = db_secondary_->NewIterator(ropts);
+    ASSERT_NE(nullptr, iter);
+    iter->Seek("foo");
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ("foo", iter->key().ToString());
+    ASSERT_EQ(foo_val, iter->value().ToString());
+    iter->Seek("bar");
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ("bar", iter->key().ToString());
+    ASSERT_EQ(bar_val, iter->value().ToString());
+    size_t count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ++count;
+    }
+    ASSERT_EQ(2, count);
+    delete iter;
+  };
+
+  verify_db_func("foo_value2", "bar_value2");
+}
+
 TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
   Options options;
   options.env = env_;
diff --git a/db/log_reader.h b/db/log_reader.h
index 058382b8a..bda9ac8bb 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -89,6 +89,8 @@ class Reader {
 
   Reporter* GetReporter() const { return reporter_; }
 
+  uint64_t GetLogNumber() const { return log_number_; }
+
  protected:
   std::shared_ptr<Logger> info_log_;
   const std::unique_ptr<SequentialFileReader> file_;
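Note on usage (not part of the patch): with this change, DBImplSecondary::Recover() replays the primary's live WAL into the secondary's memtables, so a secondary instance can see writes that the primary has not yet flushed to SST files. The sketch below mirrors the OpenAsSecondaryWALTailing test through the public API (DB::Open and DB::OpenAsSecondary from include/rocksdb/db.h). It is a minimal illustration only: the paths and keys are placeholders, and max_open_files = -1 follows what the test above sets for the secondary instance.

// Minimal usage sketch (illustrative only; paths and keys are placeholders).
#include <cassert>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // The primary owns the shared db directory. Write without flushing, so the
  // data lives only in the primary's memtable and WAL.
  rocksdb::DB* primary = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/primary_db", &primary);
  assert(s.ok());
  s = primary->Put(rocksdb::WriteOptions(), "foo", "foo_value");
  assert(s.ok());

  // The secondary opens the same db directory read-only, plus a private
  // directory for its own info log. With this patch, Recover() also replays
  // the primary's WAL, so the unflushed "foo" is visible right after open.
  rocksdb::Options secondary_options;
  secondary_options.max_open_files = -1;  // matches the new test's setup
  rocksdb::DB* secondary = nullptr;
  s = rocksdb::DB::OpenAsSecondary(secondary_options, "/tmp/primary_db",
                                   "/tmp/secondary_db", &secondary);
  assert(s.ok());

  std::string value;
  s = secondary->Get(rocksdb::ReadOptions(), "foo", &value);
  assert(s.ok() && value == "foo_value");

  delete secondary;
  delete primary;
  return 0;
}

The cached log_readers_ map (and keeping only the reader for the newest log after a successful replay) prepares for later catch-up calls to keep tailing the same WAL; wiring that into TryCatchUpWithPrimary() is outside this diff.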