Fix CompactFiles() bug when used with CompactionFilter using SuperVersion

Summary:
GetAndRefSuperVersion() should not be called again in the same thread before ReturnAndCleanupSuperVersion() is called.

If we have a compaction filter that is using DB::Get, This will happen
```
CompactFiles() {
  GetAndRefSuperVersion() // -- first call
    ..
    CompactionFilter() {
      GetAndRefSuperVersion() // -- second call
      ReturnAndCleanupSuperVersion()
    }
    ..
  ReturnAndCleanupSuperVersion()
}
```

We solve this issue in the same way Iterator is solving it, but using GetReferencedSuperVersion()

This was discovered in https://github.com/facebook/mysql-5.6/issues/427 by alxyang
Closes https://github.com/facebook/rocksdb/pull/1803

Differential Revision: D4460155

Pulled By: IslamAbdelRahman

fbshipit-source-id: 5e54322
This commit is contained in:
Islam AbdelRahman 2017-01-25 14:03:49 -08:00 committed by Alex Yang
parent a7a2005873
commit d4b0ac2760
2 changed files with 62 additions and 2 deletions

View File

@ -253,6 +253,61 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
delete db;
}
TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
class FilterWithGet : public CompactionFilter {
public:
virtual bool Filter(int level, const Slice& key, const Slice& value,
std::string* new_value,
bool* value_changed) const override {
if (db_ == nullptr) {
return true;
}
std::string res;
db_->Get(ReadOptions(), "", &res);
return true;
}
void SetDB(DB* db) {
db_ = db;
}
virtual const char* Name() const override { return "FilterWithGet"; }
private:
DB* db_;
};
std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
Options options;
options.create_if_missing = true;
options.compaction_filter = cf.get();
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
ASSERT_OK(s);
cf->SetDB(db);
// Write one L0 file
db->Put(WriteOptions(), "K1", "V1");
db->Flush(FlushOptions());
// Compact all L0 files using CompactFiles
rocksdb::ColumnFamilyMetaData meta;
db->GetColumnFamilyMetaData(&meta);
for (auto& file : meta.levels[0].files) {
std::string fname = file.db_path + "/" + file.name;
ASSERT_OK(
db->CompactFiles(rocksdb::CompactionOptions(), {fname}, 0));
}
delete db;
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -2149,7 +2149,7 @@ Status DBImpl::CompactFiles(
immutable_db_options_.info_log.get());
// Perform CompactFiles
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
{
InstrumentedMutexLock l(&mutex_);
@ -2161,7 +2161,12 @@ Status DBImpl::CompactFiles(
input_file_names, output_level,
output_path_id, &job_context, &log_buffer);
}
ReturnAndCleanupSuperVersion(cfd, sv);
if (sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;
}
// Find and delete obsolete files
{