Fix DeleteFile() + enable deleting files oldest files in level 0

Summary:
DeleteFile() call was broken for non-default column family. This fixes it. We might need this feature for mongo.

I also introduced a possibility of deleting oldest file in level 0.

Test Plan: added unit test to deletefile_test

Reviewers: ljin, yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D24909
This commit is contained in:
Igor Canadi 2014-10-21 11:23:06 -07:00
parent 0d3145198e
commit 6398e6a6a5
2 changed files with 76 additions and 1 deletions

View File

@ -4542,7 +4542,7 @@ Status DBImpl::DeleteFile(std::string name) {
name.c_str()); name.c_str());
return Status::InvalidArgument("File not found"); return Status::InvalidArgument("File not found");
} }
assert((level > 0) && (level < cfd->NumberLevels())); assert(level < cfd->NumberLevels());
// If the file is being compacted no need to delete. // If the file is being compacted no need to delete.
if (metadata->being_compacted) { if (metadata->being_compacted) {
@ -4561,6 +4561,12 @@ Status DBImpl::DeleteFile(std::string name) {
return Status::InvalidArgument("File not in last level"); return Status::InvalidArgument("File not in last level");
} }
} }
// if level == 0, it has to be the oldest file
if (level == 0 &&
cfd->current()->files_[0].back()->fd.GetNumber() != number) {
return Status::InvalidArgument("File in level 0, but not oldest");
}
edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(level, number); edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, db_directory_.get()); &edit, &mutex_, db_directory_.get());

View File

@ -287,6 +287,75 @@ TEST(DeleteFileTest, DeleteLogFiles) {
CloseDB(); CloseDB();
} }
TEST(DeleteFileTest, DeleteNonDefaultColumnFamily) {
CloseDB();
DBOptions db_options;
db_options.create_if_missing = true;
db_options.create_missing_column_families = true;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back();
column_families.emplace_back("new_cf", ColumnFamilyOptions());
std::vector<rocksdb::ColumnFamilyHandle*> handles;
rocksdb::DB* db;
ASSERT_OK(DB::Open(db_options, dbname_, column_families, &handles, &db));
Random rnd(5);
for (int i = 0; i < 1000; ++i) {
ASSERT_OK(db->Put(WriteOptions(), handles[1], test::RandomKey(&rnd, 10),
test::RandomKey(&rnd, 10)));
}
ASSERT_OK(db->Flush(FlushOptions(), handles[1]));
for (int i = 0; i < 1000; ++i) {
ASSERT_OK(db->Put(WriteOptions(), handles[1], test::RandomKey(&rnd, 10),
test::RandomKey(&rnd, 10)));
}
ASSERT_OK(db->Flush(FlushOptions(), handles[1]));
std::vector<LiveFileMetaData> metadata;
db->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(2U, metadata.size());
ASSERT_EQ("new_cf", metadata[0].column_family_name);
ASSERT_EQ("new_cf", metadata[1].column_family_name);
auto old_file = metadata[0].smallest_seqno < metadata[1].smallest_seqno
? metadata[0].name
: metadata[1].name;
auto new_file = metadata[0].smallest_seqno > metadata[1].smallest_seqno
? metadata[0].name
: metadata[1].name;
ASSERT_TRUE(db->DeleteFile(new_file).IsInvalidArgument());
ASSERT_OK(db->DeleteFile(old_file));
{
std::unique_ptr<Iterator> itr(db->NewIterator(ReadOptions(), handles[1]));
int count = 0;
for (itr->SeekToFirst(); itr->Valid(); itr->Next()) {
ASSERT_OK(itr->status());
++count;
}
ASSERT_EQ(count, 1000);
}
delete handles[0];
delete handles[1];
delete db;
ASSERT_OK(DB::Open(db_options, dbname_, column_families, &handles, &db));
{
std::unique_ptr<Iterator> itr(db->NewIterator(ReadOptions(), handles[1]));
int count = 0;
for (itr->SeekToFirst(); itr->Valid(); itr->Next()) {
ASSERT_OK(itr->status());
++count;
}
ASSERT_EQ(count, 1000);
}
delete handles[0];
delete handles[1];
delete db;
}
} //namespace rocksdb } //namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {