Bug fixes introduced by code cleanup

This commit is contained in:
Igor Canadi 2014-03-12 10:52:32 -07:00
parent dff9214165
commit b5d6ad69fc
3 changed files with 31 additions and 29 deletions

View File

@ -972,6 +972,10 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
} }
} }
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
if (!read_only) { if (!read_only) {
// no need to refcount since client still doesn't have access // no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate // to the DB and can not drop column families while we iterate
@ -1012,9 +1016,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// VersionSet::next_file_number_ always to be strictly greater than any // VersionSet::next_file_number_ always to be strictly greater than any
// log number // log number
versions_->MarkFileNumberUsed(log_number + 1); versions_->MarkFileNumberUsed(log_number + 1);
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
status = versions_->LogAndApply(cfd, edit, &mutex_); status = versions_->LogAndApply(cfd, edit, &mutex_);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -4071,7 +4072,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
soptions.AdaptForLogWrite()); soptions.AdaptForLogWrite());
if (s.ok()) { if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
VersionEdit edit;
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile))); impl->log_.reset(new log::Writer(std::move(lfile)));

View File

@ -1998,35 +1998,34 @@ TEST(DBTest, RollLog) {
TEST(DBTest, WAL) { TEST(DBTest, WAL) {
do { do {
CreateAndReopenWithCF({"pikachu"});
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
ReopenWithColumnFamilies({"default", "pikachu"}); Reopen();
ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v1", Get(1, "bar")); ASSERT_EQ("v1", Get("bar"));
writeOpt.disableWAL = false; writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2"));
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v2"));
ReopenWithColumnFamilies({"default", "pikachu"}); Reopen();
// Both value's should be present. // Both value's should be present.
ASSERT_EQ("v2", Get(1, "bar")); ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ("v2", Get(1, "foo")); ASSERT_EQ("v2", Get("foo"));
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3"));
writeOpt.disableWAL = false; writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v3"));
ReopenWithColumnFamilies({"default", "pikachu"}); Reopen();
// again both values should be present. // again both values should be present.
ASSERT_EQ("v3", Get(1, "foo")); ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v3", Get(1, "bar")); ASSERT_EQ("v3", Get("bar"));
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
@ -2131,7 +2130,7 @@ TEST(DBTest, FLUSH) {
ReopenWithColumnFamilies({"default", "pikachu"}); ReopenWithColumnFamilies({"default", "pikachu"});
ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v1", Get(1, "bar")); ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));

View File

@ -1771,15 +1771,20 @@ Status VersionSet::Recover(
} }
// remove the trailing '\n' // remove the trailing '\n'
manifest_filename.resize(manifest_filename.size() - 1); manifest_filename.resize(manifest_filename.size() - 1);
FileType type;
bool parse_ok =
ParseFileName(manifest_filename, &manifest_file_number_, &type);
if (!parse_ok || type != kDescriptorFile) {
return Status::Corruption("CURRENT file corrupted");
}
Log(options_->info_log, "Recovering from manifest file:%s\n", Log(options_->info_log, "Recovering from manifest file:%s\n",
manifest_filename.c_str()); manifest_filename.c_str());
manifest_filename = dbname_ + "/" + manifest_filename; manifest_filename = dbname_ + "/" + manifest_filename;
unique_ptr<SequentialFile> manifest_file; unique_ptr<SequentialFile> manifest_file;
s = env_->NewSequentialFile( s = env_->NewSequentialFile(manifest_filename, &manifest_file,
manifest_filename, &manifest_file, storage_options_ storage_options_);
);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -1988,7 +1993,6 @@ Status VersionSet::Recover(
} }
manifest_file_size_ = manifest_file_size; manifest_file_size_ = manifest_file_size;
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1; next_file_number_ = next_file + 1;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = prev_log_number; prev_log_number_ = prev_log_number;
@ -2329,16 +2333,15 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
delete v; delete v;
} }
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1; next_file_number_ = next_file + 1;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = prev_log_number; prev_log_number_ = prev_log_number;
printf( printf(
"manifest_file_number %lu next_file_number %lu last_sequence " "next_file_number %lu last_sequence "
"%lu prev_log_number %lu max_column_family %u\n", "%lu prev_log_number %lu max_column_family %u\n",
(unsigned long)manifest_file_number_, (unsigned long)next_file_number_, (unsigned long)next_file_number_, (unsigned long)last_sequence,
(unsigned long)last_sequence, (unsigned long)prev_log_number, (unsigned long)prev_log_number,
column_family_set_->GetMaxColumnFamily()); column_family_set_->GetMaxColumnFamily());
} }