rocksdb/utilities/checkpoint/checkpoint_test.cc

410 lines
13 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Syncpoint prevents us building and running tests in release
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <iostream>
#include <thread>
#include <utility>
#include "db/db_impl.h"
#include "port/stack_trace.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/checkpoint.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/xfunc.h"
namespace rocksdb {
class DBTest : public testing::Test {
protected:
// Sequence of option configurations to try
enum OptionConfig {
kDefault = 0,
};
int option_config_;
public:
std::string dbname_;
std::string alternative_wal_dir_;
Env* env_;
DB* db_;
Options last_options_;
std::vector<ColumnFamilyHandle*> handles_;
DBTest() : env_(Env::Default()) {
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
dbname_ = test::TmpDir(env_) + "/db_test";
alternative_wal_dir_ = dbname_ + "/wal";
auto options = CurrentOptions();
auto delete_options = options;
delete_options.wal_dir = alternative_wal_dir_;
EXPECT_OK(DestroyDB(dbname_, delete_options));
// Destroy it for not alternative WAL dir is used.
EXPECT_OK(DestroyDB(dbname_, options));
db_ = nullptr;
Reopen(options);
}
~DBTest() {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->LoadDependency({});
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
Options options;
options.db_paths.emplace_back(dbname_, 0);
options.db_paths.emplace_back(dbname_ + "_2", 0);
options.db_paths.emplace_back(dbname_ + "_3", 0);
options.db_paths.emplace_back(dbname_ + "_4", 0);
EXPECT_OK(DestroyDB(dbname_, options));
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
options.env = env_;
options.create_if_missing = true;
return options;
}
void CreateColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ColumnFamilyOptions cf_opts(options);
size_t cfi = handles_.size();
handles_.resize(cfi + cfs.size());
for (auto cf : cfs) {
ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
}
}
void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
const Options& options) {
CreateColumnFamilies(cfs, options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
ReopenWithColumnFamilies(cfs_plus_default, options);
}
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
Close();
EXPECT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
}
DBOptions db_opts = DBOptions(options[0]);
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
}
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
Close();
std::vector<Options> v_opts(cfs.size(), options);
return TryReopenWithColumnFamilies(cfs, v_opts);
}
void Reopen(const Options& options) {
ASSERT_OK(TryReopen(options));
}
void Close() {
for (auto h : handles_) {
delete h;
}
handles_.clear();
delete db_;
db_ = nullptr;
}
void DestroyAndReopen(const Options& options) {
// Destroy using last options
Destroy(last_options_);
ASSERT_OK(TryReopen(options));
}
void Destroy(const Options& options) {
Close();
ASSERT_OK(DestroyDB(dbname_, options));
}
Status ReadOnlyReopen(const Options& options) {
return DB::OpenForReadOnly(options, dbname_, &db_);
}
Status TryReopen(const Options& options) {
Close();
last_options_ = options;
return DB::Open(options, dbname_, &db_);
}
Status Flush(int cf = 0) {
if (cf == 0) {
return db_->Flush(FlushOptions());
} else {
return db_->Flush(FlushOptions(), handles_[cf]);
}
}
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
return db_->Put(wo, k, v);
}
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions()) {
return db_->Put(wo, handles_[cf], k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
Status Delete(int cf, const std::string& k) {
return db_->Delete(WriteOptions(), handles_[cf], k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
std::string Get(int cf, const std::string& k,
const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, handles_[cf], k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
};
TEST_F(DBTest, GetSnapshotLink) {
Options options;
const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
DB* snapshotDB;
ReadOptions roptions;
std::string result;
Checkpoint* checkpoint;
options = CurrentOptions();
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DestroyDB(snapshot_name, options));
env_->DeleteDir(snapshot_name);
// Create a database
Status s;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string key = std::string("foo");
ASSERT_OK(Put(key, "v1"));
// Take a snapshot
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_OK(Flush());
ASSERT_EQ("v2", Get(key));
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, key, &result));
ASSERT_EQ("v1", result);
delete snapshotDB;
snapshotDB = nullptr;
delete db_;
db_ = nullptr;
// Destroy original DB
ASSERT_OK(DestroyDB(dbname_, options));
// Open snapshot and verify contents
options.create_if_missing = false;
dbname_ = snapshot_name;
ASSERT_OK(DB::Open(options, dbname_, &db_));
ASSERT_EQ("v1", Get(key));
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
delete checkpoint;
// Restore DB name
dbname_ = test::TmpDir(env_) + "/db_test";
}
TEST_F(DBTest, CheckpointCF) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBTest::CheckpointCF:2",
"DBImpl::GetLiveFiles:2"},
{"DBImpl::GetLiveFiles:1",
"DBTest::CheckpointCF:1"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(0, "Default", "Default"));
ASSERT_OK(Put(1, "one", "one"));
ASSERT_OK(Put(2, "two", "two"));
ASSERT_OK(Put(3, "three", "three"));
ASSERT_OK(Put(4, "four", "four"));
ASSERT_OK(Put(5, "five", "five"));
const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
DB* snapshotDB;
ReadOptions roptions;
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
ASSERT_OK(DestroyDB(snapshot_name, options));
env_->DeleteDir(snapshot_name);
Status s;
// Take a snapshot
std::thread t([&]() {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
delete checkpoint;
});
TEST_SYNC_POINT("DBTest::CheckpointCF:1");
ASSERT_OK(Put(0, "Default", "Default1"));
ASSERT_OK(Put(1, "one", "eleven"));
ASSERT_OK(Put(2, "two", "twelve"));
ASSERT_OK(Put(3, "three", "thirteen"));
ASSERT_OK(Put(4, "four", "fourteen"));
ASSERT_OK(Put(5, "five", "fifteen"));
TEST_SYNC_POINT("DBTest::CheckpointCF:2");
t.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_OK(Put(1, "one", "twentyone"));
ASSERT_OK(Put(2, "two", "twentytwo"));
ASSERT_OK(Put(3, "three", "twentythree"));
ASSERT_OK(Put(4, "four", "twentyfour"));
ASSERT_OK(Put(5, "five", "twentyfive"));
ASSERT_OK(Flush());
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
std::vector<std::string> cfs;
cfs= {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
}
ASSERT_OK(DB::Open(options, snapshot_name,
column_families, &cphandles, &snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
ASSERT_EQ("Default1", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
ASSERT_EQ("eleven", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
for (auto h : cphandles) {
delete h;
}
cphandles.clear();
delete snapshotDB;
snapshotDB = nullptr;
ASSERT_OK(DestroyDB(snapshot_name, options));
}
TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
env_->DeleteDir(kSnapshotName);
Options options = CurrentOptions();
options.max_manifest_file_size = 0; // always rollover manifest for file add
Reopen(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{// Get past the flush in the checkpoint thread before adding any keys to
// the db so the checkpoint thread won't hit the WriteManifest
// syncpoints.
{"DBImpl::GetLiveFiles:1",
"DBTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
// Roll the manifest during checkpointing right after live files are
// snapshotted.
{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
"VersionSet::LogAndApply:WriteManifest"},
{"VersionSet::LogAndApply:WriteManifestDone",
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
delete checkpoint;
});
TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing:PrePut");
ASSERT_OK(Put("Default", "Default1"));
ASSERT_OK(Flush());
t.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
DB* snapshotDB;
// Successful Open() implies that CURRENT pointed to the manifest in the
// checkpoint.
ASSERT_OK(DB::Open(options, kSnapshotName, &snapshotDB));
delete snapshotDB;
snapshotDB = nullptr;
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int argc, char** argv) {
fprintf(stderr, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // !ROCKSDB_LITE