Allow checkpointing without flushing
Summary: Add a parameter to Checkpoint::CreateCheckpoint() so that flush can be skipped if total log file size is within a threshold.

Closes https://github.com/facebook/rocksdb/pull/1993
Differential Revision: D4719842
Pulled By: siying
fbshipit-source-id: 4f9d9e1
commit 47177be34e
parent 93989b0ae8
@@ -27,7 +27,14 @@ class Checkpoint {
  // (2) a copied manifest files and other files
  // The directory should not already exist and will be created by this API.
  // The directory will be an absolute path
  virtual Status CreateCheckpoint(const std::string& checkpoint_dir);
  // log_size_for_flush: if the total log file size is equal or larger than
  // this value, then a flush is triggered for all the column families. The
  // default value is 0, which means flush is always triggered. If you move
  // away from the default, the checkpoint may not contain up-to-date data
  // if WAL writing is not always enabled.
  // Flush will always trigger if it is 2PC.
  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                  uint64_t log_size_for_flush = 0);

  virtual ~Checkpoint() {}
};
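For context only (not part of the patch): a minimal sketch of how a caller might use the new overload. The helper name TakeCheckpoint and the 64 MB threshold are illustrative assumptions; only Checkpoint::Create() and the two-argument CreateCheckpoint() come from this change.

#include <string>

#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/checkpoint.h"

// Sketch: take a checkpoint, skipping the memtable flush when the
// outstanding WAL files total less than an (arbitrary) 64 MB threshold.
rocksdb::Status TakeCheckpoint(rocksdb::DB* db, const std::string& dir) {
  rocksdb::Checkpoint* checkpoint = nullptr;
  rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
  if (!s.ok()) {
    return s;
  }
  // log_size_for_flush: if the live WAL files are smaller than this,
  // they are copied into the checkpoint instead of triggering a flush
  // of all column families.
  s = checkpoint->CreateCheckpoint(dir, 64 * 1024 * 1024);
  delete checkpoint;
  return s;
}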
@@ -42,7 +42,8 @@ class CheckpointImpl : public Checkpoint {
  // The directory should not already exist and will be created by this API.
  // The directory will be an absolute path
  using Checkpoint::CreateCheckpoint;
  virtual Status CreateCheckpoint(const std::string& checkpoint_dir) override;
  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                  uint64_t log_size_for_flush) override;

 private:
  DB* db_;
@@ -53,12 +54,14 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
  return Status::OK();
}

Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) {
Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
                                    uint64_t log_size_for_flush) {
  return Status::NotSupported("");
}

// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                        uint64_t log_size_for_flush) {
  Status s;
  std::vector<std::string> live_files;
  uint64_t manifest_file_size = 0;
@@ -77,9 +80,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
  }

  s = db_->DisableFileDeletions();
  bool flush_memtable = true;
  if (s.ok()) {
    if (!db_options.allow_2pc) {
      // If outstanding log files are small, we skip the flush.
      s = db_->GetSortedWalFiles(live_wal_files);

      if (!s.ok()) {
        db_->EnableFileDeletions(false);
        return s;
      }

      // Don't flush column families if total log size is smaller than
      // log_size_for_flush. We copy the log files instead.
      // We may be able to cover 2PC case too.
      uint64_t total_wal_size = 0;
      for (auto& wal : live_wal_files) {
        total_wal_size += wal->SizeFileBytes();
      }
      if (total_wal_size < log_size_for_flush) {
        flush_memtable = false;
      }
      live_wal_files.clear();
    }

    // this will return live_files prefixed with "/"
    s = db_->GetLiveFiles(live_files, &manifest_file_size);
    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);

    if (s.ok() && db_options.allow_2pc) {
      // If 2PC is enabled, we need to get minimum log number after the flush.
@@ -188,7 +214,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
  // that has changes after the last flush.
  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
        (live_wal_files[i]->StartSequence() >= sequence_number ||
        (!flush_memtable ||
         live_wal_files[i]->StartSequence() >= sequence_number ||
         live_wal_files[i]->LogNumber() >= min_log_num)) {
      if (i + 1 == wal_size) {
        Log(db_options.info_log, "Copying %s",
@@ -217,6 +217,7 @@ class CheckpointTest : public testing::Test {
};

TEST_F(CheckpointTest, GetSnapshotLink) {
  for (uint64_t log_size_for_flush : {0, 1000000}) {
    Options options;
    const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
    DB* snapshotDB;
@@ -239,7 +240,7 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
    ASSERT_OK(Put(key, "v1"));
    // Take a snapshot
    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, log_size_for_flush));
    ASSERT_OK(Put(key, "v2"));
    ASSERT_EQ("v2", Get(key));
    ASSERT_OK(Flush());
@@ -269,6 +270,7 @@ TEST_F(CheckpointTest, GetSnapshotLink) {

    // Restore DB name
    dbname_ = test::TmpDir(env_) + "/db_test";
  }
}

TEST_F(CheckpointTest, CheckpointCF) {
@@ -345,6 +347,71 @@ TEST_F(CheckpointTest, CheckpointCF) {
  ASSERT_OK(DestroyDB(snapshot_name, options));
}

TEST_F(CheckpointTest, CheckpointCFNoFlush) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(0, "Default", "Default"));
  ASSERT_OK(Put(1, "one", "one"));
  Flush();
  ASSERT_OK(Put(2, "two", "two"));

  const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
  DB* snapshotDB;
  ReadOptions roptions;
  std::string result;
  std::vector<ColumnFamilyHandle*> cphandles;

  ASSERT_OK(DestroyDB(snapshot_name, options));
  env_->DeleteDir(snapshot_name);

  Status s;
  // Take a snapshot
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCallFlush:start", [&](void* arg) {
        // Flush should never trigger.
        ASSERT_TRUE(false);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  Checkpoint* checkpoint;
  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
  ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, 1000000));
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  delete checkpoint;
  ASSERT_OK(Put(1, "one", "two"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(2, "two", "twentytwo"));
  Close();
  EXPECT_OK(DestroyDB(dbname_, options));

  // Open snapshot and verify contents while DB is running
  options.create_if_missing = false;
  std::vector<std::string> cfs;
  cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
  std::vector<ColumnFamilyDescriptor> column_families;
  for (size_t i = 0; i < cfs.size(); ++i) {
    column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
  }
  ASSERT_OK(DB::Open(options, snapshot_name, column_families, &cphandles,
                     &snapshotDB));
  ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
  ASSERT_EQ("Default", result);
  ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
  ASSERT_EQ("one", result);
  ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
  ASSERT_EQ("two", result);
  for (auto h : cphandles) {
    delete h;
  }
  cphandles.clear();
  delete snapshotDB;
  snapshotDB = nullptr;
  ASSERT_OK(DestroyDB(snapshot_name, options));
}

TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) {
  const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
  ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));