diff --git a/db/db_impl.cc b/db/db_impl.cc
index 928302411..c5ba6f59d 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1158,42 +1158,44 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       bool batch_changed = false;

       WalFilter::WalProcessingOption wal_processing_option =
-          db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
+          db_options_.wal_filter->LogRecord(batch, &new_batch,
+                                            &batch_changed);

       switch (wal_processing_option) {
-      case WalFilter::WalProcessingOption::kContinueProcessing:
-        //do nothing, proceeed normally
-        break;
-      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
-        //skip current record
-        continue;
-      case WalFilter::WalProcessingOption::kStopReplay:
-        //skip current record and stop replay
-        continue_replay_log = false;
-        continue;
-      case WalFilter::WalProcessingOption::kCorruptedRecord: {
-        status = Status::Corruption("Corruption reported by Wal Filter ",
-                                    db_options_.wal_filter->Name());
-        MaybeIgnoreError(&status);
-        if (!status.ok()) {
-          reporter.Corruption(record.size(), status);
+        case WalFilter::WalProcessingOption::kContinueProcessing:
+          // do nothing, proceed normally
+          break;
+        case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
+          // skip current record
           continue;
-        }
-        break;
-      }
-      default: {
-        assert(false); //unhandled case
-        status = Status::NotSupported("Unknown WalProcessingOption returned" " by Wal Filter ", db_options_.wal_filter->Name());
-        MaybeIgnoreError(&status);
-        if (!status.ok()) {
-          return status;
-        }
-        else {
-          // Ignore the error with current record processing.
+        case WalFilter::WalProcessingOption::kStopReplay:
+          // skip current record and stop replay
+          continue_replay_log = false;
           continue;
+        case WalFilter::WalProcessingOption::kCorruptedRecord: {
+          status = Status::Corruption("Corruption reported by Wal Filter ",
+                                      db_options_.wal_filter->Name());
+          MaybeIgnoreError(&status);
+          if (!status.ok()) {
+            reporter.Corruption(record.size(), status);
+            continue;
+          }
+          break;
+        }
+        default: {
+          assert(false);  // unhandled case
+          status = Status::NotSupported(
+              "Unknown WalProcessingOption returned"
+              " by Wal Filter ",
+              db_options_.wal_filter->Name());
+          MaybeIgnoreError(&status);
+          if (!status.ok()) {
+            return status;
+          } else {
+            // Ignore the error with current record processing.
+            continue;
+          }
         }
-      }
       }

       if (batch_changed) {
@@ -1203,23 +1205,26 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
         int original_count = WriteBatchInternal::Count(&batch);
         if (new_count > original_count) {
           Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log,
-              "Recovering log #%" PRIu64 " mode %d log filter %s returned "
-              "more records (%d) than original (%d) which is not allowed. "
-              "Aborting recovery.",
-              log_number, db_options_.wal_recovery_mode,
-              db_options_.wal_filter->Name(), new_count, original_count);
-          status = Status::NotSupported("More than original # of records "
-                                        "returned by Wal Filter ", db_options_.wal_filter->Name());
+              "Recovering log #%" PRIu64
+              " mode %d log filter %s returned "
+              "more records (%d) than original (%d) which is not allowed. "
+              "Aborting recovery.",
+              log_number, db_options_.wal_recovery_mode,
+              db_options_.wal_filter->Name(), new_count, original_count);
+          status = Status::NotSupported(
+              "More than original # of records "
+              "returned by Wal Filter ",
+              db_options_.wal_filter->Name());
           return status;
         }

         // Set the same sequence number in the new_batch
         // as the original batch.
-          WriteBatchInternal::SetSequence(&new_batch,
-            WriteBatchInternal::Sequence(&batch));
+          WriteBatchInternal::SetSequence(&new_batch,
+                                          WriteBatchInternal::Sequence(&batch));
           batch = new_batch;
         }
       }
-#endif //ROCKSDB_LITE
+#endif  // ROCKSDB_LITE

       // If column family was not found, it might mean that the WAL write
       // batch references to the column family that was dropped after the
@@ -4161,9 +4166,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
                           LogFileName(db_options_.wal_dir, recycle_log_number),
                           &lfile, opt_env_opt);
     } else {
-      s = NewWritableFile(env_,
-        LogFileName(db_options_.wal_dir, new_log_number),
-        &lfile, opt_env_opt);
+      s = NewWritableFile(env_,
+                          LogFileName(db_options_.wal_dir, new_log_number),
+                          &lfile, opt_env_opt);
     }
     if (s.ok()) {
       // Our final size should be less than write_buffer_size
@@ -4172,9 +4177,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
                    mutable_cf_options.write_buffer_size);
       unique_ptr<WritableFileWriter> file_writer(
           new WritableFileWriter(std::move(lfile), opt_env_opt));
-      new_log = new log::Writer(std::move(file_writer),
-                                new_log_number,
-                                db_options_.recycle_log_file_num > 0);
+      new_log = new log::Writer(std::move(file_writer), new_log_number,
+                                db_options_.recycle_log_file_num > 0);
     }
   }

@@ -4815,9 +4819,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
           new WritableFileWriter(std::move(lfile), opt_env_options));
       impl->logs_.emplace_back(
           new_log_number,
-          new log::Writer(std::move(file_writer),
-                          new_log_number,
-                          impl->db_options_.recycle_log_file_num > 0));
+          new log::Writer(std::move(file_writer), new_log_number,
+                          impl->db_options_.recycle_log_file_num > 0));

       // set column family handles
       for (auto cf : column_families) {
diff --git a/db/db_test.cc b/db/db_test.cc
index 03c52ec5e..d9d27f346 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -5073,9 +5073,9 @@ class RecoveryTestHelper {
       ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
       unique_ptr<WritableFileWriter> file_writer(
           new WritableFileWriter(std::move(file), env_options));
-      current_log_writer.reset(new log::Writer(
-          std::move(file_writer), current_log_number,
-          db_options.recycle_log_file_num > 0));
+      current_log_writer.reset(
+          new log::Writer(std::move(file_writer), current_log_number,
+                          db_options.recycle_log_file_num > 0));

       for (int i = 0; i < kKeysPerWALFile; i++) {
         std::string key = "key" + ToString(count++);
@@ -9891,36 +9891,33 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {

 #ifndef ROCKSDB_LITE
 namespace {
-  void ValidateKeyExistence(DB* db,
-    const std::vector<Slice>& keys_must_exist,
-    const std::vector<Slice>& keys_must_not_exist) {
-    // Ensure that expected keys exist
-    std::vector<std::string> values;
-    if (keys_must_exist.size() > 0) {
-      std::vector<Status> status_list = db->MultiGet(ReadOptions(),
-                                                     keys_must_exist,
-                                                     &values);
-      for (size_t i = 0; i < keys_must_exist.size(); i++) {
-        ASSERT_OK(status_list[i]);
-      }
-    }
-
-    // Ensure that given keys don't exist
-    if (keys_must_not_exist.size() > 0) {
-      std::vector<Status> status_list = db->MultiGet(ReadOptions(),
-                                                     keys_must_not_exist,
-                                                     &values);
-      for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
-        ASSERT_TRUE(status_list[i].IsNotFound());
-      }
+void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
+                          const std::vector<Slice>& keys_must_not_exist) {
+  // Ensure that expected keys exist
+  std::vector<std::string> values;
+  if (keys_must_exist.size() > 0) {
+    std::vector<Status> status_list =
+        db->MultiGet(ReadOptions(), keys_must_exist, &values);
+    for (size_t i = 0; i < keys_must_exist.size(); i++) {
+      ASSERT_OK(status_list[i]);
     }
   }
-} //namespace

+  // Ensure that given keys don't exist
+  if (keys_must_not_exist.size() > 0) {
+    std::vector<Status> status_list =
+        db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
+    for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
+      ASSERT_TRUE(status_list[i].IsNotFound());
+    }
+  }
+}
+
+}  // namespace

 TEST_F(DBTest, WalFilterTest) {
   class TestWalFilter : public WalFilter {
-  private:
+   private:
     // Processing option that is requested to be applied at the given index
     WalFilter::WalProcessingOption wal_processing_option_;
     // Index at which to apply wal_processing_option_
@@ -9929,21 +9926,22 @@ TEST_F(DBTest, WalFilterTest) {
     size_t apply_option_at_record_index_;
     // Current record index, incremented with each record encountered.
     size_t current_record_index_;
-  public:
-    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
-      size_t apply_option_for_record_index) :
-      wal_processing_option_(wal_processing_option),
-      apply_option_at_record_index_(apply_option_for_record_index),
-      current_record_index_(0) { }
-    virtual WalProcessingOption LogRecord(const WriteBatch & batch,
-      WriteBatch* new_batch, bool* batch_changed) const override {
+   public:
+    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
+                  size_t apply_option_for_record_index)
+        : wal_processing_option_(wal_processing_option),
+          apply_option_at_record_index_(apply_option_for_record_index),
+          current_record_index_(0) {}
+
+    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
+                                          WriteBatch* new_batch,
+                                          bool* batch_changed) const override {
       WalFilter::WalProcessingOption option_to_return;

       if (current_record_index_ == apply_option_at_record_index_) {
         option_to_return = wal_processing_option_;
-      }
-      else {
+      } else {
         option_to_return = WalProcessingOption::kContinueProcessing;
       }

@@ -9955,9 +9953,7 @@ TEST_F(DBTest, WalFilterTest) {
       return option_to_return;
     }

-    virtual const char* Name() const override {
-      return "TestWalFilter";
-    }
+    virtual const char* Name() const override { return "TestWalFilter"; }
   };

   // Create 3 batches with two keys each
   std::vector<std::vector<std::string>> batch_keys(3);
   batch_keys[0].push_back("key1");
   batch_keys[0].push_back("key2");
   batch_keys[1].push_back("key3");
   batch_keys[1].push_back("key4");
   batch_keys[2].push_back("key5");
   batch_keys[2].push_back("key6");

   // Test with all WAL processing options
-  for (int option = 0;
-    option < static_cast<int>(WalFilter::WalProcessingOption::kWalProcessingOptionMax);
-    option++) {
+  for (int option = 0;
+       option < static_cast<int>(
+           WalFilter::WalProcessingOption::kWalProcessingOptionMax);
+       option++) {
     Options options = OptionsForLogIterTest();
     DestroyAndReopen(options);
-    CreateAndReopenWithCF({ "pikachu" }, options);
+    CreateAndReopenWithCF({"pikachu"}, options);

     // Write given keys in given batches
     for (size_t i = 0; i < batch_keys.size(); i++) {
       WriteBatch batch;
       for (size_t j = 0; j < batch_keys[i].size(); j++) {
         batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
       }
       dbfull()->Write(WriteOptions(), &batch);
     }

     WalFilter::WalProcessingOption wal_processing_option =
-      static_cast<WalFilter::WalProcessingOption>(option);
+        static_cast<WalFilter::WalProcessingOption>(option);

     // Create a test filter that would apply wal_processing_option at the first
     // record
     size_t apply_option_for_record_index = 1;
-    TestWalFilter test_wal_filter(wal_processing_option,
-      apply_option_for_record_index);
+    TestWalFilter test_wal_filter(wal_processing_option,
+                                  apply_option_for_record_index);

     // Reopen database with option to use WAL filter
     options = OptionsForLogIterTest();
     options.wal_filter = &test_wal_filter;
-    Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
-      options);
+    Status status =
+        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
     if (wal_processing_option ==
-      WalFilter::WalProcessingOption::kCorruptedRecord) {
+        WalFilter::WalProcessingOption::kCorruptedRecord) {
       assert(!status.ok());
       // In case of corruption we can turn off paranoid_checks to reopen
       // databse
       options.paranoid_checks = false;
-      ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+      ReopenWithColumnFamilies({"default", "pikachu"}, options);
     } else {
       assert(status.ok());
     }
@@ -10017,10 +10014,10 @@ TEST_F(DBTest, WalFilterTest) {
     std::vector<Slice> keys_must_exist;
     std::vector<Slice> keys_must_not_exist;
     switch (wal_processing_option) {
-    case WalFilter::WalProcessingOption::kCorruptedRecord:
-    case WalFilter::WalProcessingOption::kContinueProcessing: {
+      case WalFilter::WalProcessingOption::kCorruptedRecord:
+      case WalFilter::WalProcessingOption::kContinueProcessing: {
         fprintf(stderr, "Testing with complete WAL processing\n");
-        //we expect all records to be processed
+        // we expect all records to be processed
         for (size_t i = 0; i < batch_keys.size(); i++) {
           for (size_t j = 0; j < batch_keys[i].size(); j++) {
             keys_must_exist.push_back(Slice(batch_keys[i][j]));
           }
         }
@@ -10029,16 +10026,16 @@ TEST_F(DBTest, WalFilterTest) {
         break;
       }
       case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
-        fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
-          apply_option_for_record_index);
+        fprintf(stderr,
+                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
+                apply_option_for_record_index);
         // We expect the record with apply_option_for_record_index to be not
         // found.
         for (size_t i = 0; i < batch_keys.size(); i++) {
           for (size_t j = 0; j < batch_keys[i].size(); j++) {
             if (i == apply_option_for_record_index) {
               keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
-            }
-            else {
+            } else {
               keys_must_exist.push_back(Slice(batch_keys[i][j]));
             }
           }
         }
@@ -10046,16 +10043,17 @@ TEST_F(DBTest, WalFilterTest) {
         break;
       }
       case WalFilter::WalProcessingOption::kStopReplay: {
-        fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
-          apply_option_for_record_index);
+        fprintf(stderr,
+                "Testing with stopping replay from record %" ROCKSDB_PRIszt
+                "\n",
+                apply_option_for_record_index);
         // We expect records beyond apply_option_for_record_index to be not
         // found.
         for (size_t i = 0; i < batch_keys.size(); i++) {
           for (size_t j = 0; j < batch_keys[i].size(); j++) {
             if (i >= apply_option_for_record_index) {
               keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
-            }
-            else {
+            } else {
               keys_must_exist.push_back(Slice(batch_keys[i][j]));
             }
           }
         }
@@ -10063,7 +10061,7 @@ TEST_F(DBTest, WalFilterTest) {
         break;
       }
       default:
-        assert(false); //unhandled case
+        assert(false);  // unhandled case
     }

     bool checked_after_reopen = false;
@@ -10077,11 +10075,11 @@ TEST_F(DBTest, WalFilterTest) {
         break;
       }

-      //reopen database again to make sure previous log(s) are not used
+      // reopen database again to make sure previous log(s) are not used
       //(even if they were skipped)
-      //reopn database with option to use WAL filter
+      // reopen database with option to use WAL filter
       options = OptionsForLogIterTest();
-      ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+      ReopenWithColumnFamilies({"default", "pikachu"}, options);

       checked_after_reopen = true;
     }
   }

@@ -10090,19 +10088,20 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
   class ChangeBatchHandler : public WriteBatch::Handler {
-  private:
+   private:
     // Batch to insert keys in
     WriteBatch* new_write_batch_;
     // Number of keys to add in the new batch
     size_t num_keys_to_add_in_new_batch_;
     // Number of keys added to new batch
     size_t num_keys_added_;
-  public:
-    ChangeBatchHandler(WriteBatch* new_write_batch,
-      size_t num_keys_to_add_in_new_batch) :
-      new_write_batch_(new_write_batch),
-      num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
-      num_keys_added_(0){ }
+
+   public:
+    ChangeBatchHandler(WriteBatch* new_write_batch,
+                       size_t num_keys_to_add_in_new_batch)
+        : new_write_batch_(new_write_batch),
+          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+          num_keys_added_(0) {}
     virtual void Put(const Slice& key, const Slice& value) override {
       if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
         new_write_batch_->Put(key, value);
@@ -10112,24 +10111,24 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
   };

   class TestWalFilterWithChangeBatch : public WalFilter {
-  private:
+   private:
     // Index at which to start changing records
     size_t change_records_from_index_;
     // Number of keys to add in the new batch
     size_t num_keys_to_add_in_new_batch_;
     // Current record index, incremented with each record encountered.
     size_t current_record_index_;
-  public:
-    TestWalFilterWithChangeBatch(
-      size_t change_records_from_index,
-      size_t num_keys_to_add_in_new_batch) :
-      change_records_from_index_(change_records_from_index),
-      num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
-      current_record_index_(0) { }
-    virtual WalProcessingOption LogRecord(const WriteBatch & batch,
-      WriteBatch* new_batch, bool* batch_changed) const override {
+   public:
+    TestWalFilterWithChangeBatch(size_t change_records_from_index,
+                                 size_t num_keys_to_add_in_new_batch)
+        : change_records_from_index_(change_records_from_index),
+          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+          current_record_index_(0) {}
+    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
+                                          WriteBatch* new_batch,
+                                          bool* batch_changed) const override {
       if (current_record_index_ >= change_records_from_index_) {
         ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
         batch.Iterate(&handler);
         *batch_changed = true;
       }
@@ -10139,7 +10138,8 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
       // Filter is passed as a const object for RocksDB to not modify the
       // object, however we modify it for our own purpose here and hence
       // cast the constness away.
-      (const_cast<TestWalFilterWithChangeBatch*>(this)->current_record_index_)++;
+      (const_cast<TestWalFilterWithChangeBatch*>(this)
+           ->current_record_index_)++;

       return WalProcessingOption::kContinueProcessing;
     }

@@ -10160,7 +10160,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {

   Options options = OptionsForLogIterTest();
   DestroyAndReopen(options);
-  CreateAndReopenWithCF({ "pikachu" }, options);
+  CreateAndReopenWithCF({"pikachu"}, options);

   // Write given keys in given batches
   for (size_t i = 0; i < batch_keys.size(); i++) {
@@ -10176,12 +10176,12 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
   size_t change_records_from_index = 1;
   size_t num_keys_to_add_in_new_batch = 1;
   TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
-    change_records_from_index, num_keys_to_add_in_new_batch);
+      change_records_from_index, num_keys_to_add_in_new_batch);

   // Reopen database with option to use WAL filter
   options = OptionsForLogIterTest();
   options.wal_filter = &test_wal_filter_with_change_batch;
-  ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);

   // Ensure that all keys exist before change_records_from_index_
   // And after that index only single key exists
@@ -10193,8 +10193,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
     for (size_t j = 0; j < batch_keys[i].size(); j++) {
       if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
         keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
-      }
-      else {
+      } else {
         keys_must_exist.push_back(Slice(batch_keys[i][j]));
       }
     }
@@ -10211,11 +10210,11 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
       break;
     }

-    //reopen database again to make sure previous log(s) are not used
+    // reopen database again to make sure previous log(s) are not used
     //(even if they were skipped)
-    //reopn database with option to use WAL filter
+    // reopen database with option to use WAL filter
     options = OptionsForLogIterTest();
-    ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+    ReopenWithColumnFamilies({"default", "pikachu"}, options);

     checked_after_reopen = true;
   }
 }

@@ -10223,9 +10222,10 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
   class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
-  public:
-    virtual WalProcessingOption LogRecord(const WriteBatch & batch,
-      WriteBatch* new_batch, bool* batch_changed) const override {
+   public:
+    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
+                                          WriteBatch* new_batch,
+                                          bool* batch_changed) const override {
       *new_batch = batch;
       new_batch->Put("key_extra", "value_extra");
       *batch_changed = true;
@@ -10248,7 +10248,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {

   Options options = OptionsForLogIterTest();
   DestroyAndReopen(options);
-  CreateAndReopenWithCF({ "pikachu" }, options);
+  CreateAndReopenWithCF({"pikachu"}, options);

   // Write given keys in given batches
   for (size_t i = 0; i < batch_keys.size(); i++) {
@@ -10265,17 +10265,16 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
   // Reopen database with option to use WAL filter
   options = OptionsForLogIterTest();
   options.wal_filter = &test_wal_filter_extra_keys;
-  Status status =
-    TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
   ASSERT_TRUE(status.IsNotSupported());

   // Reopen without filter, now reopen should succeed - previous
   // attempt to open must not have altered the db.
   options = OptionsForLogIterTest();
-  ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);

   std::vector<Slice> keys_must_exist;
-  std::vector<Slice> keys_must_not_exist; //empty vector
+  std::vector<Slice> keys_must_not_exist;  // empty vector

   for (size_t i = 0; i < batch_keys.size(); i++) {
     for (size_t j = 0; j < batch_keys[i].size(); j++) {
@@ -10286,7 +10285,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {

   ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
 }
-#endif // ROCKSDB_LITE
+#endif  // ROCKSDB_LITE

 #ifndef ROCKSDB_LITE
 class BloomStatsTestWithParam
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 24c372a54..cb194cbaa 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -503,7 +503,8 @@ class DB {
     return CompactRange(options, DefaultColumnFamily(), begin, end);
   }

-  virtual Status SetOptions(ColumnFamilyHandle* /*column_family*/,
+  virtual Status SetOptions(
+      ColumnFamilyHandle* /*column_family*/,
       const std::unordered_map<std::string, std::string>& /*new_options*/) {
     return Status::NotSupported("Not implemented");
   }
@@ -655,7 +656,7 @@ class DB {
   // Returns a list of all table files with their level, start key
   // and end key
   virtual void GetLiveFilesMetaData(
-    std::vector<LiveFileMetaData>* /*metadata*/) {}
+      std::vector<LiveFileMetaData>* /*metadata*/) {}

   // Obtains the meta data of the specified column family of the DB.
   // Status::NotFound() will be returned if the current DB does not have
   //
   // If cf_name is not specified, then the metadata of the default
   // column family will be returned.
-  virtual void GetColumnFamilyMetaData(
-      ColumnFamilyHandle* /*column_family*/,
-      ColumnFamilyMetaData* /*metadata*/) {}
+  virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* /*column_family*/,
+                                       ColumnFamilyMetaData* /*metadata*/) {}

   // Get the metadata of the default column family.
   void GetColumnFamilyMetaData(
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 7290d4b1d..9e20a15aa 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -416,8 +416,7 @@ class RandomAccessFile {

   // For cases when read-ahead is implemented in the platform dependent
   // layer
-  virtual void EnableReadAhead() {
-  }
+  virtual void EnableReadAhead() {}

   // Tries to get an unique ID for this file that will be the same each time
   // the file is opened (and will stay the same while the file is open).
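[Editor's note, not part of the patch: EnableReadAhead() above lets the platform layer own read-ahead, and the Windows implementation in port/win/env_win.cc below widens every request to whole pages before issuing pread. The following self-contained sketch illustrates that page arithmetic; TruncateToPageBoundary is re-derived here so the example compiles on its own (the real helper lives in RocksDB's aligned-buffer utilities and requires a power-of-two page size), and the offsets are made-up values.]

// page_align_example.cc -- illustrative sketch only.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>

// Round s down to the nearest multiple of page_size (power of two assumed).
static size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  return s - (s & (page_size - 1));
}

int main() {
  const size_t alignment = 4096;  // page size assumed for the example
  const uint64_t offset = 10000;  // user-requested read offset
  const size_t n = 6000;          // user-requested byte count

  // First and last pages touched by the request.
  const size_t first_page_start = TruncateToPageBoundary(alignment, offset);
  const size_t last_page_start =
      TruncateToPageBoundary(alignment, offset + n - 1);
  // Whole pages covering [offset, offset + n), as computed in
  // WinRandomAccessFile::Read below.
  const size_t actual_bytes_toread =
      (last_page_start - first_page_start) + alignment;

  assert(first_page_start == 8192);
  assert(actual_bytes_toread == 8192);  // two 4KB pages cover the request
  std::printf("read %zu bytes starting at offset %zu\n", actual_bytes_toread,
              first_page_start);
  return 0;
}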
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index 69bfd62ae..1d288065c 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -151,7 +151,7 @@ class EventListener {
   // it should not run for an extended period of time before the function
   // returns. Otherwise, RocksDB may be blocked.
   virtual void OnFlushCompleted(DB* /*db*/,
-      const FlushJobInfo& /*flush_job_info*/) {}
+                                const FlushJobInfo& /*flush_job_info*/) {}

   // A call-back function for RocksDB which will be called whenever
   // a SST file is deleted. Different from OnCompactionCompleted and
@@ -180,7 +180,7 @@ class EventListener {
   // after this function is returned, and must be copied if it is needed
   // outside of this function.
   virtual void OnCompactionCompleted(DB* /*db*/,
-      const CompactionJobInfo& /*ci*/) {}
+                                     const CompactionJobInfo& /*ci*/) {}

   // A call-back function for RocksDB which will be called whenever
   // a SST file is created. Different from OnCompactionCompleted and
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index c2feafc0a..eb0bc679f 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1163,7 +1163,7 @@ struct DBOptions {
   // The filter is invoked at startup and is invoked from a single-thread
   // currently.
   const WalFilter* wal_filter;
-#endif //ROCKSDB_LITE
+#endif  // ROCKSDB_LITE
 };

 // Options to control the behavior of a database (passed to DB::Open)
diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h
index 81ffa3998..226d6971c 100644
--- a/include/rocksdb/wal_filter.h
+++ b/include/rocksdb/wal_filter.h
@@ -13,7 +13,7 @@ class WriteBatch;
 // records or modify their processing on recovery.
 // Please see the details below.
 class WalFilter {
-public:
+ public:
   enum class WalProcessingOption {
     // Continue processing as usual
     kContinueProcessing = 0,
@@ -28,12 +28,12 @@ public:
     kWalProcessingOptionMax = 4
   };

-  virtual ~WalFilter() { };
+  virtual ~WalFilter() {}

   // LogRecord is invoked for each log record encountered for all the logs
   // during replay on logs on recovery. This method can be used to:
   //  * inspect the record (using the batch parameter)
-  //  * ignoring current record
+  //  * ignoring current record
   //    (by returning WalProcessingOption::kIgnoreCurrentRecord)
   //  * reporting corrupted record
   //    (by returning WalProcessingOption::kCorruptedRecord)
@@ -55,7 +55,8 @@ public:
   // Please see WalProcessingOption enum above for
   // details.
   virtual WalProcessingOption LogRecord(const WriteBatch& batch,
-      WriteBatch* new_batch, bool* batch_changed) const = 0;
+                                        WriteBatch* new_batch,
+                                        bool* batch_changed) const = 0;

   // Returns a name that identifies this WAL filter.
   // The name will be printed to LOG file on start up for diagnosis.
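[Editor's note, not part of the patch: to make the LogRecord contract defined above concrete -- a filter inspects each recovered batch, may rewrite it into new_batch (setting *batch_changed), and steers replay through its return value -- here is a minimal sketch. The class name and record budget are invented for illustration.]

// stop_after_n_filter.cc -- illustrative sketch only.
#include <cstddef>

#include "rocksdb/options.h"
#include "rocksdb/wal_filter.h"
#include "rocksdb/write_batch.h"

// Stops WAL replay after a fixed number of records, exercising the
// kStopReplay branch added to DBImpl::RecoverLogFiles earlier in this patch.
class StopAfterNRecordsFilter : public rocksdb::WalFilter {
 public:
  explicit StopAfterNRecordsFilter(size_t max_records)
      : max_records_(max_records), seen_(0) {}

  virtual WalProcessingOption LogRecord(const rocksdb::WriteBatch& /*batch*/,
                                        rocksdb::WriteBatch* /*new_batch*/,
                                        bool* batch_changed) const override {
    *batch_changed = false;  // this filter never rewrites the batch
    if (seen_++ >= max_records_) {
      // Skip this record and stop processing all remaining WAL records.
      return WalProcessingOption::kStopReplay;
    }
    return WalProcessingOption::kContinueProcessing;
  }

  virtual const char* Name() const override {
    return "StopAfterNRecordsFilter";
  }

 private:
  const size_t max_records_;
  // LogRecord is const; the running count is state of the filter, not of the
  // record, hence mutable (the tests above use const_cast for the same
  // reason).
  mutable size_t seen_;
};

// Wiring it up -- DBOptions::wal_filter is a raw pointer in this revision,
// so the filter must outlive the DB:
//
//   StopAfterNRecordsFilter filter(100);  // 100 is an arbitrary budget
//   rocksdb::DBOptions db_options;
//   db_options.wal_filter = &filter;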
diff --git a/port/win/env_win.cc b/port/win/env_win.cc
index b67298150..95796554f 100644
--- a/port/win/env_win.cc
+++ b/port/win/env_win.cc
@@ -688,39 +688,44 @@ class WinRandomAccessFile : public RandomAccessFile {
   const std::string filename_;
   HANDLE hFile_;
   const bool use_os_buffer_;
-  bool read_ahead_;
-  const size_t compaction_readahead_size_;
-  const size_t random_access_max_buffer_size_;
+  bool read_ahead_;
+  const size_t compaction_readahead_size_;
+  const size_t random_access_max_buffer_size_;
   mutable std::mutex buffer_mut_;
   mutable AlignedBuffer buffer_;
   mutable uint64_t buffered_start_;  // file offset set that is currently buffered

   /*
-   * The function reads a requested amount of bytes into the specified aligned buffer
-   * Upon success the function sets the length of the buffer to the amount of bytes actually
-   * read even though it might be less than actually requested.
-   * It then copies the amount of bytes requested by the user (left) to the user supplied
-   * buffer (dest) and reduces left by the amount of bytes copied to the user buffer
+   * The function reads a requested amount of bytes into the specified aligned
+   * buffer Upon success the function sets the length of the buffer to the
+   * amount of bytes actually read even though it might be less than actually
+   * requested. It then copies the amount of bytes requested by the user (left)
+   * to the user supplied buffer (dest) and reduces left by the amount of bytes
+   * copied to the user buffer
    *
    * @user_offset [in] - offset on disk where the read was requested by the user
-   * @first_page_start [in] - actual page aligned disk offset that we want to read from
-   * @bytes_to_read [in] - total amount of bytes that will be read from disk which is generally
-   *                       greater or equal to the amount that the user has requested due to the
-   *                       either alignment requirements or read_ahead in effect.
-   * @left [in/out] total amount of bytes that needs to be copied to the user buffer. It is reduced
-   *                by the amount of bytes that actually copied
+   * @first_page_start [in] - actual page aligned disk offset that we want to
+   * read from
+   * @bytes_to_read [in] - total amount of bytes that will be read from disk
+   *                       which is generally greater or equal to the amount
+   *                       that the user has requested due to the
+   *                       either alignment requirements or read_ahead in
+   *                       effect.
+   * @left [in/out] total amount of bytes that needs to be copied to the user
+   *                buffer. It is reduced by the amount of bytes that actually
+   *                copied
    * @buffer - buffer to use
    * @dest - user supplied buffer
    */
   SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
-    size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const {
-
+                         size_t bytes_to_read, size_t& left,
+                         AlignedBuffer& buffer, char* dest) const {
     assert(buffer.CurrentSize() == 0);
     assert(buffer.Capacity() >= bytes_to_read);

-    SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read,
-      first_page_start);
+    SSIZE_T read =
+        pread(hFile_, buffer.Destination(), bytes_to_read, first_page_start);

     if (read > 0) {
       buffer.Size(read);
@@ -739,21 +744,22 @@ class WinRandomAccessFile : public RandomAccessFile {
   }

   SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
-    size_t bytes_to_read, size_t& left, char* dest) const {
-
+                                size_t bytes_to_read, size_t& left,
+                                char* dest) const {
     AlignedBuffer bigBuffer;
     bigBuffer.Alignment(buffer_.Alignment());
     bigBuffer.AllocateNewBuffer(bytes_to_read);

     return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
-      bigBuffer, dest);
+                          bigBuffer, dest);
   }

-  SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start,
-    size_t bytes_to_read, size_t& left, char* dest) const {
-
+  SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset,
+                                 uint64_t first_page_start,
+                                 size_t bytes_to_read, size_t& left,
+                                 char* dest) const {
     SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
-      left, buffer_, dest);
+                                  left, buffer_, dest);

     if (read > 0) {
       buffered_start_ = first_page_start;
@@ -789,9 +795,7 @@ class WinRandomAccessFile : public RandomAccessFile {
     }
   }

-  virtual void EnableReadAhead() override {
-    this->Hint(SEQUENTIAL);
-  }
+  virtual void EnableReadAhead() override { this->Hint(SEQUENTIAL); }

   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const override {
@@ -824,7 +828,7 @@ class WinRandomAccessFile : public RandomAccessFile {
         // Figure out the start/end offset for reading and amount to read
         const size_t alignment = buffer_.Alignment();
         const size_t first_page_start =
-          TruncateToPageBoundary(alignment, offset);
+            TruncateToPageBoundary(alignment, offset);

         size_t bytes_requested = left;
         if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
@@ -832,29 +836,29 @@ class WinRandomAccessFile : public RandomAccessFile {
           bytes_requested = compaction_readahead_size_;
         }

         const size_t last_page_start =
-          TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
+            TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
         const size_t actual_bytes_toread =
-          (last_page_start - first_page_start) + alignment;
+            (last_page_start - first_page_start) + alignment;

         if (buffer_.Capacity() < actual_bytes_toread) {
           // If we are in read-ahead mode or the requested size
           // exceeds max buffer size then use one-shot
           // big buffer otherwise reallocate main buffer
           if (read_ahead_ ||
-            (actual_bytes_toread > random_access_max_buffer_size_)) {
+              (actual_bytes_toread > random_access_max_buffer_size_)) {
             // Unlock the mutex since we are not using instance buffer
             lock.unlock();
             r = ReadIntoOneShotBuffer(offset, first_page_start,
-              actual_bytes_toread, left, dest);
+                                      actual_bytes_toread, left, dest);
           } else {
             buffer_.AllocateNewBuffer(actual_bytes_toread);
             r = ReadIntoInstanceBuffer(offset, first_page_start,
-              actual_bytes_toread, left, dest);
+                                       actual_bytes_toread, left, dest);
           }
         } else {
           buffer_.Clear();
           r = ReadIntoInstanceBuffer(offset, first_page_start,
-            actual_bytes_toread, left, dest);
+                                     actual_bytes_toread, left, dest);
         }
       }
     } else {
@@ -877,9 +881,7 @@ class WinRandomAccessFile : public RandomAccessFile {
   }

   virtual void Hint(AccessPattern pattern) override {
-
-    if (pattern == SEQUENTIAL &&
-      !use_os_buffer_ &&
+    if (pattern == SEQUENTIAL && !use_os_buffer_ &&
         compaction_readahead_size_ > 0) {
       std::lock_guard<std::mutex> lg(buffer_mut_);
       if (!read_ahead_) {
@@ -888,12 +890,12 @@ class WinRandomAccessFile : public RandomAccessFile {
         // - one for memory alignment which added implicitly by AlignedBuffer
         // - We add one more alignment because we will read one alignment more
         // from disk
-        buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment());
+        buffer_.AllocateNewBuffer(compaction_readahead_size_ +
+                                  buffer_.Alignment());
       }
     }
   }
-
   virtual Status InvalidateCache(size_t offset, size_t length) override {
     return Status::OK();
   }
diff --git a/util/env.cc b/util/env.cc
index 2c2339eaf..f6cc40893 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -293,7 +293,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
   env_options->set_fd_cloexec = options.is_fd_close_on_exec;
   env_options->bytes_per_sync = options.bytes_per_sync;
   env_options->compaction_readahead_size = options.compaction_readahead_size;
-  env_options->random_access_max_buffer_size = options.random_access_max_buffer_size;
+  env_options->random_access_max_buffer_size =
+      options.random_access_max_buffer_size;
   env_options->rate_limiter = options.rate_limiter.get();
   env_options->allow_fallocate = options.allow_fallocate;
 }
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index d24046f01..050473bd2 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -381,20 +381,20 @@ Status WritableFileWriter::WriteUnbuffered() {
 namespace {
 class ReadaheadRandomAccessFile : public RandomAccessFile {
  public:
-  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
-    size_t readahead_size)
-    : file_(std::move(file)),
-      readahead_size_(readahead_size),
-      forward_calls_(file_->ShouldForwardRawRequest()),
-      buffer_(),
-      buffer_offset_(0),
-      buffer_len_(0) {
-    if (!forward_calls_) {
-      buffer_.reset(new char[readahead_size_]);
-    } else if (readahead_size_ > 0) {
-      file_->EnableReadAhead();
-    }
-  }
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        forward_calls_(file_->ShouldForwardRawRequest()),
+        buffer_(),
+        buffer_offset_(0),
+        buffer_len_(0) {
+    if (!forward_calls_) {
+      buffer_.reset(new char[readahead_size_]);
+    } else if (readahead_size_ > 0) {
+      file_->EnableReadAhead();
+    }
+  }

   ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
diff --git a/util/options.cc b/util/options.cc
index 8b595512f..126aa2121 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -260,9 +260,10 @@ DBOptions::DBOptions()
       skip_stats_update_on_db_open(false),
       wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
 #ifndef ROCKSDB_LITE
-      , wal_filter(nullptr)
-#endif  // ROCKSDB_LITE
-  {
+      ,
+      wal_filter(nullptr)
+#endif  // ROCKSDB_LITE
+{
 }

 DBOptions::DBOptions(const Options& options)
@@ -322,9 +323,10 @@ DBOptions::DBOptions(const Options& options)
       wal_recovery_mode(options.wal_recovery_mode),
       row_cache(options.row_cache)
 #ifndef ROCKSDB_LITE
-      , wal_filter(options.wal_filter)
-#endif  // ROCKSDB_LITE
-  {
+      ,
+      wal_filter(options.wal_filter)
+#endif  // ROCKSDB_LITE
+{
 }

 static const char* const access_hints[] = {
@@ -405,7 +407,8 @@ void DBOptions::Dump(Logger* log) const {
          "                     Options.compaction_readahead_size: %" ROCKSDB_PRIszt
          "d",
          compaction_readahead_size);
-    Header(log,
+    Header(
+        log,
         "                 Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
         "d",
         random_access_max_buffer_size);
@@ -431,8 +434,8 @@ void DBOptions::Dump(Logger* log) const {
 }
 #ifndef ROCKSDB_LITE
     Header(log, "                        Options.wal_filter: %s",
-        wal_filter ? wal_filter->Name() : "None");
-#endif  // ROCKDB_LITE
+           wal_filter ? wal_filter->Name() : "None");
+#endif  // ROCKSDB_LITE
 }  // DBOptions::Dump

 void ColumnFamilyOptions::Dump(Logger* log) const {
diff --git a/util/options_helper.h b/util/options_helper.h
index 052bafd25..6df914241 100644
--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -181,8 +181,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
     {offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
      OptionVerificationType::kNormal}},
    {"random_access_max_buffer_size",
-    { offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT,
-      OptionVerificationType::kNormal}},
+    {offsetof(struct DBOptions, random_access_max_buffer_size),
+     OptionType::kSizeT, OptionVerificationType::kNormal}},
    {"use_adaptive_mutex",
     {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
      OptionVerificationType::kNormal}},
diff --git a/util/options_test.cc b/util/options_test.cc
index 542151adb..0f2bbf8b3 100644
--- a/util/options_test.cc
+++ b/util/options_test.cc
@@ -339,7 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
      {"use_adaptive_mutex", "false"},
      {"new_table_reader_for_compaction_inputs", "true"},
      {"compaction_readahead_size", "100"},
-      {"random_access_max_buffer_size", "3145728" },
+      {"random_access_max_buffer_size", "3145728"},
      {"bytes_per_sync", "47"},
      {"wal_bytes_per_sync", "48"},
  };
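[Editor's note, not part of the patch: because random_access_max_buffer_size is registered in db_options_type_info above, it can be parsed from a string map, which is what the GetOptionsFromMapTest entry exercises. A hedged sketch follows; the header path is an assumption, and GetDBOptionsFromMap's return type has varied across RocksDB revisions (bool in trees of this vintage, Status later), hence the auto.]

// options_from_map_example.cc -- illustrative sketch only.
#include <cassert>
#include <string>
#include <unordered_map>

#include "rocksdb/options.h"
#include "rocksdb/utilities/convenience.h"  // assumed location of GetDBOptionsFromMap

int main() {
  rocksdb::DBOptions base;
  rocksdb::DBOptions parsed;
  std::unordered_map<std::string, std::string> opts_map = {
      {"compaction_readahead_size", "100"},
      {"random_access_max_buffer_size", "3145728"},  // same value as the test
  };
  // Return type differs across revisions (bool vs. Status); auto sidesteps it.
  const auto ok = rocksdb::GetDBOptionsFromMap(base, opts_map, &parsed);
  (void)ok;
  assert(parsed.compaction_readahead_size == 100);
  assert(parsed.random_access_max_buffer_size == 3145728);
  return 0;
}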