A utility function to help users migrate DB after options change
Summary: Add a utility function that trigger necessary full compaction and put output to the correct level by looking at new options and old options. Test Plan: Add unit tests for it. Reviewers: andrewkr, igor, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: muthu, sumeet, leveldb, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D60783
This commit is contained in:
parent
5bb0a7f73d
commit
7c4615cf1f
@ -264,6 +264,7 @@ set(SOURCES
|
||||
utilities/merge_operators/put.cc
|
||||
utilities/merge_operators/max.cc
|
||||
utilities/merge_operators/uint64add.cc
|
||||
utilities/option_change_migration/option_change_migration.cc
|
||||
utilities/options/options_util.cc
|
||||
utilities/persistent_cache/block_cache_tier.cc
|
||||
utilities/persistent_cache/block_cache_tier_file.cc
|
||||
@ -439,6 +440,7 @@ set(TESTS
|
||||
utilities/geodb/geodb_test.cc
|
||||
utilities/memory/memory_test.cc
|
||||
utilities/merge_operators/string_append/stringappend_test.cc
|
||||
utilities/option_change_migration/option_change_migration_test.cc
|
||||
utilities/options/options_util_test.cc
|
||||
utilities/persistent_cache/hash_table_test.cc
|
||||
utilities/persistent_cache/persistent_cache_test.cc
|
||||
|
@ -11,6 +11,7 @@
|
||||
* Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread.
|
||||
* RepairDB support for column families. RepairDB now associates data with non-default column families using information embedded in the SST/WAL files (4.7 or later). For data written by 4.6 or earlier, RepairDB associates it with the default column family.
|
||||
* Add options.write_buffer_manager which allows users to control total memtable sizes across multiple DB instances.
|
||||
* A tool to migrate DB after options change. See include/rocksdb/utilities/option_change_migration.h.
|
||||
|
||||
## 4.9.0 (6/9/2016)
|
||||
### Public API changes
|
||||
|
4
Makefile
4
Makefile
@ -377,6 +377,7 @@ TESTS = \
|
||||
heap_test \
|
||||
compact_on_deletion_collector_test \
|
||||
compaction_job_stats_test \
|
||||
option_change_migration_test \
|
||||
transaction_test \
|
||||
ldb_cmd_test \
|
||||
iostats_context_test \
|
||||
@ -889,6 +890,9 @@ cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
option_change_migration_test: utilities/option_change_migration/option_change_migration_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
19
include/rocksdb/utilities/option_change_migration.h
Normal file
19
include/rocksdb/utilities/option_change_migration.h
Normal file
@ -0,0 +1,19 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace rocksdb {
|
||||
// Try to migrate DB created with old_opts to be use new_opts.
|
||||
// Multiple column families is not supported.
|
||||
// It is best-effort. No guarantee to succeed.
|
||||
// A full compaction may be executed.
|
||||
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts);
|
||||
} // namespace rocksdb
|
2
src.mk
2
src.mk
@ -131,6 +131,7 @@ LIB_SOURCES = \
|
||||
utilities/merge_operators/string_append/stringappend2.cc \
|
||||
utilities/merge_operators/string_append/stringappend.cc \
|
||||
utilities/merge_operators/uint64add.cc \
|
||||
utilities/option_change_migration/option_change_migration.cc \
|
||||
utilities/options/options_util.cc \
|
||||
utilities/persistent_cache/persistent_cache_tier.cc \
|
||||
utilities/persistent_cache/volatile_tier_impl.cc \
|
||||
@ -292,6 +293,7 @@ MAIN_SOURCES = \
|
||||
utilities/geodb/geodb_test.cc \
|
||||
utilities/memory/memory_test.cc \
|
||||
utilities/merge_operators/string_append/stringappend_test.cc \
|
||||
utilities/option_change_migration/option_change_migration_test.cc \
|
||||
utilities/options/options_util_test.cc \
|
||||
utilities/redis/redis_lists_test.cc \
|
||||
utilities/simulator_cache/sim_cache_test.cc \
|
||||
|
153
utilities/option_change_migration/option_change_migration.cc
Normal file
153
utilities/option_change_migration/option_change_migration.cc
Normal file
@ -0,0 +1,153 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include "rocksdb/utilities/option_change_migration.h"
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include "rocksdb/db.h"
|
||||
|
||||
namespace rocksdb {
|
||||
namespace {
|
||||
// Return a version of Options `opts` that allow us to open/write into a DB
|
||||
// without triggering an automatic compaction or stalling. This is guaranteed
|
||||
// by disabling automatic compactions and using huge values for stalling
|
||||
// triggers.
|
||||
Options GetNoCompactionOptions(const Options& opts) {
|
||||
Options ret_opts = opts;
|
||||
ret_opts.disable_auto_compactions = true;
|
||||
ret_opts.level0_slowdown_writes_trigger = 999999;
|
||||
ret_opts.level0_stop_writes_trigger = 999999;
|
||||
ret_opts.soft_pending_compaction_bytes_limit = 0;
|
||||
ret_opts.hard_pending_compaction_bytes_limit = 0;
|
||||
return ret_opts;
|
||||
}
|
||||
|
||||
Status OpenDb(const Options& options, const std::string& dbname,
|
||||
std::unique_ptr<DB>* db) {
|
||||
db->reset();
|
||||
DB* tmpdb;
|
||||
Status s = DB::Open(options, dbname, &tmpdb);
|
||||
if (s.ok()) {
|
||||
db->reset(tmpdb);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status CompactToLevel(const Options& options, const std::string& dbname,
|
||||
int dest_level, bool need_reopen) {
|
||||
std::unique_ptr<DB> db;
|
||||
Options no_compact_opts = GetNoCompactionOptions(options);
|
||||
if (dest_level == 0) {
|
||||
// L0 has strict sequenceID requirements to files to it. It's safer
|
||||
// to only put one compacted file to there.
|
||||
// This is only used for converting to universal compaction with
|
||||
// only one level. In this case, compacting to one file is also
|
||||
// optimal.
|
||||
no_compact_opts.target_file_size_base = 999999999999999;
|
||||
}
|
||||
Status s = OpenDb(no_compact_opts, dbname, &db);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
CompactRangeOptions cro;
|
||||
cro.change_level = true;
|
||||
cro.target_level = dest_level;
|
||||
db->CompactRange(cro, nullptr, nullptr);
|
||||
|
||||
if (need_reopen) {
|
||||
// Need to restart DB to rewrite the manifest file.
|
||||
// In order to open a DB with specific num_levels, the manifest file should
|
||||
// contain no record that mentiones any level beyond num_levels. Issuing a
|
||||
// full compaction will move all the data to a level not exceeding
|
||||
// num_levels, but the manifest may still contain previous record mentioning
|
||||
// a higher level. Reopening the DB will force the manifest to be rewritten
|
||||
// so that those records will be cleared.
|
||||
db.reset();
|
||||
s = OpenDb(no_compact_opts, dbname, &db);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status MigrateToUniversal(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts) {
|
||||
if (old_opts.num_levels <= new_opts.num_levels) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
bool need_compact = false;
|
||||
{
|
||||
std::unique_ptr<DB> db;
|
||||
Options opts = GetNoCompactionOptions(old_opts);
|
||||
Status s = OpenDb(opts, dbname, &db);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
ColumnFamilyMetaData metadata;
|
||||
db->GetColumnFamilyMetaData(&metadata);
|
||||
if (!metadata.levels.empty() &&
|
||||
metadata.levels.back().level >= new_opts.num_levels) {
|
||||
need_compact = true;
|
||||
}
|
||||
}
|
||||
if (need_compact) {
|
||||
return CompactToLevel(old_opts, dbname, new_opts.num_levels - 1, true);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
Status MigrateToLevelBase(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts) {
|
||||
if (!new_opts.level_compaction_dynamic_level_bytes) {
|
||||
if (old_opts.num_levels == 1) {
|
||||
return Status::OK();
|
||||
}
|
||||
// Compact everything to level 1 to guarantee it can be safely opened.
|
||||
Options opts = old_opts;
|
||||
opts.target_file_size_base = new_opts.target_file_size_base;
|
||||
// Although sometimes we can open the DB with the new option without error,
|
||||
// We still want to compact the files to avoid the LSM tree to stuck
|
||||
// in bad shape. For example, if the user changed the level size
|
||||
// multiplier from 4 to 8, with the same data, we will have fewer
|
||||
// levels. Unless we issue a full comaction, the LSM tree may stuck
|
||||
// with more levels than needed and it won't recover automatically.
|
||||
return CompactToLevel(opts, dbname, 1, true);
|
||||
} else {
|
||||
// Compact everything to the last level to guarantee it can be safely
|
||||
// opened.
|
||||
if (old_opts.num_levels == 1) {
|
||||
return Status::OK();
|
||||
} else if (new_opts.num_levels > old_opts.num_levels) {
|
||||
// Dynamic level mode requires data to be put in the last level first.
|
||||
return CompactToLevel(new_opts, dbname, new_opts.num_levels - 1, false);
|
||||
} else {
|
||||
Options opts = old_opts;
|
||||
opts.target_file_size_base = new_opts.target_file_size_base;
|
||||
return CompactToLevel(opts, dbname, new_opts.num_levels - 1, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts) {
|
||||
if (new_opts.compaction_style == CompactionStyle::kCompactionStyleUniversal) {
|
||||
return MigrateToUniversal(dbname, old_opts, new_opts);
|
||||
} else if (new_opts.compaction_style ==
|
||||
CompactionStyle::kCompactionStyleLevel) {
|
||||
return MigrateToLevelBase(dbname, old_opts, new_opts);
|
||||
} else {
|
||||
return Status::NotSupported(
|
||||
"Do not how to migrate to this compaction style");
|
||||
}
|
||||
}
|
||||
} // namespace rocksdb
|
||||
#else
|
||||
namespace rocksdb {
|
||||
Status OptionChangeMigration(std::string dbname, const Options& old_opts,
|
||||
const Options& new_opts) {
|
||||
return Status::NotSupported();
|
||||
}
|
||||
} // namespace rocksdb
|
||||
#endif // ROCKSDB_LITE
|
@ -0,0 +1,207 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "rocksdb/utilities/option_change_migration.h"
|
||||
#include <set>
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
namespace rocksdb {
|
||||
|
||||
class DBOptionChangeMigrationTest
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<
|
||||
std::tuple<int, bool, bool, int, bool, bool>> {
|
||||
public:
|
||||
DBOptionChangeMigrationTest()
|
||||
: DBTestBase("/db_option_change_migration_test") {
|
||||
level1_ = std::get<0>(GetParam());
|
||||
is_universal1_ = std::get<1>(GetParam());
|
||||
is_dynamic1_ = std::get<2>(GetParam());
|
||||
|
||||
level2_ = std::get<3>(GetParam());
|
||||
is_universal2_ = std::get<4>(GetParam());
|
||||
is_dynamic2_ = std::get<5>(GetParam());
|
||||
}
|
||||
|
||||
// Required if inheriting from testing::WithParamInterface<>
|
||||
static void SetUpTestCase() {}
|
||||
static void TearDownTestCase() {}
|
||||
|
||||
int level1_;
|
||||
bool is_universal1_;
|
||||
bool is_dynamic1_;
|
||||
|
||||
int level2_;
|
||||
bool is_universal2_;
|
||||
bool is_dynamic2_;
|
||||
};
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_P(DBOptionChangeMigrationTest, Migrate1) {
|
||||
Options old_options = CurrentOptions();
|
||||
if (is_universal1_) {
|
||||
old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
|
||||
} else {
|
||||
old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
|
||||
old_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
|
||||
}
|
||||
old_options.level0_file_num_compaction_trigger = 3;
|
||||
old_options.write_buffer_size = 64 * 1024;
|
||||
old_options.target_file_size_base = 128 * 1024;
|
||||
// Make level target of L1, L2 to be 200KB and 600KB
|
||||
old_options.num_levels = level1_;
|
||||
old_options.max_bytes_for_level_multiplier = 3;
|
||||
old_options.max_bytes_for_level_base = 200 * 1024;
|
||||
|
||||
Reopen(old_options);
|
||||
|
||||
Random rnd(301);
|
||||
int key_idx = 0;
|
||||
|
||||
// Generate at least 2MB of data
|
||||
for (int num = 0; num < 20; num++) {
|
||||
GenerateNewFile(&rnd, &key_idx);
|
||||
}
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// Will make sure exactly those keys are in the DB after migration.
|
||||
std::set<std::string> keys;
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->SeekToFirst();
|
||||
for (; it->Valid(); it->Next()) {
|
||||
keys.insert(it->key().ToString());
|
||||
}
|
||||
}
|
||||
Close();
|
||||
|
||||
Options new_options = old_options;
|
||||
if (is_universal2_) {
|
||||
new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
|
||||
} else {
|
||||
new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
|
||||
new_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
|
||||
}
|
||||
new_options.target_file_size_base = 256 * 1024;
|
||||
new_options.num_levels = level2_;
|
||||
new_options.max_bytes_for_level_base = 150 * 1024;
|
||||
new_options.max_bytes_for_level_multiplier = 4;
|
||||
ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
|
||||
Reopen(new_options);
|
||||
|
||||
// Wait for compaction to finish and make sure it can reopen
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
Reopen(new_options);
|
||||
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->SeekToFirst();
|
||||
for (std::string key : keys) {
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ(key, it->key().ToString());
|
||||
it->Next();
|
||||
}
|
||||
ASSERT_TRUE(!it->Valid());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBOptionChangeMigrationTest, Migrate2) {
|
||||
Options old_options = CurrentOptions();
|
||||
if (is_universal2_) {
|
||||
old_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
|
||||
} else {
|
||||
old_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
|
||||
old_options.level_compaction_dynamic_level_bytes = is_dynamic2_;
|
||||
}
|
||||
old_options.level0_file_num_compaction_trigger = 3;
|
||||
old_options.write_buffer_size = 64 * 1024;
|
||||
old_options.target_file_size_base = 128 * 1024;
|
||||
// Make level target of L1, L2 to be 200KB and 600KB
|
||||
old_options.num_levels = level2_;
|
||||
old_options.max_bytes_for_level_multiplier = 3;
|
||||
old_options.max_bytes_for_level_base = 200 * 1024;
|
||||
|
||||
Reopen(old_options);
|
||||
|
||||
Random rnd(301);
|
||||
int key_idx = 0;
|
||||
|
||||
// Generate at least 2MB of data
|
||||
for (int num = 0; num < 20; num++) {
|
||||
GenerateNewFile(&rnd, &key_idx);
|
||||
}
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// Will make sure exactly those keys are in the DB after migration.
|
||||
std::set<std::string> keys;
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->SeekToFirst();
|
||||
for (; it->Valid(); it->Next()) {
|
||||
keys.insert(it->key().ToString());
|
||||
}
|
||||
}
|
||||
|
||||
Close();
|
||||
|
||||
Options new_options = old_options;
|
||||
if (is_universal1_) {
|
||||
new_options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
|
||||
} else {
|
||||
new_options.compaction_style = CompactionStyle::kCompactionStyleLevel;
|
||||
new_options.level_compaction_dynamic_level_bytes = is_dynamic1_;
|
||||
}
|
||||
new_options.target_file_size_base = 256 * 1024;
|
||||
new_options.num_levels = level1_;
|
||||
new_options.max_bytes_for_level_base = 150 * 1024;
|
||||
new_options.max_bytes_for_level_multiplier = 4;
|
||||
ASSERT_OK(OptionChangeMigration(dbname_, old_options, new_options));
|
||||
Reopen(new_options);
|
||||
// Wait for compaction to finish and make sure it can reopen
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
Reopen(new_options);
|
||||
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->SeekToFirst();
|
||||
for (std::string key : keys) {
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ(key, it->key().ToString());
|
||||
it->Next();
|
||||
}
|
||||
ASSERT_TRUE(!it->Valid());
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
DBOptionChangeMigrationTest, DBOptionChangeMigrationTest,
|
||||
::testing::Values(std::make_tuple(3, false, false, 4, false, false),
|
||||
std::make_tuple(3, false, true, 4, false, true),
|
||||
std::make_tuple(3, false, true, 4, false, false),
|
||||
std::make_tuple(3, false, false, 4, false, true),
|
||||
std::make_tuple(3, true, false, 4, true, false),
|
||||
std::make_tuple(1, true, false, 4, true, false),
|
||||
std::make_tuple(3, false, false, 4, true, false),
|
||||
std::make_tuple(3, false, false, 1, true, false),
|
||||
std::make_tuple(3, false, true, 4, true, false),
|
||||
std::make_tuple(3, false, true, 1, true, false),
|
||||
std::make_tuple(1, true, false, 4, false, false)));
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
Loading…
Reference in New Issue
Block a user