rocksdb/db/compact_files_test.cc
omegaga c3a4bea5dc Fix flaky test ObsoleteFiles
Summary: The test `ObsoleteFiles` failed occasionally on slow device. This problem appears on Travis CI several times. The reason is that we did not wait until compaction jobs are finished in the test, while in slower device the background jobs take longer time to finish.

Test Plan: Pass existing tests.

Reviewers: yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61479
2016-08-03 15:19:35 -07:00

230 lines
6.7 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
namespace rocksdb {
class CompactFilesTest : public testing::Test {
public:
CompactFilesTest() {
env_ = Env::Default();
db_name_ = test::TmpDir(env_) + "/compact_files_test";
}
std::string db_name_;
Env* env_;
};
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
~FlushedFileCollector() {}
virtual void OnFlushCompleted(
DB* db, const FlushJobInfo& info) override {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
std::vector<std::string> GetFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> result;
for (auto fname : flushed_files_) {
result.push_back(fname);
}
return result;
}
private:
std::vector<std::string> flushed_files_;
std::mutex mutex_;
};
TEST_F(CompactFilesTest, L0ConflictsFiles) {
Options options;
// to trigger compaction more easily
const int kWriteBufferSize = 10000;
const int kLevel0Trigger = 2;
options.create_if_missing = true;
options.compaction_style = kCompactionStyleLevel;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
options.level0_file_num_compaction_trigger = kLevel0Trigger;
options.compression = kNoCompression;
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
{"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// create couple files
// Background compaction starts and waits in BackgroundCallCompaction:0
for (int i = 0; i < kLevel0Trigger * 4; ++i) {
db->Put(WriteOptions(), ToString(i), "");
db->Put(WriteOptions(), ToString(100 - i), "");
db->Flush(FlushOptions());
}
rocksdb::ColumnFamilyMetaData meta;
db->GetColumnFamilyMetaData(&meta);
std::string file1;
for (auto& file : meta.levels[0].files) {
ASSERT_EQ(0, meta.levels[0].level);
if (file1 == "") {
file1 = file.db_path + "/" + file.name;
} else {
std::string file2 = file.db_path + "/" + file.name;
// Another thread starts a compact files and creates an L0 compaction
// The background compaction then notices that there is an L0 compaction
// already in progress and doesn't do an L0 compaction
// Once the background compaction finishes, the compact files finishes
ASSERT_OK(
db->CompactFiles(rocksdb::CompactionOptions(), {file1, file2}, 0));
break;
}
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
delete db;
}
TEST_F(CompactFilesTest, ObsoleteFiles) {
Options options;
// to trigger compaction more easily
const int kWriteBufferSize = 10000;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
for (int i = 1000; i < 2000; ++i) {
db->Put(WriteOptions(), ToString(i),
std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
}
auto l0_files = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();
// verify all compaction input files are deleted
for (auto fname : l0_files) {
ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
}
delete db;
}
TEST_F(CompactFilesTest, CapturingPendingFiles) {
Options options;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
// Always do full scans for obsolete files (needed to reproduce the issue).
options.delete_obsolete_files_period_micros = 0;
// Add listener.
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// Create 5 files.
for (int i = 0; i < 5; ++i) {
db->Put(WriteOptions(), "key" + ToString(i), "value");
db->Flush(FlushOptions());
}
auto l0_files = collector->GetFlushedFiles();
EXPECT_EQ(5, l0_files.size());
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
{"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Start compacting files.
std::thread compaction_thread(
[&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
// In the meantime flush another file.
TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
db->Put(WriteOptions(), "key5", "value");
db->Flush(FlushOptions());
TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
compaction_thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
delete db;
// Make sure we can reopen the DB.
s = DB::Open(options, db_name_, &db);
ASSERT_TRUE(s.ok());
assert(db);
delete db;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int argc, char** argv) {
fprintf(stderr,
"SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // !ROCKSDB_LITE