CompactedDB should not be used if there is outstanding WAL files

Summary: CompactedDB skips memtable. So we shouldn't use compacted DB if there is outstanding WAL files.

Test Plan: Change to options.max_open_files = -1 perf context test to create a compacted DB, which we shouldn't do.

Reviewers: yhchiang, kradhakrishnan, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57057
This commit is contained in:
sdong 2016-04-21 15:32:06 -07:00
parent d719b095dc
commit ac0e54b4c6
5 changed files with 47 additions and 11 deletions

View File

@ -93,7 +93,7 @@ Status CompactedDBImpl::Init(const Options& options) {
mutex_.Lock();
ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
ColumnFamilyOptions(options));
Status s = Recover({ cf }, true /* read only */, false);
Status s = Recover({cf}, true /* read only */, false, true);
if (s.ok()) {
cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
DefaultColumnFamily())->cfd();

View File

@ -988,7 +988,7 @@ Directory* DBImpl::Directories::GetDataDir(size_t path_id) {
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_log_file_exist) {
bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
mutex_.AssertHeld();
bool is_new_db = false;
@ -1083,10 +1083,25 @@ Status DBImpl::Recover(
}
}
if (logs.size() > 0 && error_if_log_file_exist) {
return Status::Corruption(""
if (logs.size() > 0) {
if (error_if_log_file_exist) {
return Status::Corruption(
"The db was opened in readonly mode with error_if_log_file_exist"
"flag but a log file already exists");
} else if (error_if_data_exists_in_logs) {
for (auto& log : logs) {
std::string fname = LogFileName(db_options_.wal_dir, log);
uint64_t bytes;
s = env_->GetFileSize(fname, &bytes);
if (s.ok()) {
if (bytes > 0) {
return Status::Corruption(
"error_if_data_exists_in_logs is set but there are data "
" in log files.");
}
}
}
}
}
if (!logs.empty()) {

View File

@ -483,7 +483,8 @@ class DBImpl : public DB {
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false);
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
void MaybeIgnoreError(Status* s) const;

View File

@ -338,6 +338,15 @@ TEST_F(DBTest, CompactedDB) {
ASSERT_OK(status_list[4]);
ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
ASSERT_TRUE(status_list[5].IsNotFound());
Reopen(options);
// Add a key
ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
Close();
ASSERT_OK(ReadOnlyReopen(options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
}
TEST_F(DBTest, LevelLimitReopen) {

View File

@ -37,6 +37,7 @@ std::shared_ptr<DB> OpenDb(bool read_only = false) {
DB* db;
Options options;
options.create_if_missing = true;
options.max_open_files = -1;
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =
@ -279,14 +280,19 @@ void ProfileQueries(bool enabled_time = false) {
#endif
for (const int i : keys) {
if (i == kFlushFlag) {
continue;
}
std::string key = "k" + ToString(i);
std::string value = "v" + ToString(i);
std::string expected_value = "v" + ToString(i);
std::string value;
std::vector<Slice> multiget_keys = {Slice(key)};
std::vector<std::string> values;
perf_context.Reset();
db->Get(read_options, key, &value);
ASSERT_OK(db->Get(read_options, key, &value));
ASSERT_EQ(expected_value, value);
hist_get_snapshot.Add(perf_context.get_snapshot_time);
hist_get_memtable.Add(perf_context.get_from_memtable_time);
hist_get_files.Add(perf_context.get_from_output_files_time);
@ -375,14 +381,19 @@ void ProfileQueries(bool enabled_time = false) {
hist_mget_num_memtable_checked.Clear();
for (const int i : keys) {
if (i == kFlushFlag) {
continue;
}
std::string key = "k" + ToString(i);
std::string value = "v" + ToString(i);
std::string expected_value = "v" + ToString(i);
std::string value;
std::vector<Slice> multiget_keys = {Slice(key)};
std::vector<std::string> values;
perf_context.Reset();
db->Get(read_options, key, &value);
ASSERT_OK(db->Get(read_options, key, &value));
ASSERT_EQ(expected_value, value);
hist_get_snapshot.Add(perf_context.get_snapshot_time);
hist_get_memtable.Add(perf_context.get_from_memtable_time);
hist_get_files.Add(perf_context.get_from_output_files_time);