"make format" in some recent commits
Summary: Run "make format" for some recent commits. Test Plan: Build and run tests Reviewers: IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D49707
This commit is contained in:
parent
6388e7f4e2
commit
296c3a1f94
103
db/db_impl.cc
103
db/db_impl.cc
@ -1158,42 +1158,44 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
bool batch_changed = false;
|
bool batch_changed = false;
|
||||||
|
|
||||||
WalFilter::WalProcessingOption wal_processing_option =
|
WalFilter::WalProcessingOption wal_processing_option =
|
||||||
db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
|
db_options_.wal_filter->LogRecord(batch, &new_batch,
|
||||||
|
&batch_changed);
|
||||||
|
|
||||||
switch (wal_processing_option) {
|
switch (wal_processing_option) {
|
||||||
case WalFilter::WalProcessingOption::kContinueProcessing:
|
case WalFilter::WalProcessingOption::kContinueProcessing:
|
||||||
//do nothing, proceeed normally
|
// do nothing, proceeed normally
|
||||||
break;
|
break;
|
||||||
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
|
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
|
||||||
//skip current record
|
// skip current record
|
||||||
continue;
|
|
||||||
case WalFilter::WalProcessingOption::kStopReplay:
|
|
||||||
//skip current record and stop replay
|
|
||||||
continue_replay_log = false;
|
|
||||||
continue;
|
|
||||||
case WalFilter::WalProcessingOption::kCorruptedRecord: {
|
|
||||||
status = Status::Corruption("Corruption reported by Wal Filter ",
|
|
||||||
db_options_.wal_filter->Name());
|
|
||||||
MaybeIgnoreError(&status);
|
|
||||||
if (!status.ok()) {
|
|
||||||
reporter.Corruption(record.size(), status);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
case WalFilter::WalProcessingOption::kStopReplay:
|
||||||
break;
|
// skip current record and stop replay
|
||||||
}
|
continue_replay_log = false;
|
||||||
default: {
|
|
||||||
assert(false); //unhandled case
|
|
||||||
status = Status::NotSupported("Unknown WalProcessingOption returned"
|
|
||||||
" by Wal Filter ", db_options_.wal_filter->Name());
|
|
||||||
MaybeIgnoreError(&status);
|
|
||||||
if (!status.ok()) {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// Ignore the error with current record processing.
|
|
||||||
continue;
|
continue;
|
||||||
|
case WalFilter::WalProcessingOption::kCorruptedRecord: {
|
||||||
|
status = Status::Corruption("Corruption reported by Wal Filter ",
|
||||||
|
db_options_.wal_filter->Name());
|
||||||
|
MaybeIgnoreError(&status);
|
||||||
|
if (!status.ok()) {
|
||||||
|
reporter.Corruption(record.size(), status);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
assert(false); // unhandled case
|
||||||
|
status = Status::NotSupported(
|
||||||
|
"Unknown WalProcessingOption returned"
|
||||||
|
" by Wal Filter ",
|
||||||
|
db_options_.wal_filter->Name());
|
||||||
|
MaybeIgnoreError(&status);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
} else {
|
||||||
|
// Ignore the error with current record processing.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (batch_changed) {
|
if (batch_changed) {
|
||||||
@ -1203,23 +1205,26 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
int original_count = WriteBatchInternal::Count(&batch);
|
int original_count = WriteBatchInternal::Count(&batch);
|
||||||
if (new_count > original_count) {
|
if (new_count > original_count) {
|
||||||
Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log,
|
Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log,
|
||||||
"Recovering log #%" PRIu64 " mode %d log filter %s returned "
|
"Recovering log #%" PRIu64
|
||||||
"more records (%d) than original (%d) which is not allowed. "
|
" mode %d log filter %s returned "
|
||||||
"Aborting recovery.",
|
"more records (%d) than original (%d) which is not allowed. "
|
||||||
log_number, db_options_.wal_recovery_mode,
|
"Aborting recovery.",
|
||||||
db_options_.wal_filter->Name(), new_count, original_count);
|
log_number, db_options_.wal_recovery_mode,
|
||||||
status = Status::NotSupported("More than original # of records "
|
db_options_.wal_filter->Name(), new_count, original_count);
|
||||||
"returned by Wal Filter ", db_options_.wal_filter->Name());
|
status = Status::NotSupported(
|
||||||
|
"More than original # of records "
|
||||||
|
"returned by Wal Filter ",
|
||||||
|
db_options_.wal_filter->Name());
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
// Set the same sequence number in the new_batch
|
// Set the same sequence number in the new_batch
|
||||||
// as the original batch.
|
// as the original batch.
|
||||||
WriteBatchInternal::SetSequence(&new_batch,
|
WriteBatchInternal::SetSequence(&new_batch,
|
||||||
WriteBatchInternal::Sequence(&batch));
|
WriteBatchInternal::Sequence(&batch));
|
||||||
batch = new_batch;
|
batch = new_batch;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif //ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
// If column family was not found, it might mean that the WAL write
|
// If column family was not found, it might mean that the WAL write
|
||||||
// batch references to the column family that was dropped after the
|
// batch references to the column family that was dropped after the
|
||||||
@ -4161,9 +4166,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
LogFileName(db_options_.wal_dir, recycle_log_number), &lfile,
|
LogFileName(db_options_.wal_dir, recycle_log_number), &lfile,
|
||||||
opt_env_opt);
|
opt_env_opt);
|
||||||
} else {
|
} else {
|
||||||
s = NewWritableFile(env_,
|
s = NewWritableFile(env_,
|
||||||
LogFileName(db_options_.wal_dir, new_log_number),
|
LogFileName(db_options_.wal_dir, new_log_number),
|
||||||
&lfile, opt_env_opt);
|
&lfile, opt_env_opt);
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// Our final size should be less than write_buffer_size
|
// Our final size should be less than write_buffer_size
|
||||||
@ -4172,9 +4177,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
mutable_cf_options.write_buffer_size);
|
mutable_cf_options.write_buffer_size);
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
||||||
new_log = new log::Writer(std::move(file_writer),
|
new_log = new log::Writer(std::move(file_writer), new_log_number,
|
||||||
new_log_number,
|
db_options_.recycle_log_file_num > 0);
|
||||||
db_options_.recycle_log_file_num > 0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4815,9 +4819,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
new WritableFileWriter(std::move(lfile), opt_env_options));
|
new WritableFileWriter(std::move(lfile), opt_env_options));
|
||||||
impl->logs_.emplace_back(
|
impl->logs_.emplace_back(
|
||||||
new_log_number,
|
new_log_number,
|
||||||
new log::Writer(std::move(file_writer),
|
new log::Writer(std::move(file_writer), new_log_number,
|
||||||
new_log_number,
|
impl->db_options_.recycle_log_file_num > 0));
|
||||||
impl->db_options_.recycle_log_file_num > 0));
|
|
||||||
|
|
||||||
// set column family handles
|
// set column family handles
|
||||||
for (auto cf : column_families) {
|
for (auto cf : column_families) {
|
||||||
|
201
db/db_test.cc
201
db/db_test.cc
@ -5073,9 +5073,9 @@ class RecoveryTestHelper {
|
|||||||
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
|
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(file), env_options));
|
new WritableFileWriter(std::move(file), env_options));
|
||||||
current_log_writer.reset(new log::Writer(
|
current_log_writer.reset(
|
||||||
std::move(file_writer), current_log_number,
|
new log::Writer(std::move(file_writer), current_log_number,
|
||||||
db_options.recycle_log_file_num > 0));
|
db_options.recycle_log_file_num > 0));
|
||||||
|
|
||||||
for (int i = 0; i < kKeysPerWALFile; i++) {
|
for (int i = 0; i < kKeysPerWALFile; i++) {
|
||||||
std::string key = "key" + ToString(count++);
|
std::string key = "key" + ToString(count++);
|
||||||
@ -9891,36 +9891,33 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
|
|||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
namespace {
|
namespace {
|
||||||
void ValidateKeyExistence(DB* db,
|
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
|
||||||
const std::vector<Slice>& keys_must_exist,
|
const std::vector<Slice>& keys_must_not_exist) {
|
||||||
const std::vector<Slice>& keys_must_not_exist) {
|
// Ensure that expected keys exist
|
||||||
// Ensure that expected keys exist
|
std::vector<std::string> values;
|
||||||
std::vector<std::string> values;
|
if (keys_must_exist.size() > 0) {
|
||||||
if (keys_must_exist.size() > 0) {
|
std::vector<Status> status_list =
|
||||||
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
db->MultiGet(ReadOptions(), keys_must_exist, &values);
|
||||||
keys_must_exist,
|
for (size_t i = 0; i < keys_must_exist.size(); i++) {
|
||||||
&values);
|
ASSERT_OK(status_list[i]);
|
||||||
for (size_t i = 0; i < keys_must_exist.size(); i++) {
|
|
||||||
ASSERT_OK(status_list[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that given keys don't exist
|
|
||||||
if (keys_must_not_exist.size() > 0) {
|
|
||||||
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
|
|
||||||
keys_must_not_exist,
|
|
||||||
&values);
|
|
||||||
for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
|
|
||||||
ASSERT_TRUE(status_list[i].IsNotFound());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} //namespace
|
// Ensure that given keys don't exist
|
||||||
|
if (keys_must_not_exist.size() > 0) {
|
||||||
|
std::vector<Status> status_list =
|
||||||
|
db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
|
||||||
|
for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
|
||||||
|
ASSERT_TRUE(status_list[i].IsNotFound());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
TEST_F(DBTest, WalFilterTest) {
|
TEST_F(DBTest, WalFilterTest) {
|
||||||
class TestWalFilter : public WalFilter {
|
class TestWalFilter : public WalFilter {
|
||||||
private:
|
private:
|
||||||
// Processing option that is requested to be applied at the given index
|
// Processing option that is requested to be applied at the given index
|
||||||
WalFilter::WalProcessingOption wal_processing_option_;
|
WalFilter::WalProcessingOption wal_processing_option_;
|
||||||
// Index at which to apply wal_processing_option_
|
// Index at which to apply wal_processing_option_
|
||||||
@ -9929,21 +9926,22 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
size_t apply_option_at_record_index_;
|
size_t apply_option_at_record_index_;
|
||||||
// Current record index, incremented with each record encountered.
|
// Current record index, incremented with each record encountered.
|
||||||
size_t current_record_index_;
|
size_t current_record_index_;
|
||||||
public:
|
|
||||||
TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
|
|
||||||
size_t apply_option_for_record_index) :
|
|
||||||
wal_processing_option_(wal_processing_option),
|
|
||||||
apply_option_at_record_index_(apply_option_for_record_index),
|
|
||||||
current_record_index_(0) { }
|
|
||||||
|
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
public:
|
||||||
WriteBatch* new_batch, bool* batch_changed) const override {
|
TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
|
||||||
|
size_t apply_option_for_record_index)
|
||||||
|
: wal_processing_option_(wal_processing_option),
|
||||||
|
apply_option_at_record_index_(apply_option_for_record_index),
|
||||||
|
current_record_index_(0) {}
|
||||||
|
|
||||||
|
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
||||||
|
WriteBatch* new_batch,
|
||||||
|
bool* batch_changed) const override {
|
||||||
WalFilter::WalProcessingOption option_to_return;
|
WalFilter::WalProcessingOption option_to_return;
|
||||||
|
|
||||||
if (current_record_index_ == apply_option_at_record_index_) {
|
if (current_record_index_ == apply_option_at_record_index_) {
|
||||||
option_to_return = wal_processing_option_;
|
option_to_return = wal_processing_option_;
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
option_to_return = WalProcessingOption::kContinueProcessing;
|
option_to_return = WalProcessingOption::kContinueProcessing;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -9955,9 +9953,7 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
return option_to_return;
|
return option_to_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual const char* Name() const override {
|
virtual const char* Name() const override { return "TestWalFilter"; }
|
||||||
return "TestWalFilter";
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Create 3 batches with two keys each
|
// Create 3 batches with two keys each
|
||||||
@ -9971,12 +9967,13 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
batch_keys[2].push_back("key6");
|
batch_keys[2].push_back("key6");
|
||||||
|
|
||||||
// Test with all WAL processing options
|
// Test with all WAL processing options
|
||||||
for (int option = 0;
|
for (int option = 0;
|
||||||
option < static_cast<int>(WalFilter::WalProcessingOption::kWalProcessingOptionMax);
|
option < static_cast<int>(
|
||||||
option++) {
|
WalFilter::WalProcessingOption::kWalProcessingOptionMax);
|
||||||
|
option++) {
|
||||||
Options options = OptionsForLogIterTest();
|
Options options = OptionsForLogIterTest();
|
||||||
DestroyAndReopen(options);
|
DestroyAndReopen(options);
|
||||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
CreateAndReopenWithCF({"pikachu"}, options);
|
||||||
|
|
||||||
// Write given keys in given batches
|
// Write given keys in given batches
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
@ -9988,26 +9985,26 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
WalFilter::WalProcessingOption wal_processing_option =
|
WalFilter::WalProcessingOption wal_processing_option =
|
||||||
static_cast<WalFilter::WalProcessingOption>(option);
|
static_cast<WalFilter::WalProcessingOption>(option);
|
||||||
|
|
||||||
// Create a test filter that would apply wal_processing_option at the first
|
// Create a test filter that would apply wal_processing_option at the first
|
||||||
// record
|
// record
|
||||||
size_t apply_option_for_record_index = 1;
|
size_t apply_option_for_record_index = 1;
|
||||||
TestWalFilter test_wal_filter(wal_processing_option,
|
TestWalFilter test_wal_filter(wal_processing_option,
|
||||||
apply_option_for_record_index);
|
apply_option_for_record_index);
|
||||||
|
|
||||||
// Reopen database with option to use WAL filter
|
// Reopen database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
options.wal_filter = &test_wal_filter;
|
options.wal_filter = &test_wal_filter;
|
||||||
Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
|
Status status =
|
||||||
options);
|
TryReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
if (wal_processing_option ==
|
if (wal_processing_option ==
|
||||||
WalFilter::WalProcessingOption::kCorruptedRecord) {
|
WalFilter::WalProcessingOption::kCorruptedRecord) {
|
||||||
assert(!status.ok());
|
assert(!status.ok());
|
||||||
// In case of corruption we can turn off paranoid_checks to reopen
|
// In case of corruption we can turn off paranoid_checks to reopen
|
||||||
// databse
|
// databse
|
||||||
options.paranoid_checks = false;
|
options.paranoid_checks = false;
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
} else {
|
} else {
|
||||||
assert(status.ok());
|
assert(status.ok());
|
||||||
}
|
}
|
||||||
@ -10017,10 +10014,10 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
std::vector<Slice> keys_must_exist;
|
std::vector<Slice> keys_must_exist;
|
||||||
std::vector<Slice> keys_must_not_exist;
|
std::vector<Slice> keys_must_not_exist;
|
||||||
switch (wal_processing_option) {
|
switch (wal_processing_option) {
|
||||||
case WalFilter::WalProcessingOption::kCorruptedRecord:
|
case WalFilter::WalProcessingOption::kCorruptedRecord:
|
||||||
case WalFilter::WalProcessingOption::kContinueProcessing: {
|
case WalFilter::WalProcessingOption::kContinueProcessing: {
|
||||||
fprintf(stderr, "Testing with complete WAL processing\n");
|
fprintf(stderr, "Testing with complete WAL processing\n");
|
||||||
//we expect all records to be processed
|
// we expect all records to be processed
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
@ -10029,16 +10026,16 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
|
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
|
||||||
fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
|
fprintf(stderr,
|
||||||
apply_option_for_record_index);
|
"Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
|
||||||
|
apply_option_for_record_index);
|
||||||
// We expect the record with apply_option_for_record_index to be not
|
// We expect the record with apply_option_for_record_index to be not
|
||||||
// found.
|
// found.
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i == apply_option_for_record_index) {
|
if (i == apply_option_for_record_index) {
|
||||||
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10046,16 +10043,17 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case WalFilter::WalProcessingOption::kStopReplay: {
|
case WalFilter::WalProcessingOption::kStopReplay: {
|
||||||
fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
|
fprintf(stderr,
|
||||||
apply_option_for_record_index);
|
"Testing with stopping replay from record %" ROCKSDB_PRIszt
|
||||||
|
"\n",
|
||||||
|
apply_option_for_record_index);
|
||||||
// We expect records beyond apply_option_for_record_index to be not
|
// We expect records beyond apply_option_for_record_index to be not
|
||||||
// found.
|
// found.
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i >= apply_option_for_record_index) {
|
if (i >= apply_option_for_record_index) {
|
||||||
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10063,7 +10061,7 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
assert(false); //unhandled case
|
assert(false); // unhandled case
|
||||||
}
|
}
|
||||||
|
|
||||||
bool checked_after_reopen = false;
|
bool checked_after_reopen = false;
|
||||||
@ -10077,11 +10075,11 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
//reopen database again to make sure previous log(s) are not used
|
// reopen database again to make sure previous log(s) are not used
|
||||||
//(even if they were skipped)
|
//(even if they were skipped)
|
||||||
//reopn database with option to use WAL filter
|
// reopn database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
|
|
||||||
checked_after_reopen = true;
|
checked_after_reopen = true;
|
||||||
}
|
}
|
||||||
@ -10090,19 +10088,20 @@ TEST_F(DBTest, WalFilterTest) {
|
|||||||
|
|
||||||
TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
||||||
class ChangeBatchHandler : public WriteBatch::Handler {
|
class ChangeBatchHandler : public WriteBatch::Handler {
|
||||||
private:
|
private:
|
||||||
// Batch to insert keys in
|
// Batch to insert keys in
|
||||||
WriteBatch* new_write_batch_;
|
WriteBatch* new_write_batch_;
|
||||||
// Number of keys to add in the new batch
|
// Number of keys to add in the new batch
|
||||||
size_t num_keys_to_add_in_new_batch_;
|
size_t num_keys_to_add_in_new_batch_;
|
||||||
// Number of keys added to new batch
|
// Number of keys added to new batch
|
||||||
size_t num_keys_added_;
|
size_t num_keys_added_;
|
||||||
public:
|
|
||||||
ChangeBatchHandler(WriteBatch* new_write_batch,
|
public:
|
||||||
size_t num_keys_to_add_in_new_batch) :
|
ChangeBatchHandler(WriteBatch* new_write_batch,
|
||||||
new_write_batch_(new_write_batch),
|
size_t num_keys_to_add_in_new_batch)
|
||||||
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
: new_write_batch_(new_write_batch),
|
||||||
num_keys_added_(0){ }
|
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
||||||
|
num_keys_added_(0) {}
|
||||||
virtual void Put(const Slice& key, const Slice& value) override {
|
virtual void Put(const Slice& key, const Slice& value) override {
|
||||||
if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
|
if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
|
||||||
new_write_batch_->Put(key, value);
|
new_write_batch_->Put(key, value);
|
||||||
@ -10112,24 +10111,24 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
class TestWalFilterWithChangeBatch : public WalFilter {
|
class TestWalFilterWithChangeBatch : public WalFilter {
|
||||||
private:
|
private:
|
||||||
// Index at which to start changing records
|
// Index at which to start changing records
|
||||||
size_t change_records_from_index_;
|
size_t change_records_from_index_;
|
||||||
// Number of keys to add in the new batch
|
// Number of keys to add in the new batch
|
||||||
size_t num_keys_to_add_in_new_batch_;
|
size_t num_keys_to_add_in_new_batch_;
|
||||||
// Current record index, incremented with each record encountered.
|
// Current record index, incremented with each record encountered.
|
||||||
size_t current_record_index_;
|
size_t current_record_index_;
|
||||||
public:
|
|
||||||
TestWalFilterWithChangeBatch(
|
|
||||||
size_t change_records_from_index,
|
|
||||||
size_t num_keys_to_add_in_new_batch) :
|
|
||||||
change_records_from_index_(change_records_from_index),
|
|
||||||
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
|
||||||
current_record_index_(0) { }
|
|
||||||
|
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
public:
|
||||||
WriteBatch* new_batch, bool* batch_changed) const override {
|
TestWalFilterWithChangeBatch(size_t change_records_from_index,
|
||||||
|
size_t num_keys_to_add_in_new_batch)
|
||||||
|
: change_records_from_index_(change_records_from_index),
|
||||||
|
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
|
||||||
|
current_record_index_(0) {}
|
||||||
|
|
||||||
|
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
||||||
|
WriteBatch* new_batch,
|
||||||
|
bool* batch_changed) const override {
|
||||||
if (current_record_index_ >= change_records_from_index_) {
|
if (current_record_index_ >= change_records_from_index_) {
|
||||||
ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
|
ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
|
||||||
batch.Iterate(&handler);
|
batch.Iterate(&handler);
|
||||||
@ -10139,7 +10138,8 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
// Filter is passed as a const object for RocksDB to not modify the
|
// Filter is passed as a const object for RocksDB to not modify the
|
||||||
// object, however we modify it for our own purpose here and hence
|
// object, however we modify it for our own purpose here and hence
|
||||||
// cast the constness away.
|
// cast the constness away.
|
||||||
(const_cast<TestWalFilterWithChangeBatch*>(this)->current_record_index_)++;
|
(const_cast<TestWalFilterWithChangeBatch*>(this)
|
||||||
|
->current_record_index_)++;
|
||||||
|
|
||||||
return WalProcessingOption::kContinueProcessing;
|
return WalProcessingOption::kContinueProcessing;
|
||||||
}
|
}
|
||||||
@ -10160,7 +10160,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
|
|
||||||
Options options = OptionsForLogIterTest();
|
Options options = OptionsForLogIterTest();
|
||||||
DestroyAndReopen(options);
|
DestroyAndReopen(options);
|
||||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
CreateAndReopenWithCF({"pikachu"}, options);
|
||||||
|
|
||||||
// Write given keys in given batches
|
// Write given keys in given batches
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
@ -10176,12 +10176,12 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
size_t change_records_from_index = 1;
|
size_t change_records_from_index = 1;
|
||||||
size_t num_keys_to_add_in_new_batch = 1;
|
size_t num_keys_to_add_in_new_batch = 1;
|
||||||
TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
|
TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
|
||||||
change_records_from_index, num_keys_to_add_in_new_batch);
|
change_records_from_index, num_keys_to_add_in_new_batch);
|
||||||
|
|
||||||
// Reopen database with option to use WAL filter
|
// Reopen database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
options.wal_filter = &test_wal_filter_with_change_batch;
|
options.wal_filter = &test_wal_filter_with_change_batch;
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
|
|
||||||
// Ensure that all keys exist before change_records_from_index_
|
// Ensure that all keys exist before change_records_from_index_
|
||||||
// And after that index only single key exists
|
// And after that index only single key exists
|
||||||
@ -10193,8 +10193,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
|
if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
|
||||||
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
keys_must_exist.push_back(Slice(batch_keys[i][j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10211,11 +10210,11 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
//reopen database again to make sure previous log(s) are not used
|
// reopen database again to make sure previous log(s) are not used
|
||||||
//(even if they were skipped)
|
//(even if they were skipped)
|
||||||
//reopn database with option to use WAL filter
|
// reopn database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
|
|
||||||
checked_after_reopen = true;
|
checked_after_reopen = true;
|
||||||
}
|
}
|
||||||
@ -10223,9 +10222,10 @@ TEST_F(DBTest, WalFilterTestWithChangeBatch) {
|
|||||||
|
|
||||||
TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
||||||
class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
|
class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
|
||||||
public:
|
public:
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
|
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
||||||
WriteBatch* new_batch, bool* batch_changed) const override {
|
WriteBatch* new_batch,
|
||||||
|
bool* batch_changed) const override {
|
||||||
*new_batch = batch;
|
*new_batch = batch;
|
||||||
new_batch->Put("key_extra", "value_extra");
|
new_batch->Put("key_extra", "value_extra");
|
||||||
*batch_changed = true;
|
*batch_changed = true;
|
||||||
@ -10248,7 +10248,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
|||||||
|
|
||||||
Options options = OptionsForLogIterTest();
|
Options options = OptionsForLogIterTest();
|
||||||
DestroyAndReopen(options);
|
DestroyAndReopen(options);
|
||||||
CreateAndReopenWithCF({ "pikachu" }, options);
|
CreateAndReopenWithCF({"pikachu"}, options);
|
||||||
|
|
||||||
// Write given keys in given batches
|
// Write given keys in given batches
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
@ -10265,17 +10265,16 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
|||||||
// Reopen database with option to use WAL filter
|
// Reopen database with option to use WAL filter
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
options.wal_filter = &test_wal_filter_extra_keys;
|
options.wal_filter = &test_wal_filter_extra_keys;
|
||||||
Status status =
|
Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
|
||||||
ASSERT_TRUE(status.IsNotSupported());
|
ASSERT_TRUE(status.IsNotSupported());
|
||||||
|
|
||||||
// Reopen without filter, now reopen should succeed - previous
|
// Reopen without filter, now reopen should succeed - previous
|
||||||
// attempt to open must not have altered the db.
|
// attempt to open must not have altered the db.
|
||||||
options = OptionsForLogIterTest();
|
options = OptionsForLogIterTest();
|
||||||
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||||
|
|
||||||
std::vector<Slice> keys_must_exist;
|
std::vector<Slice> keys_must_exist;
|
||||||
std::vector<Slice> keys_must_not_exist; //empty vector
|
std::vector<Slice> keys_must_not_exist; // empty vector
|
||||||
|
|
||||||
for (size_t i = 0; i < batch_keys.size(); i++) {
|
for (size_t i = 0; i < batch_keys.size(); i++) {
|
||||||
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
for (size_t j = 0; j < batch_keys[i].size(); j++) {
|
||||||
@ -10286,7 +10285,7 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
|
|||||||
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
|
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
class BloomStatsTestWithParam
|
class BloomStatsTestWithParam
|
||||||
|
@ -503,7 +503,8 @@ class DB {
|
|||||||
return CompactRange(options, DefaultColumnFamily(), begin, end);
|
return CompactRange(options, DefaultColumnFamily(), begin, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status SetOptions(ColumnFamilyHandle* /*column_family*/,
|
virtual Status SetOptions(
|
||||||
|
ColumnFamilyHandle* /*column_family*/,
|
||||||
const std::unordered_map<std::string, std::string>& /*new_options*/) {
|
const std::unordered_map<std::string, std::string>& /*new_options*/) {
|
||||||
return Status::NotSupported("Not implemented");
|
return Status::NotSupported("Not implemented");
|
||||||
}
|
}
|
||||||
@ -655,7 +656,7 @@ class DB {
|
|||||||
// Returns a list of all table files with their level, start key
|
// Returns a list of all table files with their level, start key
|
||||||
// and end key
|
// and end key
|
||||||
virtual void GetLiveFilesMetaData(
|
virtual void GetLiveFilesMetaData(
|
||||||
std::vector<LiveFileMetaData>* /*metadata*/) {}
|
std::vector<LiveFileMetaData>* /*metadata*/) {}
|
||||||
|
|
||||||
// Obtains the meta data of the specified column family of the DB.
|
// Obtains the meta data of the specified column family of the DB.
|
||||||
// Status::NotFound() will be returned if the current DB does not have
|
// Status::NotFound() will be returned if the current DB does not have
|
||||||
@ -663,9 +664,8 @@ class DB {
|
|||||||
//
|
//
|
||||||
// If cf_name is not specified, then the metadata of the default
|
// If cf_name is not specified, then the metadata of the default
|
||||||
// column family will be returned.
|
// column family will be returned.
|
||||||
virtual void GetColumnFamilyMetaData(
|
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* /*column_family*/,
|
||||||
ColumnFamilyHandle* /*column_family*/,
|
ColumnFamilyMetaData* /*metadata*/) {}
|
||||||
ColumnFamilyMetaData* /*metadata*/) {}
|
|
||||||
|
|
||||||
// Get the metadata of the default column family.
|
// Get the metadata of the default column family.
|
||||||
void GetColumnFamilyMetaData(
|
void GetColumnFamilyMetaData(
|
||||||
|
@ -416,8 +416,7 @@ class RandomAccessFile {
|
|||||||
|
|
||||||
// For cases when read-ahead is implemented in the platform dependent
|
// For cases when read-ahead is implemented in the platform dependent
|
||||||
// layer
|
// layer
|
||||||
virtual void EnableReadAhead() {
|
virtual void EnableReadAhead() {}
|
||||||
}
|
|
||||||
|
|
||||||
// Tries to get an unique ID for this file that will be the same each time
|
// Tries to get an unique ID for this file that will be the same each time
|
||||||
// the file is opened (and will stay the same while the file is open).
|
// the file is opened (and will stay the same while the file is open).
|
||||||
|
@ -151,7 +151,7 @@ class EventListener {
|
|||||||
// it should not run for an extended period of time before the function
|
// it should not run for an extended period of time before the function
|
||||||
// returns. Otherwise, RocksDB may be blocked.
|
// returns. Otherwise, RocksDB may be blocked.
|
||||||
virtual void OnFlushCompleted(DB* /*db*/,
|
virtual void OnFlushCompleted(DB* /*db*/,
|
||||||
const FlushJobInfo& /*flush_job_info*/) {}
|
const FlushJobInfo& /*flush_job_info*/) {}
|
||||||
|
|
||||||
// A call-back function for RocksDB which will be called whenever
|
// A call-back function for RocksDB which will be called whenever
|
||||||
// a SST file is deleted. Different from OnCompactionCompleted and
|
// a SST file is deleted. Different from OnCompactionCompleted and
|
||||||
@ -180,7 +180,7 @@ class EventListener {
|
|||||||
// after this function is returned, and must be copied if it is needed
|
// after this function is returned, and must be copied if it is needed
|
||||||
// outside of this function.
|
// outside of this function.
|
||||||
virtual void OnCompactionCompleted(DB* /*db*/,
|
virtual void OnCompactionCompleted(DB* /*db*/,
|
||||||
const CompactionJobInfo& /*ci*/) {}
|
const CompactionJobInfo& /*ci*/) {}
|
||||||
|
|
||||||
// A call-back function for RocksDB which will be called whenever
|
// A call-back function for RocksDB which will be called whenever
|
||||||
// a SST file is created. Different from OnCompactionCompleted and
|
// a SST file is created. Different from OnCompactionCompleted and
|
||||||
|
@ -1163,7 +1163,7 @@ struct DBOptions {
|
|||||||
// The filter is invoked at startup and is invoked from a single-thread
|
// The filter is invoked at startup and is invoked from a single-thread
|
||||||
// currently.
|
// currently.
|
||||||
const WalFilter* wal_filter;
|
const WalFilter* wal_filter;
|
||||||
#endif //ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
};
|
};
|
||||||
|
|
||||||
// Options to control the behavior of a database (passed to DB::Open)
|
// Options to control the behavior of a database (passed to DB::Open)
|
||||||
|
@ -13,7 +13,7 @@ class WriteBatch;
|
|||||||
// records or modify their processing on recovery.
|
// records or modify their processing on recovery.
|
||||||
// Please see the details below.
|
// Please see the details below.
|
||||||
class WalFilter {
|
class WalFilter {
|
||||||
public:
|
public:
|
||||||
enum class WalProcessingOption {
|
enum class WalProcessingOption {
|
||||||
// Continue processing as usual
|
// Continue processing as usual
|
||||||
kContinueProcessing = 0,
|
kContinueProcessing = 0,
|
||||||
@ -28,12 +28,12 @@ public:
|
|||||||
kWalProcessingOptionMax = 4
|
kWalProcessingOptionMax = 4
|
||||||
};
|
};
|
||||||
|
|
||||||
virtual ~WalFilter() { };
|
virtual ~WalFilter() {}
|
||||||
|
|
||||||
// LogRecord is invoked for each log record encountered for all the logs
|
// LogRecord is invoked for each log record encountered for all the logs
|
||||||
// during replay on logs on recovery. This method can be used to:
|
// during replay on logs on recovery. This method can be used to:
|
||||||
// * inspect the record (using the batch parameter)
|
// * inspect the record (using the batch parameter)
|
||||||
// * ignoring current record
|
// * ignoring current record
|
||||||
// (by returning WalProcessingOption::kIgnoreCurrentRecord)
|
// (by returning WalProcessingOption::kIgnoreCurrentRecord)
|
||||||
// * reporting corrupted record
|
// * reporting corrupted record
|
||||||
// (by returning WalProcessingOption::kCorruptedRecord)
|
// (by returning WalProcessingOption::kCorruptedRecord)
|
||||||
@ -55,7 +55,8 @@ public:
|
|||||||
// Please see WalProcessingOption enum above for
|
// Please see WalProcessingOption enum above for
|
||||||
// details.
|
// details.
|
||||||
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
|
||||||
WriteBatch* new_batch, bool* batch_changed) const = 0;
|
WriteBatch* new_batch,
|
||||||
|
bool* batch_changed) const = 0;
|
||||||
|
|
||||||
// Returns a name that identifies this WAL filter.
|
// Returns a name that identifies this WAL filter.
|
||||||
// The name will be printed to LOG file on start up for diagnosis.
|
// The name will be printed to LOG file on start up for diagnosis.
|
||||||
|
@ -688,39 +688,44 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
const std::string filename_;
|
const std::string filename_;
|
||||||
HANDLE hFile_;
|
HANDLE hFile_;
|
||||||
const bool use_os_buffer_;
|
const bool use_os_buffer_;
|
||||||
bool read_ahead_;
|
bool read_ahead_;
|
||||||
const size_t compaction_readahead_size_;
|
const size_t compaction_readahead_size_;
|
||||||
const size_t random_access_max_buffer_size_;
|
const size_t random_access_max_buffer_size_;
|
||||||
mutable std::mutex buffer_mut_;
|
mutable std::mutex buffer_mut_;
|
||||||
mutable AlignedBuffer buffer_;
|
mutable AlignedBuffer buffer_;
|
||||||
mutable uint64_t
|
mutable uint64_t
|
||||||
buffered_start_; // file offset set that is currently buffered
|
buffered_start_; // file offset set that is currently buffered
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The function reads a requested amount of bytes into the specified aligned buffer
|
* The function reads a requested amount of bytes into the specified aligned
|
||||||
* Upon success the function sets the length of the buffer to the amount of bytes actually
|
* buffer Upon success the function sets the length of the buffer to the
|
||||||
* read even though it might be less than actually requested.
|
* amount of bytes actually read even though it might be less than actually
|
||||||
* It then copies the amount of bytes requested by the user (left) to the user supplied
|
* requested. It then copies the amount of bytes requested by the user (left)
|
||||||
* buffer (dest) and reduces left by the amount of bytes copied to the user buffer
|
* to the user supplied buffer (dest) and reduces left by the amount of bytes
|
||||||
|
* copied to the user buffer
|
||||||
*
|
*
|
||||||
* @user_offset [in] - offset on disk where the read was requested by the user
|
* @user_offset [in] - offset on disk where the read was requested by the user
|
||||||
* @first_page_start [in] - actual page aligned disk offset that we want to read from
|
* @first_page_start [in] - actual page aligned disk offset that we want to
|
||||||
* @bytes_to_read [in] - total amount of bytes that will be read from disk which is generally
|
* read from
|
||||||
* greater or equal to the amount that the user has requested due to the
|
* @bytes_to_read [in] - total amount of bytes that will be read from disk
|
||||||
* either alignment requirements or read_ahead in effect.
|
* which is generally greater or equal to the amount
|
||||||
* @left [in/out] total amount of bytes that needs to be copied to the user buffer. It is reduced
|
* that the user has requested due to the
|
||||||
* by the amount of bytes that actually copied
|
* either alignment requirements or read_ahead in
|
||||||
|
* effect.
|
||||||
|
* @left [in/out] total amount of bytes that needs to be copied to the user
|
||||||
|
* buffer. It is reduced by the amount of bytes that actually
|
||||||
|
* copied
|
||||||
* @buffer - buffer to use
|
* @buffer - buffer to use
|
||||||
* @dest - user supplied buffer
|
* @dest - user supplied buffer
|
||||||
*/
|
*/
|
||||||
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
|
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
|
||||||
size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const {
|
size_t bytes_to_read, size_t& left,
|
||||||
|
AlignedBuffer& buffer, char* dest) const {
|
||||||
assert(buffer.CurrentSize() == 0);
|
assert(buffer.CurrentSize() == 0);
|
||||||
assert(buffer.Capacity() >= bytes_to_read);
|
assert(buffer.Capacity() >= bytes_to_read);
|
||||||
|
|
||||||
SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read,
|
SSIZE_T read =
|
||||||
first_page_start);
|
pread(hFile_, buffer.Destination(), bytes_to_read, first_page_start);
|
||||||
|
|
||||||
if (read > 0) {
|
if (read > 0) {
|
||||||
buffer.Size(read);
|
buffer.Size(read);
|
||||||
@ -739,21 +744,22 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
|
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
|
||||||
size_t bytes_to_read, size_t& left, char* dest) const {
|
size_t bytes_to_read, size_t& left,
|
||||||
|
char* dest) const {
|
||||||
AlignedBuffer bigBuffer;
|
AlignedBuffer bigBuffer;
|
||||||
bigBuffer.Alignment(buffer_.Alignment());
|
bigBuffer.Alignment(buffer_.Alignment());
|
||||||
bigBuffer.AllocateNewBuffer(bytes_to_read);
|
bigBuffer.AllocateNewBuffer(bytes_to_read);
|
||||||
|
|
||||||
return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
|
return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
|
||||||
bigBuffer, dest);
|
bigBuffer, dest);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start,
|
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset,
|
||||||
size_t bytes_to_read, size_t& left, char* dest) const {
|
uint64_t first_page_start,
|
||||||
|
size_t bytes_to_read, size_t& left,
|
||||||
|
char* dest) const {
|
||||||
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
|
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
|
||||||
left, buffer_, dest);
|
left, buffer_, dest);
|
||||||
|
|
||||||
if (read > 0) {
|
if (read > 0) {
|
||||||
buffered_start_ = first_page_start;
|
buffered_start_ = first_page_start;
|
||||||
@ -789,9 +795,7 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void EnableReadAhead() override {
|
virtual void EnableReadAhead() override { this->Hint(SEQUENTIAL); }
|
||||||
this->Hint(SEQUENTIAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
||||||
char* scratch) const override {
|
char* scratch) const override {
|
||||||
@ -824,7 +828,7 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
// Figure out the start/end offset for reading and amount to read
|
// Figure out the start/end offset for reading and amount to read
|
||||||
const size_t alignment = buffer_.Alignment();
|
const size_t alignment = buffer_.Alignment();
|
||||||
const size_t first_page_start =
|
const size_t first_page_start =
|
||||||
TruncateToPageBoundary(alignment, offset);
|
TruncateToPageBoundary(alignment, offset);
|
||||||
|
|
||||||
size_t bytes_requested = left;
|
size_t bytes_requested = left;
|
||||||
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
|
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
|
||||||
@ -832,29 +836,29 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const size_t last_page_start =
|
const size_t last_page_start =
|
||||||
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
|
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
|
||||||
const size_t actual_bytes_toread =
|
const size_t actual_bytes_toread =
|
||||||
(last_page_start - first_page_start) + alignment;
|
(last_page_start - first_page_start) + alignment;
|
||||||
|
|
||||||
if (buffer_.Capacity() < actual_bytes_toread) {
|
if (buffer_.Capacity() < actual_bytes_toread) {
|
||||||
// If we are in read-ahead mode or the requested size
|
// If we are in read-ahead mode or the requested size
|
||||||
// exceeds max buffer size then use one-shot
|
// exceeds max buffer size then use one-shot
|
||||||
// big buffer otherwise reallocate main buffer
|
// big buffer otherwise reallocate main buffer
|
||||||
if (read_ahead_ ||
|
if (read_ahead_ ||
|
||||||
(actual_bytes_toread > random_access_max_buffer_size_)) {
|
(actual_bytes_toread > random_access_max_buffer_size_)) {
|
||||||
// Unlock the mutex since we are not using instance buffer
|
// Unlock the mutex since we are not using instance buffer
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
r = ReadIntoOneShotBuffer(offset, first_page_start,
|
r = ReadIntoOneShotBuffer(offset, first_page_start,
|
||||||
actual_bytes_toread, left, dest);
|
actual_bytes_toread, left, dest);
|
||||||
} else {
|
} else {
|
||||||
buffer_.AllocateNewBuffer(actual_bytes_toread);
|
buffer_.AllocateNewBuffer(actual_bytes_toread);
|
||||||
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
||||||
actual_bytes_toread, left, dest);
|
actual_bytes_toread, left, dest);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
buffer_.Clear();
|
buffer_.Clear();
|
||||||
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
||||||
actual_bytes_toread, left, dest);
|
actual_bytes_toread, left, dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -877,9 +881,7 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
virtual void Hint(AccessPattern pattern) override {
|
virtual void Hint(AccessPattern pattern) override {
|
||||||
|
if (pattern == SEQUENTIAL && !use_os_buffer_ &&
|
||||||
if (pattern == SEQUENTIAL &&
|
|
||||||
!use_os_buffer_ &&
|
|
||||||
compaction_readahead_size_ > 0) {
|
compaction_readahead_size_ > 0) {
|
||||||
std::lock_guard<std::mutex> lg(buffer_mut_);
|
std::lock_guard<std::mutex> lg(buffer_mut_);
|
||||||
if (!read_ahead_) {
|
if (!read_ahead_) {
|
||||||
@ -888,12 +890,12 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
// - one for memory alignment which added implicitly by AlignedBuffer
|
// - one for memory alignment which added implicitly by AlignedBuffer
|
||||||
// - We add one more alignment because we will read one alignment more
|
// - We add one more alignment because we will read one alignment more
|
||||||
// from disk
|
// from disk
|
||||||
buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment());
|
buffer_.AllocateNewBuffer(compaction_readahead_size_ +
|
||||||
|
buffer_.Alignment());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
virtual Status InvalidateCache(size_t offset, size_t length) override {
|
virtual Status InvalidateCache(size_t offset, size_t length) override {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -293,7 +293,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
|
|||||||
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
|
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
|
||||||
env_options->bytes_per_sync = options.bytes_per_sync;
|
env_options->bytes_per_sync = options.bytes_per_sync;
|
||||||
env_options->compaction_readahead_size = options.compaction_readahead_size;
|
env_options->compaction_readahead_size = options.compaction_readahead_size;
|
||||||
env_options->random_access_max_buffer_size = options.random_access_max_buffer_size;
|
env_options->random_access_max_buffer_size =
|
||||||
|
options.random_access_max_buffer_size;
|
||||||
env_options->rate_limiter = options.rate_limiter.get();
|
env_options->rate_limiter = options.rate_limiter.get();
|
||||||
env_options->allow_fallocate = options.allow_fallocate;
|
env_options->allow_fallocate = options.allow_fallocate;
|
||||||
}
|
}
|
||||||
|
@ -381,20 +381,20 @@ Status WritableFileWriter::WriteUnbuffered() {
|
|||||||
namespace {
|
namespace {
|
||||||
class ReadaheadRandomAccessFile : public RandomAccessFile {
|
class ReadaheadRandomAccessFile : public RandomAccessFile {
|
||||||
public:
|
public:
|
||||||
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
|
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
|
||||||
size_t readahead_size)
|
size_t readahead_size)
|
||||||
: file_(std::move(file)),
|
: file_(std::move(file)),
|
||||||
readahead_size_(readahead_size),
|
readahead_size_(readahead_size),
|
||||||
forward_calls_(file_->ShouldForwardRawRequest()),
|
forward_calls_(file_->ShouldForwardRawRequest()),
|
||||||
buffer_(),
|
buffer_(),
|
||||||
buffer_offset_(0),
|
buffer_offset_(0),
|
||||||
buffer_len_(0) {
|
buffer_len_(0) {
|
||||||
if (!forward_calls_) {
|
if (!forward_calls_) {
|
||||||
buffer_.reset(new char[readahead_size_]);
|
buffer_.reset(new char[readahead_size_]);
|
||||||
} else if (readahead_size_ > 0) {
|
} else if (readahead_size_ > 0) {
|
||||||
file_->EnableReadAhead();
|
file_->EnableReadAhead();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
|
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
|
||||||
|
|
||||||
|
@ -260,9 +260,10 @@ DBOptions::DBOptions()
|
|||||||
skip_stats_update_on_db_open(false),
|
skip_stats_update_on_db_open(false),
|
||||||
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
|
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
, wal_filter(nullptr)
|
,
|
||||||
#endif // ROCKSDB_LITE
|
wal_filter(nullptr)
|
||||||
{
|
#endif // ROCKSDB_LITE
|
||||||
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
DBOptions::DBOptions(const Options& options)
|
DBOptions::DBOptions(const Options& options)
|
||||||
@ -322,9 +323,10 @@ DBOptions::DBOptions(const Options& options)
|
|||||||
wal_recovery_mode(options.wal_recovery_mode),
|
wal_recovery_mode(options.wal_recovery_mode),
|
||||||
row_cache(options.row_cache)
|
row_cache(options.row_cache)
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
, wal_filter(options.wal_filter)
|
,
|
||||||
#endif // ROCKSDB_LITE
|
wal_filter(options.wal_filter)
|
||||||
{
|
#endif // ROCKSDB_LITE
|
||||||
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* const access_hints[] = {
|
static const char* const access_hints[] = {
|
||||||
@ -405,7 +407,8 @@ void DBOptions::Dump(Logger* log) const {
|
|||||||
" Options.compaction_readahead_size: %" ROCKSDB_PRIszt
|
" Options.compaction_readahead_size: %" ROCKSDB_PRIszt
|
||||||
"d",
|
"d",
|
||||||
compaction_readahead_size);
|
compaction_readahead_size);
|
||||||
Header(log,
|
Header(
|
||||||
|
log,
|
||||||
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
|
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
|
||||||
"d",
|
"d",
|
||||||
random_access_max_buffer_size);
|
random_access_max_buffer_size);
|
||||||
@ -431,8 +434,8 @@ void DBOptions::Dump(Logger* log) const {
|
|||||||
}
|
}
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
Header(log, " Options.wal_filter: %s",
|
Header(log, " Options.wal_filter: %s",
|
||||||
wal_filter ? wal_filter->Name() : "None");
|
wal_filter ? wal_filter->Name() : "None");
|
||||||
#endif // ROCKDB_LITE
|
#endif // ROCKDB_LITE
|
||||||
} // DBOptions::Dump
|
} // DBOptions::Dump
|
||||||
|
|
||||||
void ColumnFamilyOptions::Dump(Logger* log) const {
|
void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||||
|
@ -181,8 +181,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
|
|||||||
{offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
|
{offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
|
||||||
OptionVerificationType::kNormal}},
|
OptionVerificationType::kNormal}},
|
||||||
{"random_access_max_buffer_size",
|
{"random_access_max_buffer_size",
|
||||||
{ offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT,
|
{offsetof(struct DBOptions, random_access_max_buffer_size),
|
||||||
OptionVerificationType::kNormal}},
|
OptionType::kSizeT, OptionVerificationType::kNormal}},
|
||||||
{"use_adaptive_mutex",
|
{"use_adaptive_mutex",
|
||||||
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
|
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
|
||||||
OptionVerificationType::kNormal}},
|
OptionVerificationType::kNormal}},
|
||||||
|
@ -339,7 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
|||||||
{"use_adaptive_mutex", "false"},
|
{"use_adaptive_mutex", "false"},
|
||||||
{"new_table_reader_for_compaction_inputs", "true"},
|
{"new_table_reader_for_compaction_inputs", "true"},
|
||||||
{"compaction_readahead_size", "100"},
|
{"compaction_readahead_size", "100"},
|
||||||
{"random_access_max_buffer_size", "3145728" },
|
{"random_access_max_buffer_size", "3145728"},
|
||||||
{"bytes_per_sync", "47"},
|
{"bytes_per_sync", "47"},
|
||||||
{"wal_bytes_per_sync", "48"},
|
{"wal_bytes_per_sync", "48"},
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user