Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f48aa1c308 | ||
|
e7d7b1075e | ||
|
c0a5673aa7 | ||
|
8f121d86a4 | ||
|
afbfb3f567 | ||
|
4cfbd87afd | ||
|
8a72bb14bd | ||
|
a6d418384d | ||
|
cb1dc29655 | ||
|
98e5189fb0 | ||
|
3353b7141d | ||
|
d72cceb443 | ||
|
1d5083a007 | ||
|
6ea6aa77cd | ||
|
4229f6df50 | ||
|
73a35c6e17 | ||
|
fc53ac86f6 | ||
|
2060a008b0 | ||
|
89865776b7 | ||
|
749b35d019 |
19
HISTORY.md
19
HISTORY.md
@ -1,5 +1,21 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
## 6.5.3 (1/10/2020)
|
||||
### Bug Fixes
|
||||
* Fixed two performance issues related to memtable history trimming. First, a new SuperVersion is now created only if some memtables were actually trimmed. Second, trimming is only scheduled if there is at least one flushed memtable that is kept in memory for the purposes of transaction conflict checking.
|
||||
|
||||
## 6.5.2 (11/15/2019)
|
||||
### Bug Fixes
|
||||
* Fix a assertion failure in MultiGe4t() when BlockBasedTableOptions::no_block_cache is true and there is no compressed block cache
|
||||
* Fix a buffer overrun problem in BlockBasedTable::MultiGet() when compression is enabled and no compressed block cache is configured.
|
||||
* If a call to BackupEngine::PurgeOldBackups or BackupEngine::DeleteBackup suffered a crash, power failure, or I/O error, files could be left over from old backups that could only be purged with a call to GarbageCollect. Any call to PurgeOldBackups, DeleteBackup, or GarbageCollect should now suffice to purge such files.
|
||||
|
||||
## 6.5.1 (10/16/2019)
|
||||
### Bug Fixes
|
||||
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strange results when reseek happens with a different iterator upper bound.
|
||||
* Fix a bug in BlockBasedTableIterator that might return incorrect results when reseek happens with a different iterator upper bound.
|
||||
* Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
|
||||
|
||||
## 6.5.0 (9/13/2019)
|
||||
### Bug Fixes
|
||||
* Fixed a number of data races in BlobDB.
|
||||
* Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0..
|
||||
@ -102,7 +118,6 @@
|
||||
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
|
||||
* On DB open, delete WAL trash files left behind in wal_dir
|
||||
|
||||
|
||||
## 6.2.0 (4/30/2019)
|
||||
### New Features
|
||||
* Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.
|
||||
|
@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
|
||||
source "$PWD/build_tools/fbcode_config_platform007.sh"
|
||||
else
|
||||
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
source "$PWD/build_tools/fbcode_config_platform007.sh"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -369,6 +369,13 @@ TEST_F(CorruptionTest, VerifyChecksumReadahead) {
|
||||
ASSERT_GE(senv.random_read_counter_.Read(), 213);
|
||||
ASSERT_LE(senv.random_read_counter_.Read(), 447);
|
||||
|
||||
// Test readahead shouldn't break mmap mode (where it should be
|
||||
// disabled).
|
||||
options.allow_mmap_reads = true;
|
||||
Reopen(&options);
|
||||
dbi = static_cast<DBImpl*>(db_);
|
||||
ASSERT_OK(dbi->VerifyChecksum(ro));
|
||||
|
||||
CloseDb();
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
#include "rocksdb/utilities/debug.h"
|
||||
#include "table/block_based/block_based_table_reader.h"
|
||||
#include "table/block_based/block_builder.h"
|
||||
#include "test_util/fault_injection_test_env.h"
|
||||
#if !defined(ROCKSDB_LITE)
|
||||
@ -1541,6 +1542,45 @@ TEST_F(DBBasicTest, GetAllKeyVersions) {
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
|
||||
Options options = CurrentOptions();
|
||||
Random rnd(301);
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
||||
table_options.block_size = 16 * 1024;
|
||||
assert(table_options.block_size >
|
||||
BlockBasedTable::kMultiGetReadStackBufSize);
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
||||
std::string zero_str(128, '\0');
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
// Make the value compressible. A purely random string doesn't compress
|
||||
// and the resultant data block will not be compressed
|
||||
std::string value(RandomString(&rnd, 128) + zero_str);
|
||||
assert(Put(Key(i), value) == Status::OK());
|
||||
}
|
||||
Flush();
|
||||
|
||||
std::vector<std::string> key_data(10);
|
||||
std::vector<Slice> keys;
|
||||
// We cannot resize a PinnableSlice vector, so just set initial size to
|
||||
// largest we think we will need
|
||||
std::vector<PinnableSlice> values(10);
|
||||
std::vector<Status> statuses;
|
||||
ReadOptions ro;
|
||||
|
||||
// Warm up the cache first
|
||||
key_data.emplace_back(Key(0));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
key_data.emplace_back(Key(50));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
statuses.resize(keys.size());
|
||||
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data(), true);
|
||||
}
|
||||
|
||||
class DBBasicTestWithParallelIO
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {
|
||||
@ -1566,8 +1606,12 @@ class DBBasicTestWithParallelIO
|
||||
Options options = CurrentOptions();
|
||||
Random rnd(301);
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
||||
table_options.block_cache = uncompressed_cache_;
|
||||
if (table_options.block_cache == nullptr) {
|
||||
table_options.no_block_cache = true;
|
||||
} else {
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
||||
}
|
||||
table_options.block_cache_compressed = compressed_cache_;
|
||||
table_options.flush_block_policy_factory.reset(
|
||||
new MyFlushBlockPolicyFactory());
|
||||
@ -1609,6 +1653,9 @@ class DBBasicTestWithParallelIO
|
||||
}
|
||||
|
||||
bool fill_cache() { return fill_cache_; }
|
||||
bool compression_enabled() { return compression_enabled_; }
|
||||
bool has_compressed_cache() { return compressed_cache_ != nullptr; }
|
||||
bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
|
||||
|
||||
static void SetUpTestCase() {}
|
||||
static void TearDownTestCase() {}
|
||||
@ -1793,7 +1840,16 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
|
||||
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
|
||||
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
|
||||
|
||||
int expected_reads = random_reads + (fill_cache() ? 0 : 2);
|
||||
bool read_from_cache = false;
|
||||
if (fill_cache()) {
|
||||
if (has_uncompressed_cache()) {
|
||||
read_from_cache = true;
|
||||
} else if (has_compressed_cache() && compression_enabled()) {
|
||||
read_from_cache = true;
|
||||
}
|
||||
}
|
||||
|
||||
int expected_reads = random_reads + (read_from_cache ? 0 : 2);
|
||||
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
|
||||
|
||||
keys.resize(10);
|
||||
@ -1811,7 +1867,7 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
|
||||
ASSERT_OK(statuses[i]);
|
||||
ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
|
||||
}
|
||||
expected_reads += (fill_cache() ? 2 : 4);
|
||||
expected_reads += (read_from_cache ? 2 : 4);
|
||||
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
|
||||
}
|
||||
|
||||
@ -1822,12 +1878,8 @@ INSTANTIATE_TEST_CASE_P(
|
||||
// Param 1 - Uncompressed cache enabled
|
||||
// Param 2 - Data compression enabled
|
||||
// Param 3 - ReadOptions::fill_cache
|
||||
::testing::Values(std::make_tuple(false, true, true, true),
|
||||
std::make_tuple(true, true, true, true),
|
||||
std::make_tuple(false, true, false, true),
|
||||
std::make_tuple(false, true, true, false),
|
||||
std::make_tuple(true, true, true, false),
|
||||
std::make_tuple(false, true, false, false)));
|
||||
::testing::Combine(::testing::Bool(), ::testing::Bool(),
|
||||
::testing::Bool(), ::testing::Bool()));
|
||||
|
||||
class DBBasicTestWithTimestampWithParam
|
||||
: public DBTestBase,
|
||||
|
@ -1499,12 +1499,14 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
|
||||
for (auto& cfd : cfds) {
|
||||
autovector<MemTable*> to_delete;
|
||||
cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
|
||||
if (!to_delete.empty()) {
|
||||
for (auto m : to_delete) {
|
||||
delete m;
|
||||
}
|
||||
context->superversion_context.NewSuperVersion();
|
||||
assert(context->superversion_context.new_superversion.get() != nullptr);
|
||||
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
|
||||
}
|
||||
|
||||
if (cfd->Unref()) {
|
||||
delete cfd;
|
||||
|
@ -182,6 +182,33 @@ TEST_P(DBIteratorTest, IterSeekBeforePrev) {
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, IterReseekNewUpperBound) {
|
||||
Random rnd(301);
|
||||
Options options = CurrentOptions();
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 1024;
|
||||
table_options.block_size_deviation = 50;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.compression = kNoCompression;
|
||||
Reopen(options);
|
||||
|
||||
ASSERT_OK(Put("a", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("aabb", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("aaef", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("b", RandomString(&rnd, 400)));
|
||||
dbfull()->Flush(FlushOptions());
|
||||
ReadOptions opts;
|
||||
Slice ub = Slice("aa");
|
||||
opts.iterate_upper_bound = &ub;
|
||||
auto iter = NewIterator(opts);
|
||||
iter->Seek(Slice("a"));
|
||||
ub = Slice("b");
|
||||
iter->Seek(Slice("aabc"));
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(iter->key().ToString(), "aaef");
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
|
||||
ASSERT_OK(Put("a", "b"));
|
||||
ASSERT_OK(Put("c", "d"));
|
||||
@ -2690,75 +2717,6 @@ TEST_P(DBIteratorTest, AvoidReseekLevelIterator) {
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, AvoidReseekChildIterator) {
|
||||
Options options = CurrentOptions();
|
||||
options.compression = CompressionType::kNoCompression;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 800;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
std::string random_str = RandomString(&rnd, 180);
|
||||
|
||||
ASSERT_OK(Put("1", random_str));
|
||||
ASSERT_OK(Put("2", random_str));
|
||||
ASSERT_OK(Put("3", random_str));
|
||||
ASSERT_OK(Put("4", random_str));
|
||||
ASSERT_OK(Put("8", random_str));
|
||||
ASSERT_OK(Put("9", random_str));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("5", random_str));
|
||||
ASSERT_OK(Put("6", random_str));
|
||||
ASSERT_OK(Put("7", random_str));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
// These two keys will be kept in memtable.
|
||||
ASSERT_OK(Put("0", random_str));
|
||||
ASSERT_OK(Put("8", random_str));
|
||||
|
||||
int num_iter_wrapper_seek = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"IteratorWrapper::Seek:0",
|
||||
[&](void* /*arg*/) { num_iter_wrapper_seek++; });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
{
|
||||
std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
|
||||
iter->Seek("1");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
// DBIter always wraps internal iterator with IteratorWrapper,
|
||||
// and in merging iterator each child iterator will be wrapped
|
||||
// with IteratorWrapper.
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 1 and 5
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("2");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(3, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 2 and 5
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("6");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 8 and 6
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("7");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(3, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 8 and 7
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("5");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
// MyRocks may change iterate bounds before seek. Simply test to make sure such
|
||||
// usage doesn't break iterator.
|
||||
TEST_P(DBIteratorTest, IterateBoundChangedBeforeSeek) {
|
||||
|
@ -135,7 +135,7 @@ class MemTable {
|
||||
|
||||
// As a cheap version of `ApproximateMemoryUsage()`, this function doens't
|
||||
// require external synchronization. The value may be less accurate though
|
||||
size_t ApproximateMemoryUsageFast() {
|
||||
size_t ApproximateMemoryUsageFast() const {
|
||||
return approximate_memory_usage_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
@ -266,7 +266,7 @@ void MemTableListVersion::Remove(MemTable* m,
|
||||
}
|
||||
|
||||
// return the total memory usage assuming the oldest flushed memtable is dropped
|
||||
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() {
|
||||
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
|
||||
size_t total_memtable_size = 0;
|
||||
for (auto& memtable : memlist_) {
|
||||
total_memtable_size += memtable->ApproximateMemoryUsage();
|
||||
@ -480,7 +480,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
cfd->GetName().c_str(), m->file_number_, mem_id);
|
||||
assert(m->file_number_ > 0);
|
||||
current_->Remove(m, to_delete);
|
||||
UpdateMemoryUsageExcludingLast();
|
||||
UpdateCachedValuesFromMemTableListVersion();
|
||||
ResetTrimHistoryNeeded();
|
||||
++mem_id;
|
||||
}
|
||||
@ -521,14 +521,14 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
|
||||
if (num_flush_not_started_ == 1) {
|
||||
imm_flush_needed.store(true, std::memory_order_release);
|
||||
}
|
||||
UpdateMemoryUsageExcludingLast();
|
||||
UpdateCachedValuesFromMemTableListVersion();
|
||||
ResetTrimHistoryNeeded();
|
||||
}
|
||||
|
||||
void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
|
||||
InstallNewVersion();
|
||||
current_->TrimHistory(to_delete, usage);
|
||||
UpdateMemoryUsageExcludingLast();
|
||||
UpdateCachedValuesFromMemTableListVersion();
|
||||
ResetTrimHistoryNeeded();
|
||||
}
|
||||
|
||||
@ -543,18 +543,25 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
|
||||
|
||||
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
|
||||
|
||||
size_t MemTableList::ApproximateMemoryUsageExcludingLast() {
|
||||
size_t usage =
|
||||
size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
|
||||
const size_t usage =
|
||||
current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
|
||||
return usage;
|
||||
}
|
||||
|
||||
// Update current_memory_usage_excluding_last_, need to call whenever state
|
||||
// changes for MemtableListVersion (whenever InstallNewVersion() is called)
|
||||
void MemTableList::UpdateMemoryUsageExcludingLast() {
|
||||
size_t total_memtable_size = current_->ApproximateMemoryUsageExcludingLast();
|
||||
bool MemTableList::HasHistory() const {
|
||||
const bool has_history = current_has_history_.load(std::memory_order_relaxed);
|
||||
return has_history;
|
||||
}
|
||||
|
||||
void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
|
||||
const size_t total_memtable_size =
|
||||
current_->ApproximateMemoryUsageExcludingLast();
|
||||
current_memory_usage_excluding_last_.store(total_memtable_size,
|
||||
std::memory_order_relaxed);
|
||||
|
||||
const bool has_history = current_->HasHistory();
|
||||
current_has_history_.store(has_history, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
uint64_t MemTableList::ApproximateOldestKeyTime() const {
|
||||
@ -684,7 +691,7 @@ Status InstallMemtableAtomicFlushResults(
|
||||
cfds[i]->GetName().c_str(), m->GetFileNumber(),
|
||||
mem_id);
|
||||
imm->current_->Remove(m, to_delete);
|
||||
imm->UpdateMemoryUsageExcludingLast();
|
||||
imm->UpdateCachedValuesFromMemTableListVersion();
|
||||
imm->ResetTrimHistoryNeeded();
|
||||
}
|
||||
}
|
||||
@ -727,7 +734,8 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
|
||||
imm_flush_needed.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
UpdateMemoryUsageExcludingLast();
|
||||
|
||||
UpdateCachedValuesFromMemTableListVersion();
|
||||
ResetTrimHistoryNeeded();
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,11 @@ class MemTableListVersion {
|
||||
// excluding the last MemTable in memlist_history_. The reason for excluding
|
||||
// the last MemTable is to see if dropping the last MemTable will keep total
|
||||
// memory usage above or equal to max_write_buffer_size_to_maintain_
|
||||
size_t ApproximateMemoryUsageExcludingLast();
|
||||
size_t ApproximateMemoryUsageExcludingLast() const;
|
||||
|
||||
// Whether this version contains flushed memtables that are only kept around
|
||||
// for transaction conflict checking.
|
||||
bool HasHistory() const { return !memlist_history_.empty(); }
|
||||
|
||||
bool MemtableLimitExceeded(size_t usage);
|
||||
|
||||
@ -206,7 +210,8 @@ class MemTableList {
|
||||
commit_in_progress_(false),
|
||||
flush_requested_(false),
|
||||
current_memory_usage_(0),
|
||||
current_memory_usage_excluding_last_(0) {
|
||||
current_memory_usage_excluding_last_(0),
|
||||
current_has_history_(false) {
|
||||
current_->Ref();
|
||||
}
|
||||
|
||||
@ -260,11 +265,16 @@ class MemTableList {
|
||||
// Returns an estimate of the number of bytes of data in use.
|
||||
size_t ApproximateMemoryUsage();
|
||||
|
||||
// Returns the cached current_memory_usage_excluding_last_ value
|
||||
size_t ApproximateMemoryUsageExcludingLast();
|
||||
// Returns the cached current_memory_usage_excluding_last_ value.
|
||||
size_t ApproximateMemoryUsageExcludingLast() const;
|
||||
|
||||
// Update current_memory_usage_excluding_last_ from MemtableListVersion
|
||||
void UpdateMemoryUsageExcludingLast();
|
||||
// Returns the cached current_has_history_ value.
|
||||
bool HasHistory() const;
|
||||
|
||||
// Updates current_memory_usage_excluding_last_ and current_has_history_
|
||||
// from MemTableListVersion. Must be called whenever InstallNewVersion is
|
||||
// called.
|
||||
void UpdateCachedValuesFromMemTableListVersion();
|
||||
|
||||
// `usage` is the current size of the mutable Memtable. When
|
||||
// max_write_buffer_size_to_maintain is used, total size of mutable and
|
||||
@ -382,7 +392,11 @@ class MemTableList {
|
||||
// The current memory usage.
|
||||
size_t current_memory_usage_;
|
||||
|
||||
// Cached value of current_->ApproximateMemoryUsageExcludingLast().
|
||||
std::atomic<size_t> current_memory_usage_excluding_last_;
|
||||
|
||||
// Cached value of current_->HasHistory().
|
||||
std::atomic<bool> current_has_history_;
|
||||
};
|
||||
|
||||
// Installs memtable atomic flush results.
|
||||
|
@ -859,8 +859,7 @@ class LevelIterator final : public InternalIterator {
|
||||
bool skip_filters, int level, RangeDelAggregator* range_del_agg,
|
||||
const std::vector<AtomicCompactionUnitBoundary>*
|
||||
compaction_boundaries = nullptr)
|
||||
: InternalIterator(false),
|
||||
table_cache_(table_cache),
|
||||
: table_cache_(table_cache),
|
||||
read_options_(read_options),
|
||||
env_options_(env_options),
|
||||
icomparator_(icomparator),
|
||||
|
@ -1786,17 +1786,31 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
// check if memtable_list size exceeds max_write_buffer_size_to_maintain
|
||||
if (trim_history_scheduler_ != nullptr) {
|
||||
auto* cfd = cf_mems_->current();
|
||||
assert(cfd != nullptr);
|
||||
if (cfd->ioptions()->max_write_buffer_size_to_maintain > 0 &&
|
||||
cfd->mem()->ApproximateMemoryUsageFast() +
|
||||
cfd->imm()->ApproximateMemoryUsageExcludingLast() >=
|
||||
static_cast<size_t>(
|
||||
cfd->ioptions()->max_write_buffer_size_to_maintain) &&
|
||||
cfd->imm()->MarkTrimHistoryNeeded()) {
|
||||
|
||||
assert(cfd);
|
||||
assert(cfd->ioptions());
|
||||
|
||||
const size_t size_to_maintain = static_cast<size_t>(
|
||||
cfd->ioptions()->max_write_buffer_size_to_maintain);
|
||||
|
||||
if (size_to_maintain > 0) {
|
||||
MemTableList* const imm = cfd->imm();
|
||||
assert(imm);
|
||||
|
||||
if (imm->HasHistory()) {
|
||||
const MemTable* const mem = cfd->mem();
|
||||
assert(mem);
|
||||
|
||||
if (mem->ApproximateMemoryUsageFast() +
|
||||
imm->ApproximateMemoryUsageExcludingLast() >=
|
||||
size_to_maintain &&
|
||||
imm->MarkTrimHistoryNeeded()) {
|
||||
trim_history_scheduler_->ScheduleWork(cfd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The write batch handler calls MarkBeginPrepare with unprepare set to true
|
||||
// if it encounters the kTypeBeginUnprepareXID marker.
|
||||
|
@ -276,10 +276,14 @@ class BackupEngine {
|
||||
progress_callback);
|
||||
}
|
||||
|
||||
// deletes old backups, keeping latest num_backups_to_keep alive
|
||||
// Deletes old backups, keeping latest num_backups_to_keep alive.
|
||||
// See also DeleteBackup.
|
||||
virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
|
||||
|
||||
// deletes a specific backup
|
||||
// Deletes a specific backup. If this operation (or PurgeOldBackups)
|
||||
// is not completed due to crash, power failure, etc. the state
|
||||
// will be cleaned up the next time you call DeleteBackup,
|
||||
// PurgeOldBackups, or GarbageCollect.
|
||||
virtual Status DeleteBackup(BackupID backup_id) = 0;
|
||||
|
||||
// Call this from another thread if you want to stop the backup
|
||||
@ -287,8 +291,8 @@ class BackupEngine {
|
||||
// not wait for the backup to stop.
|
||||
// The backup will stop ASAP and the call to CreateNewBackup will
|
||||
// return Status::Incomplete(). It will not clean up after itself, but
|
||||
// the state will remain consistent. The state will be cleaned up
|
||||
// next time you create BackupableDB or RestoreBackupableDB.
|
||||
// the state will remain consistent. The state will be cleaned up the
|
||||
// next time you call CreateNewBackup or GarbageCollect.
|
||||
virtual void StopBackup() = 0;
|
||||
|
||||
// Returns info about backups in backup_info
|
||||
@ -323,9 +327,13 @@ class BackupEngine {
|
||||
// Returns Status::OK() if all checks are good
|
||||
virtual Status VerifyBackup(BackupID backup_id) = 0;
|
||||
|
||||
// Will delete all the files we don't need anymore
|
||||
// It will do the full scan of the files/ directory and delete all the
|
||||
// files that are not referenced.
|
||||
// Will delete any files left over from incomplete creation or deletion of
|
||||
// a backup. This is not normally needed as those operations also clean up
|
||||
// after prior incomplete calls to the same kind of operation (create or
|
||||
// delete).
|
||||
// NOTE: This is not designed to delete arbitrary files added to the backup
|
||||
// directory outside of BackupEngine, and clean-up is always subject to
|
||||
// permissions on and availability of the underlying filesystem.
|
||||
virtual Status GarbageCollect() = 0;
|
||||
};
|
||||
|
||||
|
@ -5,8 +5,8 @@
|
||||
#pragma once
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 4
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_MINOR 5
|
||||
#define ROCKSDB_PATCH 3
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -474,7 +474,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
|
||||
return;
|
||||
}
|
||||
handle = biter.value().handle;
|
||||
uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
|
||||
uint64_t last_off = handle.offset() + block_size(handle);
|
||||
uint64_t prefetch_len = last_off - prefetch_off;
|
||||
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
|
||||
auto& file = rep->file;
|
||||
@ -2299,7 +2299,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
}
|
||||
|
||||
ReadRequest req;
|
||||
req.len = handle.size() + kBlockTrailerSize;
|
||||
req.len = block_size(handle);
|
||||
if (scratch == nullptr) {
|
||||
req.scratch = new char[req.len];
|
||||
} else {
|
||||
@ -2326,11 +2326,11 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
ReadRequest& req = read_reqs[read_req_idx++];
|
||||
Status s = req.status;
|
||||
if (s.ok()) {
|
||||
if (req.result.size() != handle.size() + kBlockTrailerSize) {
|
||||
if (req.result.size() != req.len) {
|
||||
s = Status::Corruption("truncated block read from " +
|
||||
rep_->file->file_name() + " offset " +
|
||||
ToString(handle.offset()) + ", expected " +
|
||||
ToString(handle.size() + kBlockTrailerSize) +
|
||||
ToString(req.len) +
|
||||
" bytes, got " + ToString(req.result.size()));
|
||||
}
|
||||
}
|
||||
@ -2368,11 +2368,20 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
// MaybeReadBlockAndLoadToCache will insert into the block caches if
|
||||
// necessary. Since we're passing the raw block contents, it will
|
||||
// avoid looking up the block cache
|
||||
s = MaybeReadBlockAndLoadToCache(nullptr, options, handle,
|
||||
uncompression_dict, block_entry, BlockType::kData,
|
||||
mget_iter->get_context, &lookup_data_block_context,
|
||||
&raw_block_contents);
|
||||
} else {
|
||||
s = MaybeReadBlockAndLoadToCache(
|
||||
nullptr, options, handle, uncompression_dict, block_entry,
|
||||
BlockType::kData, mget_iter->get_context,
|
||||
&lookup_data_block_context, &raw_block_contents);
|
||||
|
||||
// block_entry value could be null if no block cache is present, i.e
|
||||
// BlockBasedTableOptions::no_block_cache is true and no compressed
|
||||
// block cache is configured. In that case, fall
|
||||
// through and set up the block explicitly
|
||||
if (block_entry->GetValue() != nullptr) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
CompressionType compression_type =
|
||||
raw_block_contents.get_compression_type();
|
||||
BlockContents contents;
|
||||
@ -2380,24 +2389,24 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
UncompressionContext context(compression_type);
|
||||
UncompressionInfo info(context, uncompression_dict, compression_type);
|
||||
s = UncompressBlockContents(info, req.result.data(), handle.size(),
|
||||
&contents, footer.version(), rep_->ioptions,
|
||||
memory_allocator);
|
||||
&contents, footer.version(),
|
||||
rep_->ioptions, memory_allocator);
|
||||
} else {
|
||||
if (scratch != nullptr) {
|
||||
// If we used the scratch buffer, then the contents need to be
|
||||
// copied to heap
|
||||
Slice raw = Slice(req.result.data(), handle.size());
|
||||
contents = BlockContents(CopyBufferToHeap(
|
||||
GetMemoryAllocator(rep_->table_options), raw),
|
||||
contents = BlockContents(
|
||||
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
|
||||
handle.size());
|
||||
} else {
|
||||
contents = std::move(raw_block_contents);
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
(*results)[idx_in_batch].SetOwnedValue(new Block(std::move(contents),
|
||||
global_seqno, read_amp_bytes_per_bit, ioptions.statistics));
|
||||
}
|
||||
(*results)[idx_in_batch].SetOwnedValue(
|
||||
new Block(std::move(contents), global_seqno,
|
||||
read_amp_bytes_per_bit, ioptions.statistics));
|
||||
}
|
||||
}
|
||||
(*statuses)[idx_in_batch] = s;
|
||||
@ -2706,11 +2715,21 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
||||
// Index contains the first key of the block, and it's >= target.
|
||||
// We can defer reading the block.
|
||||
is_at_first_key_from_index_ = true;
|
||||
// ResetDataIter() will invalidate block_iter_. Thus, there is no need to
|
||||
// call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
|
||||
// as that will be done later when the data block is actually read.
|
||||
ResetDataIter();
|
||||
} else {
|
||||
// Need to use the data block.
|
||||
if (!same_block) {
|
||||
InitDataBlock();
|
||||
} else {
|
||||
// When the user does a reseek, the iterate_upper_bound might have
|
||||
// changed. CheckDataBlockWithinUpperBound() needs to be called
|
||||
// explicitly if the reseek ends up in the same data block.
|
||||
// If the reseek ends up in a different block, InitDataBlock() will do
|
||||
// the iterator upper bound check.
|
||||
CheckDataBlockWithinUpperBound();
|
||||
}
|
||||
|
||||
if (target) {
|
||||
@ -2721,7 +2740,6 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
||||
FindKeyForward();
|
||||
}
|
||||
|
||||
CheckDataBlockWithinUpperBound();
|
||||
CheckOutOfBound();
|
||||
|
||||
if (target) {
|
||||
@ -2868,8 +2886,7 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
|
||||
BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
|
||||
if (!rep->file->use_direct_io() &&
|
||||
(data_block_handle.offset() +
|
||||
static_cast<size_t>(data_block_handle.size()) +
|
||||
kBlockTrailerSize >
|
||||
static_cast<size_t>(block_size(data_block_handle)) >
|
||||
readahead_limit_)) {
|
||||
// Buffered I/O
|
||||
// Discarding the return status of Prefetch calls intentionally, as
|
||||
@ -3367,7 +3384,6 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
|
||||
static const size_t kMultiGetReadStackBufSize = 8192;
|
||||
char stack_buf[kMultiGetReadStackBufSize];
|
||||
std::unique_ptr<char[]> block_buf;
|
||||
{
|
||||
@ -3449,7 +3465,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
block_handles.emplace_back(BlockHandle::NullBlockHandle());
|
||||
} else {
|
||||
block_handles.emplace_back(handle);
|
||||
total_len += handle.size();
|
||||
total_len += block_size(handle);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3737,8 +3753,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
|
||||
size_t readahead_size = (read_options.readahead_size != 0)
|
||||
? read_options.readahead_size
|
||||
: kMaxAutoReadaheadSize;
|
||||
FilePrefetchBuffer prefetch_buffer(rep_->file.get(), readahead_size,
|
||||
readahead_size);
|
||||
// FilePrefetchBuffer doesn't work in mmap mode and readahead is not
|
||||
// needed there.
|
||||
FilePrefetchBuffer prefetch_buffer(
|
||||
rep_->file.get(), readahead_size /* readadhead_size */,
|
||||
readahead_size /* max_readahead_size */,
|
||||
!rep_->ioptions.allow_mmap_reads /* enable */);
|
||||
|
||||
for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
|
||||
s = index_iter->status();
|
||||
|
@ -461,8 +461,13 @@ class BlockBasedTable : public TableReader {
|
||||
void DumpKeyValue(const Slice& key, const Slice& value,
|
||||
WritableFile* out_file);
|
||||
|
||||
// A cumulative data block file read in MultiGet lower than this size will
|
||||
// use a stack buffer
|
||||
static constexpr size_t kMultiGetReadStackBufSize = 8192;
|
||||
|
||||
friend class PartitionedFilterBlockReader;
|
||||
friend class PartitionedFilterBlockTest;
|
||||
friend class DBBasicTest_MultiGetIOBufferOverrun_Test;
|
||||
};
|
||||
|
||||
// Maitaning state of a two-level iteration on a partitioned index structure.
|
||||
@ -615,8 +620,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
|
||||
const SliceTransform* prefix_extractor,
|
||||
BlockType block_type, TableReaderCaller caller,
|
||||
size_t compaction_readahead_size = 0)
|
||||
: InternalIteratorBase<TValue>(false),
|
||||
table_(table),
|
||||
: table_(table),
|
||||
read_options_(read_options),
|
||||
icomp_(icomp),
|
||||
user_comparator_(icomp.user_comparator()),
|
||||
|
@ -192,7 +192,12 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
|
||||
index_key_includes_seq(), index_value_is_full());
|
||||
iter.Seek(entry);
|
||||
if (UNLIKELY(!iter.Valid())) {
|
||||
return BlockHandle(0, 0);
|
||||
// entry is larger than all the keys. However its prefix might still be
|
||||
// present in the last partition. If this is called by PrefixMayMatch this
|
||||
// is necessary for correct behavior. Otherwise it is unnecessary but safe.
|
||||
// Assuming this is an unlikely case for full key search, the performance
|
||||
// overhead should be negligible.
|
||||
iter.SeekToLast();
|
||||
}
|
||||
assert(iter.Valid());
|
||||
BlockHandle fltr_blk_handle = iter.value().handle;
|
||||
|
@ -327,7 +327,7 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get(), prefix_extractor.get()));
|
||||
const std::string pkeys[3] = {"p-key1", "p-key2", "p-key3"};
|
||||
const std::string pkeys[3] = {"p-key10", "p-key20", "p-key30"};
|
||||
builder->Add(pkeys[0]);
|
||||
CutABlock(pib.get(), pkeys[0], pkeys[1]);
|
||||
builder->Add(pkeys[1]);
|
||||
@ -344,6 +344,16 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
/*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr));
|
||||
}
|
||||
// Non-existent keys but with the same prefix
|
||||
const std::string pnonkeys[4] = {"p-key9", "p-key11", "p-key21", "p-key31"};
|
||||
for (auto key : pnonkeys) {
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
ASSERT_TRUE(reader->PrefixMayMatch(
|
||||
prefix_extractor->Transform(key), prefix_extractor.get(), kNotValid,
|
||||
/*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {
|
||||
|
@ -220,6 +220,11 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
|
||||
// 1-byte type + 32-bit crc
|
||||
static const size_t kBlockTrailerSize = 5;
|
||||
|
||||
// Make block size calculation for IO less error prone
|
||||
inline uint64_t block_size(const BlockHandle& handle) {
|
||||
return handle.size() + kBlockTrailerSize;
|
||||
}
|
||||
|
||||
inline CompressionType get_block_compression_type(const char* block_data,
|
||||
size_t block_size) {
|
||||
return static_cast<CompressionType>(block_data[block_size]);
|
||||
|
@ -25,8 +25,8 @@ struct IterateResult {
|
||||
template <class TValue>
|
||||
class InternalIteratorBase : public Cleanable {
|
||||
public:
|
||||
InternalIteratorBase() : is_mutable_(true) {}
|
||||
InternalIteratorBase(bool _is_mutable) : is_mutable_(_is_mutable) {}
|
||||
InternalIteratorBase() {}
|
||||
|
||||
// No copying allowed
|
||||
InternalIteratorBase(const InternalIteratorBase&) = delete;
|
||||
InternalIteratorBase& operator=(const InternalIteratorBase&) = delete;
|
||||
@ -148,7 +148,6 @@ class InternalIteratorBase : public Cleanable {
|
||||
virtual Status GetProperty(std::string /*prop_name*/, std::string* /*prop*/) {
|
||||
return Status::NotSupported("");
|
||||
}
|
||||
bool is_mutable() const { return is_mutable_; }
|
||||
|
||||
protected:
|
||||
void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {
|
||||
|
@ -73,7 +73,6 @@ class IteratorWrapperBase {
|
||||
}
|
||||
void Prev() { assert(iter_); iter_->Prev(); Update(); }
|
||||
void Seek(const Slice& k) {
|
||||
TEST_SYNC_POINT("IteratorWrapper::Seek:0");
|
||||
assert(iter_);
|
||||
iter_->Seek(k);
|
||||
Update();
|
||||
|
@ -127,29 +127,14 @@ class MergingIterator : public InternalIterator {
|
||||
}
|
||||
|
||||
void Seek(const Slice& target) override {
|
||||
bool is_increasing_reseek = false;
|
||||
if (current_ != nullptr && direction_ == kForward && status_.ok() &&
|
||||
comparator_->Compare(target, key()) >= 0) {
|
||||
is_increasing_reseek = true;
|
||||
}
|
||||
ClearHeaps();
|
||||
status_ = Status::OK();
|
||||
for (auto& child : children_) {
|
||||
// If upper bound never changes, we can skip Seek() for
|
||||
// the !Valid() case too, but people do hack the code to change
|
||||
// upper bound between Seek(), so it's not a good idea to break
|
||||
// the API.
|
||||
// If DBIter is used on top of merging iterator, we probably
|
||||
// can skip mutable child iterators if they are invalid too,
|
||||
// but it's a less clean API. We can optimize for it later if
|
||||
// needed.
|
||||
if (!is_increasing_reseek || !child.Valid() ||
|
||||
comparator_->Compare(target, child.key()) > 0 ||
|
||||
child.iter()->is_mutable()) {
|
||||
{
|
||||
PERF_TIMER_GUARD(seek_child_seek_time);
|
||||
child.Seek(target);
|
||||
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
||||
}
|
||||
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
||||
|
||||
if (child.Valid()) {
|
||||
assert(child.status().ok());
|
||||
|
@ -9,24 +9,10 @@
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "rocksdb/utilities/backupable_db.h"
|
||||
#include "file/filename.h"
|
||||
#include "logging/logging.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/rate_limiter.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/channel.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/file_reader_writer.h"
|
||||
#include "util/string_util.h"
|
||||
#include "utilities/checkpoint/checkpoint_impl.h"
|
||||
|
||||
#include <cinttypes>
|
||||
#include <stdlib.h>
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cinttypes>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <limits>
|
||||
@ -39,6 +25,20 @@
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include "file/filename.h"
|
||||
#include "logging/logging.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/rate_limiter.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "rocksdb/utilities/backupable_db.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/channel.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/file_reader_writer.h"
|
||||
#include "util/string_util.h"
|
||||
#include "utilities/checkpoint/checkpoint_impl.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
void BackupStatistics::IncrementNumberSuccessBackup() {
|
||||
@ -120,6 +120,7 @@ class BackupEngineImpl : public BackupEngine {
|
||||
|
||||
private:
|
||||
void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
|
||||
Status DeleteBackupInternal(BackupID backup_id);
|
||||
|
||||
// Extends the "result" map with pathname->size mappings for the contents of
|
||||
// "dir" in "env". Pathnames are prefixed with "dir".
|
||||
@ -456,6 +457,10 @@ class BackupEngineImpl : public BackupEngine {
|
||||
std::mutex byte_report_mutex_;
|
||||
channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
|
||||
std::vector<port::Thread> threads_;
|
||||
// Certain operations like PurgeOldBackups and DeleteBackup will trigger
|
||||
// automatic GarbageCollect (true) unless we've already done one in this
|
||||
// session and have not failed to delete backup files since then (false).
|
||||
bool might_need_garbage_collect_ = true;
|
||||
|
||||
// Adds a file to the backup work queue to be copied or created if it doesn't
|
||||
// already exist.
|
||||
@ -559,6 +564,9 @@ Status BackupEngineImpl::Initialize() {
|
||||
options_.Dump(options_.info_log);
|
||||
|
||||
if (!read_only_) {
|
||||
// we might need to clean up from previous crash or I/O errors
|
||||
might_need_garbage_collect_ = true;
|
||||
|
||||
// gather the list of directories that we need to create
|
||||
std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
|
||||
directories;
|
||||
@ -755,7 +763,11 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
if (s.ok()) {
|
||||
// maybe last backup failed and left partial state behind, clean it up.
|
||||
// need to do this before updating backups_ such that a private dir
|
||||
// named after new_backup_id will be cleaned up
|
||||
// named after new_backup_id will be cleaned up.
|
||||
// (If an incomplete new backup is followed by an incomplete delete
|
||||
// of the latest full backup, then there could be more than one next
|
||||
// id with a private dir, the last thing to be deleted in delete
|
||||
// backup, but all will be cleaned up with a GarbageCollect.)
|
||||
s = GarbageCollect();
|
||||
} else if (s.IsNotFound()) {
|
||||
// normal case, the new backup's private dir doesn't exist yet
|
||||
@ -928,8 +940,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
|
||||
backup_statistics_.ToString().c_str());
|
||||
// delete files that we might have already written
|
||||
might_need_garbage_collect_ = true;
|
||||
DeleteBackup(new_backup_id);
|
||||
GarbageCollect();
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -957,6 +969,10 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
|
||||
assert(initialized_);
|
||||
assert(!read_only_);
|
||||
|
||||
// Best effort deletion even with errors
|
||||
Status overall_status = Status::OK();
|
||||
|
||||
ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
|
||||
num_backups_to_keep);
|
||||
std::vector<BackupID> to_delete;
|
||||
@ -966,17 +982,44 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
|
||||
itr++;
|
||||
}
|
||||
for (auto backup_id : to_delete) {
|
||||
auto s = DeleteBackup(backup_id);
|
||||
auto s = DeleteBackupInternal(backup_id);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
overall_status = s;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
// Clean up after any incomplete backup deletion, potentially from
|
||||
// earlier session.
|
||||
if (might_need_garbage_collect_) {
|
||||
auto s = GarbageCollect();
|
||||
if (!s.ok() && overall_status.ok()) {
|
||||
overall_status = s;
|
||||
}
|
||||
}
|
||||
return overall_status;
|
||||
}
|
||||
|
||||
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
||||
auto s1 = DeleteBackupInternal(backup_id);
|
||||
auto s2 = Status::OK();
|
||||
|
||||
// Clean up after any incomplete backup deletion, potentially from
|
||||
// earlier session.
|
||||
if (might_need_garbage_collect_) {
|
||||
s2 = GarbageCollect();
|
||||
}
|
||||
|
||||
if (!s1.ok()) {
|
||||
return s1;
|
||||
} else {
|
||||
return s2;
|
||||
}
|
||||
}
|
||||
|
||||
// Does not auto-GarbageCollect
|
||||
Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
|
||||
assert(initialized_);
|
||||
assert(!read_only_);
|
||||
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
|
||||
auto backup = backups_.find(backup_id);
|
||||
if (backup != backups_.end()) {
|
||||
@ -997,6 +1040,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
||||
corrupt_backups_.erase(corrupt);
|
||||
}
|
||||
|
||||
// After removing meta file, best effort deletion even with errors.
|
||||
// (Don't delete other files if we can't delete the meta file right
|
||||
// now.)
|
||||
|
||||
if (options_.max_valid_backups_to_open == port::kMaxInt32) {
|
||||
std::vector<std::string> to_delete;
|
||||
for (auto& itr : backuped_file_infos_) {
|
||||
@ -1005,6 +1052,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
|
||||
itr.first.c_str(), s.ToString().c_str());
|
||||
to_delete.push_back(itr.first);
|
||||
if (!s.ok()) {
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto& td : to_delete) {
|
||||
@ -1023,6 +1074,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
||||
Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
|
||||
private_dir.c_str(), s.ToString().c_str());
|
||||
if (!s.ok()) {
|
||||
// Full gc or trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1505,23 +1560,30 @@ Status BackupEngineImpl::InsertPathnameToSizeBytes(
|
||||
|
||||
Status BackupEngineImpl::GarbageCollect() {
|
||||
assert(!read_only_);
|
||||
|
||||
// We will make a best effort to remove all garbage even in the presence
|
||||
// of inconsistencies or I/O failures that inhibit finding garbage.
|
||||
Status overall_status = Status::OK();
|
||||
// If all goes well, we don't need another auto-GC this session
|
||||
might_need_garbage_collect_ = false;
|
||||
|
||||
ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
|
||||
if (options_.max_valid_backups_to_open == port::kMaxInt32) {
|
||||
if (options_.max_valid_backups_to_open != port::kMaxInt32) {
|
||||
ROCKS_LOG_WARN(
|
||||
options_.info_log,
|
||||
"Garbage collection is limited since `max_valid_backups_to_open` "
|
||||
"constrains how many backups the engine knows about");
|
||||
}
|
||||
|
||||
if (options_.share_table_files &&
|
||||
options_.max_valid_backups_to_open == port::kMaxInt32) {
|
||||
if (options_.max_valid_backups_to_open == port::kMaxInt32) {
|
||||
// delete obsolete shared files
|
||||
// we cannot do this when BackupEngine has `max_valid_backups_to_open` set
|
||||
// as those engines don't know about all shared files.
|
||||
for (bool with_checksum : {false, true}) {
|
||||
std::vector<std::string> shared_children;
|
||||
{
|
||||
std::string shared_path;
|
||||
if (options_.share_files_with_checksum) {
|
||||
if (with_checksum) {
|
||||
shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
|
||||
} else {
|
||||
shared_path = GetAbsolutePath(GetSharedFileRel());
|
||||
@ -1533,12 +1595,17 @@ Status BackupEngineImpl::GarbageCollect() {
|
||||
s = Status::OK();
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
overall_status = s;
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
for (auto& child : shared_children) {
|
||||
if (child == "." || child == "..") {
|
||||
continue;
|
||||
}
|
||||
std::string rel_fname;
|
||||
if (options_.share_files_with_checksum) {
|
||||
if (with_checksum) {
|
||||
rel_fname = GetSharedFileWithChecksumRel(child);
|
||||
} else {
|
||||
rel_fname = GetSharedFileRel(child);
|
||||
@ -1553,6 +1620,11 @@ Status BackupEngineImpl::GarbageCollect() {
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
|
||||
rel_fname.c_str(), s.ToString().c_str());
|
||||
backuped_file_infos_.erase(rel_fname);
|
||||
if (!s.ok()) {
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1563,10 +1635,15 @@ Status BackupEngineImpl::GarbageCollect() {
|
||||
auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
|
||||
&private_children);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
overall_status = s;
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
for (auto& child : private_children) {
|
||||
if (child == "." || child == "..") {
|
||||
continue;
|
||||
}
|
||||
// it's ok to do this when BackupEngine has `max_valid_backups_to_open` set
|
||||
// as the engine always knows all valid backup numbers.
|
||||
BackupID backup_id = 0;
|
||||
@ -1583,18 +1660,30 @@ Status BackupEngineImpl::GarbageCollect() {
|
||||
std::vector<std::string> subchildren;
|
||||
backup_env_->GetChildren(full_private_path, &subchildren);
|
||||
for (auto& subchild : subchildren) {
|
||||
if (subchild == "." || subchild == "..") {
|
||||
continue;
|
||||
}
|
||||
Status s = backup_env_->DeleteFile(full_private_path + subchild);
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
|
||||
(full_private_path + subchild).c_str(),
|
||||
s.ToString().c_str());
|
||||
if (!s.ok()) {
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
// finally delete the private dir
|
||||
Status s = backup_env_->DeleteDir(full_private_path);
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
|
||||
full_private_path.c_str(), s.ToString().c_str());
|
||||
if (!s.ok()) {
|
||||
// Trying again later might work
|
||||
might_need_garbage_collect_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
assert(overall_status.ok() || might_need_garbage_collect_);
|
||||
return overall_status;
|
||||
}
|
||||
|
||||
// ------- BackupMeta class --------
|
||||
|
@ -10,7 +10,9 @@
|
||||
#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
|
||||
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "env/env_chroot.h"
|
||||
@ -516,6 +518,15 @@ static void AssertEmpty(DB* db, int from, int to) {
|
||||
|
||||
class BackupableDBTest : public testing::Test {
|
||||
public:
|
||||
enum ShareOption {
|
||||
kNoShare,
|
||||
kShareNoChecksum,
|
||||
kShareWithChecksum,
|
||||
};
|
||||
|
||||
const std::vector<ShareOption> kAllShareOptions = {kNoShare, kShareNoChecksum,
|
||||
kShareWithChecksum};
|
||||
|
||||
BackupableDBTest() {
|
||||
// set up files
|
||||
std::string db_chroot = test::PerThreadDBPath("backupable_db");
|
||||
@ -561,15 +572,8 @@ class BackupableDBTest : public testing::Test {
|
||||
return db;
|
||||
}
|
||||
|
||||
void OpenDBAndBackupEngineShareWithChecksum(
|
||||
bool destroy_old_data = false, bool dummy = false,
|
||||
bool /*share_table_files*/ = true, bool share_with_checksums = false) {
|
||||
backupable_options_->share_files_with_checksum = share_with_checksums;
|
||||
OpenDBAndBackupEngine(destroy_old_data, dummy, share_with_checksums);
|
||||
}
|
||||
|
||||
void OpenDBAndBackupEngine(bool destroy_old_data = false, bool dummy = false,
|
||||
bool share_table_files = true) {
|
||||
ShareOption shared_option = kShareNoChecksum) {
|
||||
// reset all the defaults
|
||||
test_backup_env_->SetLimitWrittenFiles(1000000);
|
||||
test_db_env_->SetLimitWrittenFiles(1000000);
|
||||
@ -584,7 +588,9 @@ class BackupableDBTest : public testing::Test {
|
||||
}
|
||||
db_.reset(db);
|
||||
backupable_options_->destroy_old_data = destroy_old_data;
|
||||
backupable_options_->share_table_files = share_table_files;
|
||||
backupable_options_->share_table_files = shared_option != kNoShare;
|
||||
backupable_options_->share_files_with_checksum =
|
||||
shared_option == kShareWithChecksum;
|
||||
BackupEngine* backup_engine;
|
||||
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *backupable_options_,
|
||||
&backup_engine));
|
||||
@ -839,7 +845,7 @@ INSTANTIATE_TEST_CASE_P(BackupableDBTestWithParam, BackupableDBTestWithParam,
|
||||
::testing::Bool());
|
||||
|
||||
// this will make sure that backup does not copy the same file twice
|
||||
TEST_F(BackupableDBTest, NoDoubleCopy) {
|
||||
TEST_F(BackupableDBTest, NoDoubleCopy_And_AutoGC) {
|
||||
OpenDBAndBackupEngine(true, true);
|
||||
|
||||
// should write 5 DB files + one meta file
|
||||
@ -857,23 +863,30 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
|
||||
AppendPath(backupdir_, should_have_written);
|
||||
test_backup_env_->AssertWrittenFiles(should_have_written);
|
||||
|
||||
char db_number = '1';
|
||||
|
||||
for (std::string other_sst : {"00015.sst", "00017.sst", "00019.sst"}) {
|
||||
// should write 4 new DB files + one meta file
|
||||
// should not write/copy 00010.sst, since it's already there!
|
||||
test_backup_env_->SetLimitWrittenFiles(6);
|
||||
test_backup_env_->ClearWrittenFiles();
|
||||
|
||||
dummy_db_->live_files_ = {"/00010.sst", "/00015.sst", "/CURRENT",
|
||||
dummy_db_->live_files_ = {"/00010.sst", "/" + other_sst, "/CURRENT",
|
||||
"/MANIFEST-01"};
|
||||
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
|
||||
test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||
// should not open 00010.sst - it's already there
|
||||
|
||||
should_have_written = {"/shared/.00015.sst.tmp", "/private/2/CURRENT",
|
||||
"/private/2/MANIFEST-01", "/private/2/00011.log",
|
||||
"/meta/.2.tmp"};
|
||||
++db_number;
|
||||
std::string private_dir = std::string("/private/") + db_number;
|
||||
should_have_written = {
|
||||
"/shared/." + other_sst + ".tmp", private_dir + "/CURRENT",
|
||||
private_dir + "/MANIFEST-01", private_dir + "/00011.log",
|
||||
std::string("/meta/.") + db_number + ".tmp"};
|
||||
AppendPath(backupdir_, should_have_written);
|
||||
test_backup_env_->AssertWrittenFiles(should_have_written);
|
||||
}
|
||||
|
||||
ASSERT_OK(backup_engine_->DeleteBackup(1));
|
||||
ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst"));
|
||||
@ -890,6 +903,42 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
|
||||
test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size);
|
||||
ASSERT_EQ(200UL, size);
|
||||
|
||||
CloseBackupEngine();
|
||||
|
||||
//
|
||||
// Now simulate incomplete delete by removing just meta
|
||||
//
|
||||
ASSERT_OK(test_backup_env_->DeleteFile(backupdir_ + "/meta/2"));
|
||||
|
||||
OpenBackupEngine();
|
||||
|
||||
// 1 appears to be removed, so
|
||||
// 2 non-corrupt and 0 corrupt seen
|
||||
std::vector<BackupInfo> backup_info;
|
||||
std::vector<BackupID> corrupt_backup_ids;
|
||||
backup_engine_->GetBackupInfo(&backup_info);
|
||||
backup_engine_->GetCorruptedBackups(&corrupt_backup_ids);
|
||||
ASSERT_EQ(2UL, backup_info.size());
|
||||
ASSERT_EQ(0UL, corrupt_backup_ids.size());
|
||||
|
||||
// Keep the two we see, but this should suffice to purge unreferenced
|
||||
// shared files from incomplete delete.
|
||||
ASSERT_OK(backup_engine_->PurgeOldBackups(2));
|
||||
|
||||
// Make sure dangling sst file has been removed (somewhere along this
|
||||
// process). GarbageCollect should not be needed.
|
||||
ASSERT_EQ(Status::NotFound(),
|
||||
test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst"));
|
||||
ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
|
||||
ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
|
||||
|
||||
// Now actually purge a good one
|
||||
ASSERT_OK(backup_engine_->PurgeOldBackups(1));
|
||||
|
||||
ASSERT_EQ(Status::NotFound(),
|
||||
test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
|
||||
ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
|
||||
|
||||
CloseDBAndBackupEngine();
|
||||
}
|
||||
|
||||
@ -972,7 +1021,8 @@ TEST_F(BackupableDBTest, CorruptionsTest) {
|
||||
ASSERT_OK(backup_engine_->DeleteBackup(4));
|
||||
ASSERT_OK(backup_engine_->DeleteBackup(3));
|
||||
ASSERT_OK(backup_engine_->DeleteBackup(2));
|
||||
(void)backup_engine_->GarbageCollect();
|
||||
// Should not be needed anymore with auto-GC on DeleteBackup
|
||||
//(void)backup_engine_->GarbageCollect();
|
||||
ASSERT_EQ(Status::NotFound(),
|
||||
file_manager_->FileExists(backupdir_ + "/meta/5"));
|
||||
ASSERT_EQ(Status::NotFound(),
|
||||
@ -1161,7 +1211,7 @@ TEST_F(BackupableDBTest, FailOverwritingBackups) {
|
||||
|
||||
TEST_F(BackupableDBTest, NoShareTableFiles) {
|
||||
const int keys_iteration = 5000;
|
||||
OpenDBAndBackupEngine(true, false, false);
|
||||
OpenDBAndBackupEngine(true, false, kNoShare);
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2)));
|
||||
@ -1177,7 +1227,7 @@ TEST_F(BackupableDBTest, NoShareTableFiles) {
|
||||
// Verify that you can backup and restore with share_files_with_checksum on
|
||||
TEST_F(BackupableDBTest, ShareTableFilesWithChecksums) {
|
||||
const int keys_iteration = 5000;
|
||||
OpenDBAndBackupEngineShareWithChecksum(true, false, true, true);
|
||||
OpenDBAndBackupEngine(true, false, kShareWithChecksum);
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2)));
|
||||
@ -1195,7 +1245,7 @@ TEST_F(BackupableDBTest, ShareTableFilesWithChecksums) {
|
||||
TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) {
|
||||
const int keys_iteration = 5000;
|
||||
// set share_files_with_checksum to false
|
||||
OpenDBAndBackupEngineShareWithChecksum(true, false, true, false);
|
||||
OpenDBAndBackupEngine(true, false, kShareNoChecksum);
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
|
||||
@ -1208,55 +1258,108 @@ TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) {
|
||||
}
|
||||
|
||||
// set share_files_with_checksum to true and do some more backups
|
||||
OpenDBAndBackupEngineShareWithChecksum(true, false, true, true);
|
||||
OpenDBAndBackupEngine(false /* destroy_old_data */, false,
|
||||
kShareWithChecksum);
|
||||
for (int i = 5; i < 10; ++i) {
|
||||
FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
|
||||
}
|
||||
CloseDBAndBackupEngine();
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 5 + 1),
|
||||
// Verify first (about to delete)
|
||||
AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11);
|
||||
|
||||
// For an extra challenge, make sure that GarbageCollect / DeleteBackup
|
||||
// is OK even if we open without share_table_files
|
||||
OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
|
||||
backup_engine_->DeleteBackup(1);
|
||||
backup_engine_->GarbageCollect();
|
||||
CloseDBAndBackupEngine();
|
||||
|
||||
// Verify rest (not deleted)
|
||||
for (int i = 1; i < 10; ++i) {
|
||||
AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1),
|
||||
keys_iteration * 11);
|
||||
}
|
||||
}
|
||||
|
||||
// This test simulates cleaning up after aborted or incomplete creation
|
||||
// of a new backup.
|
||||
TEST_F(BackupableDBTest, DeleteTmpFiles) {
|
||||
for (bool shared_checksum : {false, true}) {
|
||||
if (shared_checksum) {
|
||||
OpenDBAndBackupEngineShareWithChecksum(
|
||||
false /* destroy_old_data */, false /* dummy */,
|
||||
true /* share_table_files */, true /* share_with_checksums */);
|
||||
} else {
|
||||
OpenDBAndBackupEngine();
|
||||
for (int cleanup_fn : {1, 2, 3, 4}) {
|
||||
for (ShareOption shared_option : kAllShareOptions) {
|
||||
OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
|
||||
shared_option);
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
|
||||
BackupID next_id = 1;
|
||||
BackupID oldest_id = std::numeric_limits<BackupID>::max();
|
||||
{
|
||||
std::vector<BackupInfo> backup_info;
|
||||
backup_engine_->GetBackupInfo(&backup_info);
|
||||
for (const auto& bi : backup_info) {
|
||||
next_id = std::max(next_id, bi.backup_id + 1);
|
||||
oldest_id = std::min(oldest_id, bi.backup_id);
|
||||
}
|
||||
}
|
||||
CloseDBAndBackupEngine();
|
||||
std::string shared_tmp = backupdir_;
|
||||
if (shared_checksum) {
|
||||
shared_tmp += "/shared_checksum";
|
||||
} else {
|
||||
shared_tmp += "/shared";
|
||||
|
||||
// An aborted or incomplete new backup will always be in the next
|
||||
// id (maybe more)
|
||||
std::string next_private = "private/" + std::to_string(next_id);
|
||||
|
||||
// NOTE: both shared and shared_checksum should be cleaned up
|
||||
// regardless of how the backup engine is opened.
|
||||
std::vector<std::string> tmp_files_and_dirs;
|
||||
for (const auto& dir_and_file : {
|
||||
std::make_pair(std::string("shared"),
|
||||
std::string(".00006.sst.tmp")),
|
||||
std::make_pair(std::string("shared_checksum"),
|
||||
std::string(".00007.sst.tmp")),
|
||||
std::make_pair(next_private, std::string("00003.sst")),
|
||||
}) {
|
||||
std::string dir = backupdir_ + "/" + dir_and_file.first;
|
||||
file_manager_->CreateDir(dir);
|
||||
ASSERT_OK(file_manager_->FileExists(dir));
|
||||
|
||||
std::string file = dir + "/" + dir_and_file.second;
|
||||
file_manager_->WriteToFile(file, "tmp");
|
||||
ASSERT_OK(file_manager_->FileExists(file));
|
||||
|
||||
tmp_files_and_dirs.push_back(file);
|
||||
}
|
||||
shared_tmp += "/.00006.sst.tmp";
|
||||
std::string private_tmp_dir = backupdir_ + "/private/10";
|
||||
std::string private_tmp_file = private_tmp_dir + "/00003.sst";
|
||||
file_manager_->WriteToFile(shared_tmp, "tmp");
|
||||
file_manager_->CreateDir(private_tmp_dir);
|
||||
file_manager_->WriteToFile(private_tmp_file, "tmp");
|
||||
ASSERT_OK(file_manager_->FileExists(private_tmp_dir));
|
||||
if (shared_checksum) {
|
||||
OpenDBAndBackupEngineShareWithChecksum(
|
||||
false /* destroy_old_data */, false /* dummy */,
|
||||
true /* share_table_files */, true /* share_with_checksums */);
|
||||
} else {
|
||||
OpenDBAndBackupEngine();
|
||||
if (cleanup_fn != /*CreateNewBackup*/ 4) {
|
||||
// This exists after CreateNewBackup because it's deleted then
|
||||
// re-created.
|
||||
tmp_files_and_dirs.push_back(backupdir_ + "/" + next_private);
|
||||
}
|
||||
|
||||
OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
|
||||
shared_option);
|
||||
// Need to call one of these explicitly to delete tmp files
|
||||
switch (cleanup_fn) {
|
||||
case 1:
|
||||
ASSERT_OK(backup_engine_->GarbageCollect());
|
||||
break;
|
||||
case 2:
|
||||
ASSERT_OK(backup_engine_->DeleteBackup(oldest_id));
|
||||
break;
|
||||
case 3:
|
||||
ASSERT_OK(backup_engine_->PurgeOldBackups(1));
|
||||
break;
|
||||
case 4:
|
||||
// Does a garbage collect if it sees that next private dir exists
|
||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
// Need to call this explicitly to delete tmp files
|
||||
(void)backup_engine_->GarbageCollect();
|
||||
CloseDBAndBackupEngine();
|
||||
ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(shared_tmp));
|
||||
ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(private_tmp_file));
|
||||
ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(private_tmp_dir));
|
||||
for (std::string file_or_dir : tmp_files_and_dirs) {
|
||||
if (file_manager_->FileExists(file_or_dir) != Status::NotFound()) {
|
||||
FAIL() << file_or_dir << " was expected to be deleted." << cleanup_fn;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user