Revert checkpoint fix (#8607)
Summary: PR https://github.com/facebook/rocksdb/pull/8572 looses custom types in the options file. Need more API changes to fix this issue. Revert this PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8607 Reviewed By: ajkr Differential Revision: D30058289 Pulled By: autopear fbshipit-source-id: 78f5a154c0bf193e8441bae4a36fa79b95277fd4
This commit is contained in:
parent
3f7e929865
commit
4811115b3e
@ -40,13 +40,9 @@ class Checkpoint {
|
||||
// sequence_number_ptr: if it is not nullptr, the value it points to will be
|
||||
// set to the DB's sequence number. The default value of this parameter is
|
||||
// nullptr.
|
||||
// db_log_dir / wal_dir: override db_log_dir or wal_dir option in the
|
||||
// snapshot. If empty, checkpoint_dir will be used.
|
||||
virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
uint64_t log_size_for_flush = 0,
|
||||
uint64_t* sequence_number_ptr = nullptr,
|
||||
const std::string& db_log_dir = "",
|
||||
const std::string& wal_dir = "");
|
||||
uint64_t* sequence_number_ptr = nullptr);
|
||||
|
||||
// Exports all live SST files of a specified Column Family onto export_dir,
|
||||
// returning SST files information in metadata.
|
||||
|
@ -20,14 +20,12 @@
|
||||
#include "db/wal_manager.h"
|
||||
#include "file/file_util.h"
|
||||
#include "file/filename.h"
|
||||
#include "options/options_parser.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/metadata.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "rocksdb/utilities/checkpoint.h"
|
||||
#include "rocksdb/utilities/options_util.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/cast_util.h"
|
||||
#include "util/file_checksum_helper.h"
|
||||
@ -41,21 +39,19 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
|
||||
|
||||
Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
|
||||
uint64_t /*log_size_for_flush*/,
|
||||
uint64_t* /*sequence_number_ptr*/,
|
||||
const std::string& /*db_log_dir*/,
|
||||
const std::string& /*wal_dir*/) {
|
||||
uint64_t* /*sequence_number_ptr*/) {
|
||||
return Status::NotSupported("");
|
||||
}
|
||||
|
||||
void CheckpointImpl::CleanStagingDirectory(
|
||||
const std::string& full_private_path, Logger* info_log) {
|
||||
std::vector<std::string> subchildren;
|
||||
void CheckpointImpl::CleanStagingDirectory(const std::string& full_private_path,
|
||||
Logger* info_log) {
|
||||
std::vector<std::string> subchildren;
|
||||
Status s = db_->GetEnv()->FileExists(full_private_path);
|
||||
if (s.IsNotFound()) {
|
||||
return;
|
||||
}
|
||||
ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
|
||||
full_private_path.c_str(), s.ToString().c_str());
|
||||
ROCKS_LOG_INFO(info_log, "File exists %s -- %s", full_private_path.c_str(),
|
||||
s.ToString().c_str());
|
||||
s = db_->GetEnv()->GetChildren(full_private_path, &subchildren);
|
||||
if (s.ok()) {
|
||||
for (auto& subchild : subchildren) {
|
||||
@ -67,8 +63,8 @@ void CheckpointImpl::CleanStagingDirectory(
|
||||
}
|
||||
// finally delete the private dir
|
||||
s = db_->GetEnv()->DeleteDir(full_private_path);
|
||||
ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
|
||||
full_private_path.c_str(), s.ToString().c_str());
|
||||
ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s", full_private_path.c_str(),
|
||||
s.ToString().c_str());
|
||||
}
|
||||
|
||||
Status Checkpoint::ExportColumnFamily(
|
||||
@ -80,9 +76,7 @@ Status Checkpoint::ExportColumnFamily(
|
||||
// Builds an openable snapshot of RocksDB
|
||||
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
uint64_t log_size_for_flush,
|
||||
uint64_t* sequence_number_ptr,
|
||||
const std::string& db_log_dir,
|
||||
const std::string& wal_dir) {
|
||||
uint64_t* sequence_number_ptr) {
|
||||
DBOptions db_options = db_->GetDBOptions();
|
||||
|
||||
Status s = db_->GetEnv()->FileExists(checkpoint_dir);
|
||||
@ -107,54 +101,14 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
return Status::InvalidArgument("invalid checkpoint directory name");
|
||||
}
|
||||
|
||||
std::string parsed_checkpoint_dir =
|
||||
checkpoint_dir.substr(0, final_nonslash_idx + 1);
|
||||
std::string full_private_path = parsed_checkpoint_dir + ".tmp";
|
||||
std::string full_private_path =
|
||||
checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
|
||||
ROCKS_LOG_INFO(db_options.info_log,
|
||||
"Snapshot process -- using temporary directory %s",
|
||||
full_private_path.c_str());
|
||||
CleanStagingDirectory(full_private_path, db_options.info_log.get());
|
||||
// create snapshot directory
|
||||
s = db_->GetEnv()->CreateDir(full_private_path);
|
||||
|
||||
// Remove the last `/`s if needed
|
||||
std::string parsed_log_dir =
|
||||
db_log_dir.empty()
|
||||
? ""
|
||||
: db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1);
|
||||
std::string parsed_wal_dir =
|
||||
wal_dir.empty() ? ""
|
||||
: wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1);
|
||||
|
||||
// Info log files are not copied or linked, just update the option value.
|
||||
std::string value_log_dir = parsed_log_dir == db_->GetName() ||
|
||||
parsed_log_dir == parsed_checkpoint_dir
|
||||
? ""
|
||||
: parsed_log_dir;
|
||||
|
||||
// If the wal_dir is empty, or the same as the source db dir, update the
|
||||
// option value to the checkpoint dir.
|
||||
std::string value_wal_dir; // Option value to override
|
||||
std::string new_wal_dir; // The target location to copy/link WAL files
|
||||
if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() ||
|
||||
parsed_wal_dir == parsed_checkpoint_dir) {
|
||||
value_wal_dir = parsed_checkpoint_dir;
|
||||
new_wal_dir = full_private_path; // Copy to the temp dir
|
||||
} else {
|
||||
value_wal_dir = parsed_wal_dir;
|
||||
std::string prefix = parsed_checkpoint_dir + "/";
|
||||
// If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp
|
||||
// dir; otherwise, create it directly.
|
||||
new_wal_dir =
|
||||
parsed_wal_dir.rfind(prefix, 0) == 0
|
||||
? full_private_path + "/" + parsed_wal_dir.substr(prefix.size())
|
||||
: parsed_wal_dir;
|
||||
s = db_->GetEnv()->FileExists(new_wal_dir);
|
||||
if (s.IsNotFound()) {
|
||||
s = db_->GetEnv()->CreateDir(new_wal_dir);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t sequence_number = 0;
|
||||
if (s.ok()) {
|
||||
// enable file deletions
|
||||
@ -165,32 +119,21 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
s = CreateCustomCheckpoint(
|
||||
db_options,
|
||||
[&](const std::string& src_dirname, const std::string& fname,
|
||||
FileType type) {
|
||||
FileType) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
|
||||
fname.c_str());
|
||||
// WAL file links may be created in another location.
|
||||
return db_->GetFileSystem()->LinkFile(
|
||||
src_dirname + fname,
|
||||
(type == kWalFile ? new_wal_dir : full_private_path) + fname,
|
||||
IOOptions(), nullptr);
|
||||
return db_->GetFileSystem()->LinkFile(src_dirname + fname,
|
||||
full_private_path + fname,
|
||||
IOOptions(), nullptr);
|
||||
} /* link_file_cb */,
|
||||
[&](const std::string& src_dirname, const std::string& fname,
|
||||
uint64_t size_limit_bytes, FileType type,
|
||||
uint64_t size_limit_bytes, FileType,
|
||||
const std::string& /* checksum_func_name */,
|
||||
const std::string& /* checksum_val */) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
|
||||
if (type == kOptionsFile) {
|
||||
// Modify and rewrite option files
|
||||
return CopyOptionsFile(src_dirname + fname,
|
||||
full_private_path + fname, value_log_dir,
|
||||
value_wal_dir);
|
||||
} else {
|
||||
// Copy other files. WAL files may be copied to another location.
|
||||
return Status(CopyFile(
|
||||
db_->GetFileSystem(), src_dirname + fname,
|
||||
(type == kWalFile ? new_wal_dir : full_private_path) + fname,
|
||||
size_limit_bytes, db_options.use_fsync));
|
||||
}
|
||||
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
|
||||
full_private_path + fname, size_limit_bytes,
|
||||
db_options.use_fsync);
|
||||
} /* copy_file_cb */,
|
||||
[&](const std::string& fname, const std::string& contents, FileType) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
|
||||
@ -210,12 +153,11 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
|
||||
if (s.ok()) {
|
||||
// move tmp private backup to real snapshot directory
|
||||
s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir);
|
||||
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
|
||||
}
|
||||
if (s.ok()) {
|
||||
std::unique_ptr<Directory> checkpoint_directory;
|
||||
s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir,
|
||||
&checkpoint_directory);
|
||||
s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
|
||||
if (s.ok() && checkpoint_directory != nullptr) {
|
||||
s = checkpoint_directory->Fsync();
|
||||
}
|
||||
@ -438,8 +380,7 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
||||
auto wal_dir = ioptions.GetWalDir();
|
||||
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
|
||||
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
|
||||
(!flush_memtable ||
|
||||
live_wal_files[i]->LogNumber() >= min_log_num)) {
|
||||
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
|
||||
if (i + 1 == wal_size) {
|
||||
s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(),
|
||||
live_wal_files[i]->SizeFileBytes(), kWalFile,
|
||||
@ -645,37 +586,6 @@ Status CheckpointImpl::ExportFilesInMetaData(
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
Status CheckpointImpl::CopyOptionsFile(const std::string& src_file,
|
||||
const std::string& target_file,
|
||||
const std::string& db_log_dir,
|
||||
const std::string& wal_dir) {
|
||||
Status s;
|
||||
DBOptions src_db_options;
|
||||
std::vector<ColumnFamilyDescriptor> src_cf_descs;
|
||||
s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options,
|
||||
&src_cf_descs);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
// Override these 2 options
|
||||
src_db_options.db_log_dir = db_log_dir;
|
||||
src_db_options.wal_dir = wal_dir;
|
||||
|
||||
std::vector<std::string> src_cf_names;
|
||||
std::vector<ColumnFamilyOptions> src_cf_opts;
|
||||
src_cf_names.reserve(src_cf_descs.size());
|
||||
src_cf_opts.reserve(src_cf_descs.size());
|
||||
for (ColumnFamilyDescriptor desc : src_cf_descs) {
|
||||
src_cf_names.push_back(desc.name);
|
||||
src_cf_opts.push_back(desc.options);
|
||||
}
|
||||
|
||||
return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts,
|
||||
target_file, db_->GetFileSystem());
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -20,9 +20,7 @@ class CheckpointImpl : public Checkpoint {
|
||||
|
||||
Status CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
uint64_t log_size_for_flush,
|
||||
uint64_t* sequence_number_ptr,
|
||||
const std::string& db_log_dir,
|
||||
const std::string& wal_dir) override;
|
||||
uint64_t* sequence_number_ptr) override;
|
||||
|
||||
Status ExportColumnFamily(ColumnFamilyHandle* handle,
|
||||
const std::string& export_dir,
|
||||
@ -59,11 +57,6 @@ class CheckpointImpl : public Checkpoint {
|
||||
const std::string& fname)>
|
||||
copy_file_cb);
|
||||
|
||||
Status CopyOptionsFile(const std::string& src_file,
|
||||
const std::string& target_file,
|
||||
const std::string& db_log_dir,
|
||||
const std::string& wal_dir);
|
||||
|
||||
private:
|
||||
DB* db_;
|
||||
};
|
||||
|
@ -14,21 +14,16 @@
|
||||
#ifndef OS_WIN
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "file/file_util.h"
|
||||
#include "port/port.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/convenience.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/utilities/options_util.h"
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "test_util/testharness.h"
|
||||
@ -264,12 +259,6 @@ class CheckpointTest : public testing::Test {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::string IntToFixedWidthString(size_t i, int len) {
|
||||
std::stringstream ss;
|
||||
ss << std::setw(len) << std::setfill('0') << i;
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(CheckpointTest, GetSnapshotLink) {
|
||||
@ -913,144 +902,6 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) {
|
||||
delete snapshot_db;
|
||||
}
|
||||
|
||||
TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) {
|
||||
// If the checkpoint and the source db share the same wal_dir, files may be
|
||||
// corrupted if both write to or delete from the same wal_dir. db_log_dir
|
||||
// should also be updated during checkpointing, but it is less important since
|
||||
// log files are not copied or linked to the checkpoint.
|
||||
|
||||
// 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25
|
||||
// flushes
|
||||
const int key_len = 8;
|
||||
const int value_len = 1016;
|
||||
const size_t num_keys = 100;
|
||||
const size_t buffer_size = 4096;
|
||||
|
||||
std::string value(value_len, ' ');
|
||||
|
||||
std::vector<std::string> dirs1 = {"", "", "", ""};
|
||||
std::vector<std::string> dirs2 = {"/logs", "/wal", "", ""};
|
||||
std::vector<std::string> dirs3 = {"/logs", "/wal", "/logs", "/wal"};
|
||||
|
||||
for (auto dirs : {dirs1, dirs2, dirs3}) {
|
||||
std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0];
|
||||
std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1];
|
||||
std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2];
|
||||
std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3];
|
||||
|
||||
Options src_opts = CurrentOptions();
|
||||
WriteOptions w_opts;
|
||||
ReadOptions r_opts;
|
||||
DB* snapshotDB;
|
||||
Checkpoint* checkpoint;
|
||||
|
||||
src_opts = CurrentOptions();
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
ASSERT_OK(DestroyDB(dbname_, src_opts));
|
||||
|
||||
// Create a database
|
||||
src_opts.create_if_missing = true;
|
||||
src_opts.write_buffer_size = buffer_size;
|
||||
src_opts.OptimizeUniversalStyleCompaction(buffer_size);
|
||||
src_opts.db_log_dir = src_log_dir;
|
||||
src_opts.wal_dir = src_wal_dir;
|
||||
|
||||
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
|
||||
|
||||
// Write to src db
|
||||
for (size_t i = 1; i <= num_keys; i++) {
|
||||
ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
|
||||
}
|
||||
|
||||
// Take a snapshot
|
||||
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
|
||||
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr,
|
||||
snap_log_dir, snap_wal_dir));
|
||||
|
||||
// Write to src db again
|
||||
for (size_t i = num_keys + 1; i <= num_keys * 2; i++) {
|
||||
ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
|
||||
}
|
||||
|
||||
std::string result;
|
||||
std::string key1 = IntToFixedWidthString(num_keys, key_len);
|
||||
std::string key2 = IntToFixedWidthString(num_keys * 2, key_len);
|
||||
std::string key3 = IntToFixedWidthString(num_keys * 3, key_len);
|
||||
|
||||
ASSERT_OK(db_->Get(r_opts, key1, &result));
|
||||
ASSERT_OK(db_->Get(r_opts, key2, &result));
|
||||
|
||||
// Open snapshot with its own options
|
||||
DBOptions snap_opts;
|
||||
std::vector<ColumnFamilyDescriptor> snap_cfs;
|
||||
ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts,
|
||||
&snap_cfs));
|
||||
|
||||
ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir);
|
||||
ASSERT_EQ(snap_opts.wal_dir,
|
||||
snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir);
|
||||
|
||||
std::vector<ColumnFamilyHandle*> handles;
|
||||
ASSERT_OK(
|
||||
DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
|
||||
for (ColumnFamilyHandle* handle : handles) {
|
||||
delete handle;
|
||||
}
|
||||
handles.clear();
|
||||
|
||||
ASSERT_OK(snapshotDB->Get(r_opts, key1, &result));
|
||||
ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
|
||||
|
||||
// Write to snapshot
|
||||
for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) {
|
||||
ASSERT_OK(
|
||||
snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value));
|
||||
}
|
||||
|
||||
ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
|
||||
ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
|
||||
|
||||
// Close and reopen the snapshot
|
||||
delete snapshotDB;
|
||||
ASSERT_OK(
|
||||
DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
|
||||
for (ColumnFamilyHandle* handle : handles) {
|
||||
delete handle;
|
||||
}
|
||||
handles.clear();
|
||||
ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
|
||||
ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
|
||||
|
||||
delete snapshotDB;
|
||||
|
||||
// Close and reopen the source db
|
||||
delete db_;
|
||||
src_opts.create_if_missing = false;
|
||||
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
|
||||
ASSERT_OK(db_->Get(r_opts, key2, &result));
|
||||
ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
|
||||
delete db_;
|
||||
|
||||
// Delete the snapshot
|
||||
Options del_opts;
|
||||
del_opts.db_log_dir = snap_opts.db_log_dir;
|
||||
del_opts.wal_dir = snap_opts.wal_dir;
|
||||
ASSERT_OK(DestroyDB(snapshot_name_, del_opts));
|
||||
|
||||
// Reopen the source db again
|
||||
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
|
||||
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
ASSERT_OK(DestroyDB(dbname_, src_opts));
|
||||
|
||||
dbname_ = test::PerThreadDBPath(env_, "db_test");
|
||||
|
||||
delete checkpoint;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user