Compare commits

20 commits: main...6.5.fb

Author / SHA1 / Message / Date
Levi Tamasi
f48aa1c308 Bump up version to 6.5.3 2020-01-10 09:58:14 -08:00
Levi Tamasi
e7d7b1075e Update HISTORY.md with the recent memtable trimming fixes
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/6194

Differential Revision: D19125292

Pulled By: ltamasi

fbshipit-source-id: d41aca2755ec4bec07feedd6b561e8d18606a931
2020-01-10 09:57:50 -08:00
Levi Tamasi
c0a5673aa7 Fix a data race related to memtable trimming (#6187)
Summary:
https://github.com/facebook/rocksdb/pull/6177 introduced a data race
involving `MemTableList::InstallNewVersion` and `MemTableList::NumFlushed`.
The patch fixes this by caching whether the current version has any
memtable history (i.e. flushed memtables that are kept around for
transaction conflict checking) in an `std::atomic<bool>` member called
`current_has_history_`, similarly to how `current_memory_usage_excluding_last_`
is handled.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6187

Test Plan:
```
make clean
COMPILE_WITH_TSAN=1 make db_test -j24
./db_test
```

Differential Revision: D19084059

Pulled By: ltamasi

fbshipit-source-id: 327a5af9700fb7102baea2cc8903c085f69543b9
2020-01-10 09:54:56 -08:00
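The patch keeps both cached values behind relaxed atomics so the unsynchronized write path never has to walk the memtable list. A condensed, self-contained sketch of that pattern (hypothetical class name; the real members and the `UpdateCachedValuesFromMemTableListVersion()` helper appear in the memtable_list diffs further down):

```cpp
#include <atomic>
#include <cstddef>

// Sketch only: the cached values are recomputed under the DB mutex whenever a
// new MemTableListVersion is installed, and read lock-free elsewhere.
class MemTableListCacheSketch {
 public:
  // Safe to call without external synchronization.
  bool HasHistory() const {
    return current_has_history_.load(std::memory_order_relaxed);
  }
  size_t ApproximateMemoryUsageExcludingLast() const {
    return current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
  }

  // Called with freshly computed values whenever a new version is installed.
  void UpdateCachedValues(size_t usage_excluding_last, bool has_history) {
    current_memory_usage_excluding_last_.store(usage_excluding_last,
                                               std::memory_order_relaxed);
    current_has_history_.store(has_history, std::memory_order_relaxed);
  }

 private:
  std::atomic<size_t> current_memory_usage_excluding_last_{0};
  std::atomic<bool> current_has_history_{false};
};
```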
Levi Tamasi
8f121d86a4 Do not schedule memtable trimming if there is no history (#6177)
Summary:
We have observed an increase in CPU load caused by frequent calls to
`ColumnFamilyData::InstallSuperVersion` from `DBImpl::TrimMemtableHistory`
when using `max_write_buffer_size_to_maintain` to limit the amount of
memtable history maintained for transaction conflict checking. Part of the issue
is that trimming can potentially be scheduled even if there is no memtable
history. The patch adds a check that fixes this.

See also https://github.com/facebook/rocksdb/pull/6169.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6177

Test Plan:
Compared `perf` output for

```
./db_bench -benchmarks=randomtransaction -optimistic_transaction_db=1 -statistics -stats_interval_seconds=1 -duration=90 -num=500000 --max_write_buffer_size_to_maintain=16000000 --transaction_set_snapshot=1 --threads=32
```

before and after the change. There is a significant reduction for the call chain
`rocksdb::DBImpl::TrimMemtableHistory` -> `rocksdb::ColumnFamilyData::InstallSuperVersion` ->
`rocksdb::ThreadLocalPtr::StaticMeta::Scrape` even without https://github.com/facebook/rocksdb/pull/6169.

Differential Revision: D19057445

Pulled By: ltamasi

fbshipit-source-id: dff81882d7b280e17eda7d9b072a2d4882c50f79
2020-01-10 09:53:31 -08:00
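Condensed into a single guard, the added check looks roughly like this (hypothetical helper name; the member functions are the real ones from the write_batch.cc diff further down):

```cpp
// Sketch: trimming is only scheduled when flushed memtables are actually
// being kept around (HasHistory()), so TrimMemtableHistory and the
// SuperVersion churn it causes are skipped entirely in the common case.
void MaybeScheduleTrim(ColumnFamilyData* cfd, TrimHistoryScheduler* scheduler) {
  const size_t size_to_maintain = static_cast<size_t>(
      cfd->ioptions()->max_write_buffer_size_to_maintain);
  if (size_to_maintain == 0) {
    return;  // feature disabled
  }
  MemTableList* imm = cfd->imm();
  if (!imm->HasHistory()) {
    return;  // no flushed memtables kept in memory, nothing to trim
  }
  if (cfd->mem()->ApproximateMemoryUsageFast() +
              imm->ApproximateMemoryUsageExcludingLast() >=
          size_to_maintain &&
      imm->MarkTrimHistoryNeeded()) {
    scheduler->ScheduleWork(cfd);
  }
}
```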
Levi Tamasi
afbfb3f567 Do not create/install new SuperVersion if nothing was deleted during memtable trim (#6169)
Summary:
We have observed an increase in CPU load caused by frequent calls to
`ColumnFamilyData::InstallSuperVersion` from `DBImpl::TrimMemtableHistory`
when using `max_write_buffer_size_to_maintain` to limit the amount of
memtable history maintained for transaction conflict checking. As it turns out,
this is caused by the code creating and installing a new `SuperVersion` even if
no memtables were actually trimmed. The patch adds a check to avoid this.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6169

Test Plan:
Compared `perf` output for

```
./db_bench -benchmarks=randomtransaction -optimistic_transaction_db=1 -statistics -stats_interval_seconds=1 -duration=90 -num=500000 --max_write_buffer_size_to_maintain=16000000 --transaction_set_snapshot=1 --threads=32
```

before and after the change. With the fix, the call chain `rocksdb::DBImpl::TrimMemtableHistory` ->
`rocksdb::ColumnFamilyData::InstallSuperVersion` -> `rocksdb::ThreadLocalPtr::StaticMeta::Scrape`
no longer registers in the `perf` report.

Differential Revision: D19031509

Pulled By: ltamasi

fbshipit-source-id: 02686fce594e5b50eba0710e4b28a9b808c8aa20
2020-01-10 09:53:31 -08:00
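The added check amounts to wrapping the SuperVersion creation inside the trim loop; a condensed view (the full diff to DBImpl::TrimMemtableHistory appears further down):

```cpp
// Sketch, condensed from the patch: only create and install a new
// SuperVersion when the trim actually released memtables.
for (auto& cfd : cfds) {
  autovector<MemTable*> to_delete;
  cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
  if (!to_delete.empty()) {
    for (auto m : to_delete) {
      delete m;
    }
    context->superversion_context.NewSuperVersion();
    cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
  }
}
```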
Fosco Marotto
4cfbd87afd Update history and version for 6.5.2 2019-11-15 13:56:29 -08:00
Peter Dillinger
8a72bb14bd More fixes to auto-GarbageCollect in BackupEngine (#6023)
Summary:
Production:
* Fixes GarbageCollect (and auto-GC triggered by PurgeOldBackups, DeleteBackup, or CreateNewBackup) to clean up the backup directory independently of current settings (except max_valid_backups_to_open; see issue https://github.com/facebook/rocksdb/issues/4997) and of prior settings used with the same backup directory.
* Fixes GarbageCollect (and auto-GC) not to attempt to remove "." and ".." entries from directories.
* Clarifies contract with users in modifying BackupEngine operations. In short, leftovers from any incomplete operation are cleaned up by any subsequent call to that same kind of operation (PurgeOldBackups and DeleteBackup considered the same kind of operation). GarbageCollect is available to clean up after all kinds. (NB: right now PurgeOldBackups and DeleteBackup will clean up after incomplete CreateNewBackup, but we aren't promising to continue that behavior.)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6023

Test Plan:
* Refactors open parameters to use an option enum, for readability, etc. (Also fixes an unused parameter bug in the redundant OpenDBAndBackupEngineShareWithChecksum.)
* Fixes an apparent bug in ShareTableFilesWithChecksumsTransition in which old backup data was destroyed in the transition to be tested. That test is now augmented to ensure GarbageCollect (or auto-GC) does not remove shared files when BackupEngine is opened with share_table_files=false.
* Augments DeleteTmpFiles test to ensure that CreateNewBackup does auto-GC when an incompletely created backup is detected.

Differential Revision: D18453559

Pulled By: pdillinger

fbshipit-source-id: 5e54e7b08d711b161bc9c656181012b69a8feac4
2019-11-15 12:15:12 -08:00
Peter Dillinger
a6d418384d Auto-GarbageCollect on PurgeOldBackups and DeleteBackup (#6015)
Summary:
If there is a crash, power failure, or I/O error during DeleteBackup,
shared or private files from the backup might be left behind that are not
cleaned up by PurgeOldBackups or DeleteBackup, only by GarbageCollect. This
makes the BackupEngine API "leaky by default."
Even if it means a modest performance hit, I think we should make
Delete and Purge do as they say, with ongoing best effort: i.e. future
calls will attempt to finish any incomplete work from earlier calls.

This change does that by having DeleteBackup and PurgeOldBackups do a
GarbageCollect, unless (to minimize performance hit) this BackupEngine
has already done a GarbageCollect and there have been no
deletion-related I/O errors in that GarbageCollect or since then.

Rejected alternative 1: remove meta file last instead of first. This would in theory turn partially deleted backups into corrupted backups, but code changes would be needed to allow the missing files and consider it acceptably corrupt, rather than failing to open the BackupEngine. This might be a reasonable choice, but I mostly rejected it because it doesn't solve the legacy problem of cleaning up existing lingering files.

Rejected alternative 2: use a deletion marker file. If deletion started with creating a file that marks a backup as flagged for deletion, then we could reliably detect partially deleted backups and efficiently finish removing them. In addition to not solving the legacy problem, this could be precarious if there's a disk full situation, and we try to create a new file in order to delete some files. Ugh.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6015

Test Plan: Updated unit tests

Differential Revision: D18401333

Pulled By: pdillinger

fbshipit-source-id: 12944e372ce6809f3f5a4c416c3b321a8927d925
2019-11-15 12:15:12 -08:00
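From a user's point of view, the effect of these two BackupEngine changes is that the deletion APIs become self-healing; a hedged usage sketch (hypothetical helper name, real BackupEngine methods from the public header):

```cpp
#include "rocksdb/utilities/backupable_db.h"

// Sketch: PurgeOldBackups/DeleteBackup now run a best-effort GarbageCollect
// internally unless one already succeeded in this session with no deletion
// failures since, so leftovers from earlier crashed deletions get retried.
rocksdb::Status PruneBackups(rocksdb::BackupEngine* engine) {
  rocksdb::Status s = engine->PurgeOldBackups(/*num_backups_to_keep=*/5);
  if (!s.ok()) {
    // An explicit full sweep of unreferenced files is still available.
    s = engine->GarbageCollect();
  }
  return s;
}
```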
anand76
cb1dc29655 Fix a buffer overrun problem in BlockBasedTable::MultiGet (#6014)
Summary:
The calculation in BlockBasedTable::MultiGet for the required buffer length for reading in compressed blocks is incorrect. It needs to take the 5-byte block trailer into account.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6014

Test Plan: Add a unit test DBBasicTest.MultiGetBufferOverrun that fails in asan_check before the fix, and passes after.

Differential Revision: D18412753

Pulled By: anand1976

fbshipit-source-id: 754dfb66be1d5f161a7efdf87be872198c7e3b72
2019-11-12 10:57:32 -08:00
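The required read size is the block payload plus its trailer; the patch centralizes this in a small helper, added verbatim in the format.h diff further down:

```cpp
// Every block on disk is followed by a 1-byte compression type and a 32-bit
// CRC, so a buffer sized from handle.size() alone is 5 bytes too small for
// the read that includes the trailer.
static const size_t kBlockTrailerSize = 5;

inline uint64_t block_size(const BlockHandle& handle) {
  return handle.size() + kBlockTrailerSize;
}
```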
anand76
98e5189fb0 Fix MultiGet crash when no_block_cache is set (#5991)
Summary:
This PR fixes https://github.com/facebook/rocksdb/issues/5975. In ```BlockBasedTable::RetrieveMultipleBlocks()```, we were calling ```MaybeReadBlockAndLoadToCache()```, which is a no-op if neither an uncompressed nor a compressed block cache is configured.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5991

Test Plan:
1. Add unit tests that fail with the old code and pass with the new
2. make check and asan_check

Cc spetrunia

Differential Revision: D18272744

Pulled By: anand1976

fbshipit-source-id: e62fa6090d1a6adf84fcd51dfd6859b03c6aebfe
2019-11-12 10:56:03 -08:00
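Condensed from the RetrieveMultipleBlocks() diff further down, the fix is to stop treating a missing cache entry as success and instead fall through to constructing the block from the freshly read contents:

```cpp
// Sketch of the fixed flow (names taken from the diff below).
s = MaybeReadBlockAndLoadToCache(nullptr, options, handle, uncompression_dict,
                                 block_entry, BlockType::kData,
                                 mget_iter->get_context,
                                 &lookup_data_block_context,
                                 &raw_block_contents);
// block_entry is left empty when no block cache (uncompressed or compressed)
// is configured; only skip the manual setup when the cache actually owns it.
if (block_entry->GetValue() != nullptr) {
  continue;
}
// Fall through: decompress raw_block_contents if needed and construct the
// Block explicitly, as on the regular non-cached read path.
```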
Vijay Nadimpalli
3353b7141d Making platform 007 (gcc 7) default in build_detect_platform.sh (#5947)
Summary:
Making platform 007 (gcc 7) default in build_detect_platform.sh.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5947

Differential Revision: D18038837

Pulled By: vjnadimpalli

fbshipit-source-id: 9ac2ddaa93bf328a416faec028970e039886378e
2019-10-30 10:32:53 -07:00
sdong
d72cceb443 Fix VerifyChecksum readahead with mmap mode (#5945)
Summary:
A recent change introduced readahead inside VerifyChecksum(). However, it is not compatible with mmap mode and generated spurious checksum verification failures. Fix it by not enabling readahead in mmap mode.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5945

Test Plan: Add a unit test that used to fail.

Differential Revision: D18021443

fbshipit-source-id: 6f2eb600f81b26edb02222563a4006869d576bff
2019-10-22 11:42:36 -07:00
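The fix is a one-argument change to the prefetch buffer used by checksum verification (shown in the VerifyChecksumInBlocks() diff further down): readahead is simply disabled when the file is mmap-ed.

```cpp
// FilePrefetchBuffer doesn't work in mmap mode, and readahead isn't needed
// there anyway, so the buffer is constructed disabled in that case.
FilePrefetchBuffer prefetch_buffer(
    rep_->file.get(), readahead_size /* readahead_size */,
    readahead_size /* max_readahead_size */,
    !rep_->ioptions.allow_mmap_reads /* enable */);
```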
myabandeh
1d5083a007 Bump up the version to 6.5.1 2019-10-16 10:55:02 -07:00
Maysam Yabandeh
6ea6aa77cd Update HISTORY for SeekForPrev bug fix (#5925)
Summary:
Update history for the bug fix in https://github.com/facebook/rocksdb/pull/5907
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5925

Differential Revision: D17952605

Pulled By: maysamyabandeh

fbshipit-source-id: 609afcbb2e4087f9153822c4d11193a75a7b0e7a
2019-10-16 10:52:59 -07:00
Maysam Yabandeh
4229f6df50 Fix SeekForPrev bug with Partitioned Filters and Prefix (#5907)
Summary:
Partition Filters make use of a top-level index to find the partition that might contain the bloom hash of the key. The index uses the internal key format (before format version 3). Each partition contains i) the blooms of the keys in that range, ii) the blooms of the prefixes of keys in that range, and iii) the bloom of the prefix of the last key in the previous partition.
When ::SeekForPrev(key) is called, we first perform a prefix bloom test on the SST file. The partition, however, is identified using the full internal key rather than the prefix, in order to stay compatible with the internal key format of the top-level index. This creates a corner case. Example:
- SST k, Partition N: P1K1, P1K2
- SST k, top-level index: P1K2
- SST k+1, Partition 1: P2K1, P3K1
- SST k+1 top-level index: P3K1
SeekForPrev(P1K3) should point us to P1K2. However, SST k's top-level index would reject P1K3 since it is out of range.
One possible fix would be to search with the prefix P1 (instead of the full internal key P1K3); however, the details of properly comparing a prefix with a full internal key might get complicated. The fix we apply in this PR is to look into the last partition anyway, even if the key is out of range.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5907

Differential Revision: D17889918

Pulled By: maysamyabandeh

fbshipit-source-id: 169fd7b3c71dbc08808eae5a8340611ebe5bdc1e
2019-10-16 10:51:46 -07:00
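Condensed from the GetFilterPartitionHandle() diff further down (the local variable is renamed here for readability), the fix falls back to the last filter partition when the top-level index rejects the key:

```cpp
iter.Seek(entry);
if (UNLIKELY(!iter.Valid())) {
  // entry sorts after every key in the top-level index, but its prefix may
  // still be covered by the last partition, which also stores the bloom of
  // the previous partition's last prefix.
  iter.SeekToLast();
}
BlockHandle filter_partition_handle = iter.value().handle;
```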
anand76
73a35c6e17 Update HISTORY.md with a bug fix
2019-10-07 16:54:33 -07:00
anand76
fc53ac86f6 Fix data block upper bound checking for iterator reseek case (#5883)
Summary:
When an iterator reseek happens with the user specifying a new iterate_upper_bound in ReadOptions, and the new seek position is at the end of the same data block, the Seek() ends up using a stale value of data_block_within_upper_bound_ and may return incorrect results.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5883

Test Plan: Added a new test case DBIteratorTest.IterReseekNewUpperBound. Verified that it failed due to the assertion failure without the fix, and passes with the fix.

Differential Revision: D17752740

Pulled By: anand1976

fbshipit-source-id: f9b635ff5d6aeb0e1bef102cf8b2f900efd378e3
2019-10-07 16:39:18 -07:00
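Condensed from the BlockBasedTableIterator::SeekImpl() diff further down: when a reseek stays inside the current data block, the cached upper-bound flag has to be refreshed explicitly, because InitDataBlock() (which normally does it) is skipped.

```cpp
if (!same_block) {
  InitDataBlock();  // re-evaluates iterate_upper_bound for the new block
} else {
  // The reseek may carry a different ReadOptions::iterate_upper_bound, so
  // recompute data_block_within_upper_bound_ for the block we stay in.
  CheckDataBlockWithinUpperBound();
}
```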
sdong
2060a008b0 Fix a previous revert 2019-10-01 16:58:47 -07:00
sdong
89865776b7 Revert "Merging iterator to avoid child iterator reseek for some cases (#5286)" (#5871)
Summary:
This reverts commit 9fad3e21eb.

Iterator verification in stress tests sometimes fails with the assertion
table/block_based/block_based_table_reader.cc:2973: void rocksdb::BlockBasedTableIterator<TBlockIter, TValue>::FindBlockForward() [with TBlockIter = rocksdb::DataBlockIter; TValue = rocksdb::Slice]: Assertion `!next_block_is_out_of_bound || user_comparator_.Compare(*read_options_.iterate_upper_bound, index_iter_->user_key()) <= 0' failed.

It is likely linked to https://github.com/facebook/rocksdb/pull/5286 together with https://github.com/facebook/rocksdb/pull/5468, as the former PR causes some child iterators' Seek() calls to be skipped, so the upper bound condition fails to be updated there. Strictly speaking, the former PR was merged before the latter one, but the latter one feels like a more important improvement, so I choose to revert the former for now.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5871

Differential Revision: D17689196

fbshipit-source-id: 4ded5be68f67bee2782d31a29cb72ea68f59dd8c
2019-10-01 14:41:58 -07:00
Fosco Marotto
749b35d019 Update history and version for 6.5 branch 2019-09-13 11:53:36 -07:00
23 changed files with 610 additions and 314 deletions

View File

@@ -1,5 +1,21 @@
 # Rocksdb Change Log
-## Unreleased
+## 6.5.3 (1/10/2020)
+### Bug Fixes
+* Fixed two performance issues related to memtable history trimming. First, a new SuperVersion is now created only if some memtables were actually trimmed. Second, trimming is only scheduled if there is at least one flushed memtable that is kept in memory for the purposes of transaction conflict checking.
+
+## 6.5.2 (11/15/2019)
+### Bug Fixes
+* Fix an assertion failure in MultiGet() when BlockBasedTableOptions::no_block_cache is true and there is no compressed block cache.
+* Fix a buffer overrun problem in BlockBasedTable::MultiGet() when compression is enabled and no compressed block cache is configured.
+* If a call to BackupEngine::PurgeOldBackups or BackupEngine::DeleteBackup suffered a crash, power failure, or I/O error, files could be left over from old backups that could only be purged with a call to GarbageCollect. Any call to PurgeOldBackups, DeleteBackup, or GarbageCollect should now suffice to purge such files.
+
+## 6.5.1 (10/16/2019)
+### Bug Fixes
+* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strange results when reseek happens with a different iterator upper bound.
+* Fix a bug in BlockBasedTableIterator that might return incorrect results when reseek happens with a different iterator upper bound.
+* Fix a bug when partitioned filters and prefix search are used in conjunction: ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
+
+## 6.5.0 (9/13/2019)
 ### Bug Fixes
 * Fixed a number of data races in BlobDB.
 * Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0.
@@ -102,7 +118,6 @@
 * Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
 * On DB open, delete WAL trash files left behind in wal_dir
 ## 6.2.0 (4/30/2019)
 ### New Features
 * Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.

View File

@@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
 if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
   # we need this to build with MySQL. Don't use for other purposes.
   source "$PWD/build_tools/fbcode_config4.8.1.sh"
-elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
-  source "$PWD/build_tools/fbcode_config_platform007.sh"
-else
+elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
   source "$PWD/build_tools/fbcode_config.sh"
+else
+  source "$PWD/build_tools/fbcode_config_platform007.sh"
 fi
 fi

View File

@@ -369,6 +369,13 @@ TEST_F(CorruptionTest, VerifyChecksumReadahead) {
   ASSERT_GE(senv.random_read_counter_.Read(), 213);
   ASSERT_LE(senv.random_read_counter_.Read(), 447);
+
+  // Test readahead shouldn't break mmap mode (where it should be
+  // disabled).
+  options.allow_mmap_reads = true;
+  Reopen(&options);
+  dbi = static_cast<DBImpl*>(db_);
+  ASSERT_OK(dbi->VerifyChecksum(ro));
+
   CloseDb();
 }

View File

@@ -11,6 +11,7 @@
 #include "port/stack_trace.h"
 #include "rocksdb/perf_context.h"
 #include "rocksdb/utilities/debug.h"
+#include "table/block_based/block_based_table_reader.h"
 #include "table/block_based/block_builder.h"
 #include "test_util/fault_injection_test_env.h"
 #if !defined(ROCKSDB_LITE)
@@ -1541,6 +1542,45 @@ TEST_F(DBBasicTest, GetAllKeyVersions) {
 }
 #endif  // !ROCKSDB_LITE
 
+TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
+  Options options = CurrentOptions();
+  Random rnd(301);
+  BlockBasedTableOptions table_options;
+  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
+  table_options.block_size = 16 * 1024;
+  assert(table_options.block_size >
+         BlockBasedTable::kMultiGetReadStackBufSize);
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  Reopen(options);
+
+  std::string zero_str(128, '\0');
+  for (int i = 0; i < 100; ++i) {
+    // Make the value compressible. A purely random string doesn't compress
+    // and the resultant data block will not be compressed
+    std::string value(RandomString(&rnd, 128) + zero_str);
+    assert(Put(Key(i), value) == Status::OK());
+  }
+  Flush();
+
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set initial size to
+  // largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  ReadOptions ro;
+
+  // Warm up the cache first
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+}
+
 class DBBasicTestWithParallelIO
     : public DBTestBase,
       public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {
@@ -1566,8 +1606,12 @@ class DBBasicTestWithParallelIO
     Options options = CurrentOptions();
     Random rnd(301);
     BlockBasedTableOptions table_options;
-    table_options.pin_l0_filter_and_index_blocks_in_cache = true;
     table_options.block_cache = uncompressed_cache_;
+    if (table_options.block_cache == nullptr) {
+      table_options.no_block_cache = true;
+    } else {
+      table_options.pin_l0_filter_and_index_blocks_in_cache = true;
+    }
     table_options.block_cache_compressed = compressed_cache_;
     table_options.flush_block_policy_factory.reset(
         new MyFlushBlockPolicyFactory());
@@ -1609,6 +1653,9 @@ class DBBasicTestWithParallelIO
   }
 
   bool fill_cache() { return fill_cache_; }
+  bool compression_enabled() { return compression_enabled_; }
+  bool has_compressed_cache() { return compressed_cache_ != nullptr; }
+  bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
 
   static void SetUpTestCase() {}
   static void TearDownTestCase() {}
@@ -1793,7 +1840,16 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
   ASSERT_TRUE(CheckValue(1, values[0].ToString()));
   ASSERT_TRUE(CheckValue(51, values[1].ToString()));
 
-  int expected_reads = random_reads + (fill_cache() ? 0 : 2);
+  bool read_from_cache = false;
+  if (fill_cache()) {
+    if (has_uncompressed_cache()) {
+      read_from_cache = true;
+    } else if (has_compressed_cache() && compression_enabled()) {
+      read_from_cache = true;
+    }
+  }
+
+  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
 
   keys.resize(10);
@@ -1811,7 +1867,7 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
     ASSERT_OK(statuses[i]);
     ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
   }
-  expected_reads += (fill_cache() ? 2 : 4);
+  expected_reads += (read_from_cache ? 2 : 4);
   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
 }
@@ -1822,12 +1878,8 @@ INSTANTIATE_TEST_CASE_P(
     // Param 1 - Uncompressed cache enabled
     // Param 2 - Data compression enabled
    // Param 3 - ReadOptions::fill_cache
-    ::testing::Values(std::make_tuple(false, true, true, true),
-                      std::make_tuple(true, true, true, true),
-                      std::make_tuple(false, true, false, true),
-                      std::make_tuple(false, true, true, false),
-                      std::make_tuple(true, true, true, false),
-                      std::make_tuple(false, true, false, false)));
+    ::testing::Combine(::testing::Bool(), ::testing::Bool(),
+                       ::testing::Bool(), ::testing::Bool()));
 
 class DBBasicTestWithTimestampWithParam
     : public DBTestBase,

View File

@@ -1499,12 +1499,14 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
   for (auto& cfd : cfds) {
     autovector<MemTable*> to_delete;
     cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
-    for (auto m : to_delete) {
-      delete m;
+    if (!to_delete.empty()) {
+      for (auto m : to_delete) {
+        delete m;
+      }
+      context->superversion_context.NewSuperVersion();
+      assert(context->superversion_context.new_superversion.get() != nullptr);
+      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
     }
-    context->superversion_context.NewSuperVersion();
-    assert(context->superversion_context.new_superversion.get() != nullptr);
-    cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
 
     if (cfd->Unref()) {
       delete cfd;

View File

@@ -182,6 +182,33 @@ TEST_P(DBIteratorTest, IterSeekBeforePrev) {
   delete iter;
 }
 
+TEST_P(DBIteratorTest, IterReseekNewUpperBound) {
+  Random rnd(301);
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 1024;
+  table_options.block_size_deviation = 50;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.compression = kNoCompression;
+  Reopen(options);
+
+  ASSERT_OK(Put("a", RandomString(&rnd, 400)));
+  ASSERT_OK(Put("aabb", RandomString(&rnd, 400)));
+  ASSERT_OK(Put("aaef", RandomString(&rnd, 400)));
+  ASSERT_OK(Put("b", RandomString(&rnd, 400)));
+  dbfull()->Flush(FlushOptions());
+  ReadOptions opts;
+  Slice ub = Slice("aa");
+  opts.iterate_upper_bound = &ub;
+  auto iter = NewIterator(opts);
+  iter->Seek(Slice("a"));
+  ub = Slice("b");
+  iter->Seek(Slice("aabc"));
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key().ToString(), "aaef");
+  delete iter;
+}
+
 TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
   ASSERT_OK(Put("a", "b"));
   ASSERT_OK(Put("c", "d"));
@@ -2690,75 +2717,6 @@ TEST_P(DBIteratorTest, AvoidReseekLevelIterator) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_P(DBIteratorTest, AvoidReseekChildIterator) {
-  Options options = CurrentOptions();
-  options.compression = CompressionType::kNoCompression;
-  BlockBasedTableOptions table_options;
-  table_options.block_size = 800;
-  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  Reopen(options);
-
-  Random rnd(301);
-  std::string random_str = RandomString(&rnd, 180);
-
-  ASSERT_OK(Put("1", random_str));
-  ASSERT_OK(Put("2", random_str));
-  ASSERT_OK(Put("3", random_str));
-  ASSERT_OK(Put("4", random_str));
-  ASSERT_OK(Put("8", random_str));
-  ASSERT_OK(Put("9", random_str));
-  ASSERT_OK(Flush());
-  ASSERT_OK(Put("5", random_str));
-  ASSERT_OK(Put("6", random_str));
-  ASSERT_OK(Put("7", random_str));
-  ASSERT_OK(Flush());
-
-  // These two keys will be kept in memtable.
-  ASSERT_OK(Put("0", random_str));
-  ASSERT_OK(Put("8", random_str));
-
-  int num_iter_wrapper_seek = 0;
-  SyncPoint::GetInstance()->SetCallBack(
-      "IteratorWrapper::Seek:0",
-      [&](void* /*arg*/) { num_iter_wrapper_seek++; });
-  SyncPoint::GetInstance()->EnableProcessing();
-  {
-    std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
-    iter->Seek("1");
-    ASSERT_TRUE(iter->Valid());
-    // DBIter always wraps internal iterator with IteratorWrapper,
-    // and in merging iterator each child iterator will be wrapped
-    // with IteratorWrapper.
-    ASSERT_EQ(4, num_iter_wrapper_seek);
-
-    // child position: 1 and 5
-    num_iter_wrapper_seek = 0;
-    iter->Seek("2");
-    ASSERT_TRUE(iter->Valid());
-    ASSERT_EQ(3, num_iter_wrapper_seek);
-
-    // child position: 2 and 5
-    num_iter_wrapper_seek = 0;
-    iter->Seek("6");
-    ASSERT_TRUE(iter->Valid());
-    ASSERT_EQ(4, num_iter_wrapper_seek);
-
-    // child position: 8 and 6
-    num_iter_wrapper_seek = 0;
-    iter->Seek("7");
-    ASSERT_TRUE(iter->Valid());
-    ASSERT_EQ(3, num_iter_wrapper_seek);
-
-    // child position: 8 and 7
-    num_iter_wrapper_seek = 0;
-    iter->Seek("5");
-    ASSERT_TRUE(iter->Valid());
-    ASSERT_EQ(4, num_iter_wrapper_seek);
-  }
-  SyncPoint::GetInstance()->DisableProcessing();
-}
-
 // MyRocks may change iterate bounds before seek. Simply test to make sure such
 // usage doesn't break iterator.
 TEST_P(DBIteratorTest, IterateBoundChangedBeforeSeek) {

View File

@@ -135,7 +135,7 @@ class MemTable {
   // As a cheap version of `ApproximateMemoryUsage()`, this function doens't
   // require external synchronization. The value may be less accurate though
-  size_t ApproximateMemoryUsageFast() {
+  size_t ApproximateMemoryUsageFast() const {
     return approximate_memory_usage_.load(std::memory_order_relaxed);
   }

View File

@@ -266,7 +266,7 @@ void MemTableListVersion::Remove(MemTable* m,
 }
 
 // return the total memory usage assuming the oldest flushed memtable is dropped
-size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() {
+size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
   size_t total_memtable_size = 0;
   for (auto& memtable : memlist_) {
     total_memtable_size += memtable->ApproximateMemoryUsage();
@@ -480,7 +480,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
                        cfd->GetName().c_str(), m->file_number_, mem_id);
       assert(m->file_number_ > 0);
       current_->Remove(m, to_delete);
-      UpdateMemoryUsageExcludingLast();
+      UpdateCachedValuesFromMemTableListVersion();
       ResetTrimHistoryNeeded();
       ++mem_id;
     }
@@ -521,14 +521,14 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
   if (num_flush_not_started_ == 1) {
     imm_flush_needed.store(true, std::memory_order_release);
   }
-  UpdateMemoryUsageExcludingLast();
+  UpdateCachedValuesFromMemTableListVersion();
   ResetTrimHistoryNeeded();
 }
 
 void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
   InstallNewVersion();
   current_->TrimHistory(to_delete, usage);
-  UpdateMemoryUsageExcludingLast();
+  UpdateCachedValuesFromMemTableListVersion();
   ResetTrimHistoryNeeded();
 }
@@ -543,18 +543,25 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
 size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
 
-size_t MemTableList::ApproximateMemoryUsageExcludingLast() {
-  size_t usage =
+size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
+  const size_t usage =
       current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
   return usage;
 }
 
-// Update current_memory_usage_excluding_last_, need to call whenever state
-// changes for MemtableListVersion (whenever InstallNewVersion() is called)
-void MemTableList::UpdateMemoryUsageExcludingLast() {
-  size_t total_memtable_size = current_->ApproximateMemoryUsageExcludingLast();
+bool MemTableList::HasHistory() const {
+  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
+  return has_history;
+}
+
+void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
+  const size_t total_memtable_size =
+      current_->ApproximateMemoryUsageExcludingLast();
   current_memory_usage_excluding_last_.store(total_memtable_size,
                                              std::memory_order_relaxed);
+
+  const bool has_history = current_->HasHistory();
+  current_has_history_.store(has_history, std::memory_order_relaxed);
 }
 
 uint64_t MemTableList::ApproximateOldestKeyTime() const {
@@ -684,7 +691,7 @@ Status InstallMemtableAtomicFlushResults(
                          cfds[i]->GetName().c_str(), m->GetFileNumber(),
                          mem_id);
         imm->current_->Remove(m, to_delete);
-        imm->UpdateMemoryUsageExcludingLast();
+        imm->UpdateCachedValuesFromMemTableListVersion();
         imm->ResetTrimHistoryNeeded();
       }
     }
@@ -727,7 +734,8 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
       imm_flush_needed.store(false, std::memory_order_release);
     }
   }
-  UpdateMemoryUsageExcludingLast();
+
+  UpdateCachedValuesFromMemTableListVersion();
   ResetTrimHistoryNeeded();
 }

View File

@@ -157,7 +157,11 @@ class MemTableListVersion {
   // excluding the last MemTable in memlist_history_. The reason for excluding
   // the last MemTable is to see if dropping the last MemTable will keep total
   // memory usage above or equal to max_write_buffer_size_to_maintain_
-  size_t ApproximateMemoryUsageExcludingLast();
+  size_t ApproximateMemoryUsageExcludingLast() const;
+
+  // Whether this version contains flushed memtables that are only kept around
+  // for transaction conflict checking.
+  bool HasHistory() const { return !memlist_history_.empty(); }
 
   bool MemtableLimitExceeded(size_t usage);
 
@@ -206,7 +210,8 @@ class MemTableList {
         commit_in_progress_(false),
         flush_requested_(false),
         current_memory_usage_(0),
-        current_memory_usage_excluding_last_(0) {
+        current_memory_usage_excluding_last_(0),
+        current_has_history_(false) {
     current_->Ref();
   }
 
@@ -260,11 +265,16 @@ class MemTableList {
   // Returns an estimate of the number of bytes of data in use.
   size_t ApproximateMemoryUsage();
 
-  // Returns the cached current_memory_usage_excluding_last_ value
-  size_t ApproximateMemoryUsageExcludingLast();
+  // Returns the cached current_memory_usage_excluding_last_ value.
+  size_t ApproximateMemoryUsageExcludingLast() const;
+
+  // Returns the cached current_has_history_ value.
+  bool HasHistory() const;
 
-  // Update current_memory_usage_excluding_last_ from MemtableListVersion
-  void UpdateMemoryUsageExcludingLast();
+  // Updates current_memory_usage_excluding_last_ and current_has_history_
+  // from MemTableListVersion. Must be called whenever InstallNewVersion is
+  // called.
+  void UpdateCachedValuesFromMemTableListVersion();
 
   // `usage` is the current size of the mutable Memtable. When
   // max_write_buffer_size_to_maintain is used, total size of mutable and
@@ -382,7 +392,11 @@ class MemTableList {
   // The current memory usage.
   size_t current_memory_usage_;
 
+  // Cached value of current_->ApproximateMemoryUsageExcludingLast().
   std::atomic<size_t> current_memory_usage_excluding_last_;
+
+  // Cached value of current_->HasHistory().
+  std::atomic<bool> current_has_history_;
 };
 
 // Installs memtable atomic flush results.

View File

@@ -859,8 +859,7 @@ class LevelIterator final : public InternalIterator {
                 bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                 const std::vector<AtomicCompactionUnitBoundary>*
                     compaction_boundaries = nullptr)
-      : InternalIterator(false),
-        table_cache_(table_cache),
+      : table_cache_(table_cache),
         read_options_(read_options),
         env_options_(env_options),
         icomparator_(icomparator),

View File

@@ -1786,14 +1786,28 @@ class MemTableInserter : public WriteBatch::Handler {
       // check if memtable_list size exceeds max_write_buffer_size_to_maintain
       if (trim_history_scheduler_ != nullptr) {
         auto* cfd = cf_mems_->current();
-        assert(cfd != nullptr);
-        if (cfd->ioptions()->max_write_buffer_size_to_maintain > 0 &&
-            cfd->mem()->ApproximateMemoryUsageFast() +
-                    cfd->imm()->ApproximateMemoryUsageExcludingLast() >=
-                static_cast<size_t>(
-                    cfd->ioptions()->max_write_buffer_size_to_maintain) &&
-            cfd->imm()->MarkTrimHistoryNeeded()) {
-          trim_history_scheduler_->ScheduleWork(cfd);
+        assert(cfd);
+        assert(cfd->ioptions());
+
+        const size_t size_to_maintain = static_cast<size_t>(
+            cfd->ioptions()->max_write_buffer_size_to_maintain);
+
+        if (size_to_maintain > 0) {
+          MemTableList* const imm = cfd->imm();
+          assert(imm);
+
+          if (imm->HasHistory()) {
+            const MemTable* const mem = cfd->mem();
+            assert(mem);
+
+            if (mem->ApproximateMemoryUsageFast() +
+                        imm->ApproximateMemoryUsageExcludingLast() >=
+                    size_to_maintain &&
+                imm->MarkTrimHistoryNeeded()) {
+              trim_history_scheduler_->ScheduleWork(cfd);
+            }
+          }
         }
       }
     }

View File

@@ -276,10 +276,14 @@ class BackupEngine {
                                progress_callback);
   }
 
-  // deletes old backups, keeping latest num_backups_to_keep alive
+  // Deletes old backups, keeping latest num_backups_to_keep alive.
+  // See also DeleteBackup.
   virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
 
-  // deletes a specific backup
+  // Deletes a specific backup. If this operation (or PurgeOldBackups)
+  // is not completed due to crash, power failure, etc. the state
+  // will be cleaned up the next time you call DeleteBackup,
+  // PurgeOldBackups, or GarbageCollect.
   virtual Status DeleteBackup(BackupID backup_id) = 0;
 
   // Call this from another thread if you want to stop the backup
@@ -287,8 +291,8 @@ class BackupEngine {
   // not wait for the backup to stop.
   // The backup will stop ASAP and the call to CreateNewBackup will
   // return Status::Incomplete(). It will not clean up after itself, but
-  // the state will remain consistent. The state will be cleaned up
-  // next time you create BackupableDB or RestoreBackupableDB.
+  // the state will remain consistent. The state will be cleaned up the
+  // next time you call CreateNewBackup or GarbageCollect.
   virtual void StopBackup() = 0;
 
   // Returns info about backups in backup_info
@@ -323,9 +327,13 @@ class BackupEngine {
   // Returns Status::OK() if all checks are good
   virtual Status VerifyBackup(BackupID backup_id) = 0;
 
-  // Will delete all the files we don't need anymore
-  // It will do the full scan of the files/ directory and delete all the
-  // files that are not referenced.
+  // Will delete any files left over from incomplete creation or deletion of
+  // a backup. This is not normally needed as those operations also clean up
+  // after prior incomplete calls to the same kind of operation (create or
+  // delete).
+  // NOTE: This is not designed to delete arbitrary files added to the backup
+  // directory outside of BackupEngine, and clean-up is always subject to
+  // permissions on and availability of the underlying filesystem.
   virtual Status GarbageCollect() = 0;
 };

View File

@@ -5,8 +5,8 @@
 #pragma once
 
 #define ROCKSDB_MAJOR 6
-#define ROCKSDB_MINOR 4
-#define ROCKSDB_PATCH 0
+#define ROCKSDB_MINOR 5
+#define ROCKSDB_PATCH 3
 
 // Do not use these. We made the mistake of declaring macros starting with
 // double underscore. Now we have to live with our choice. We'll deprecate these

View File

@@ -474,7 +474,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
       return;
     }
     handle = biter.value().handle;
-    uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
+    uint64_t last_off = handle.offset() + block_size(handle);
     uint64_t prefetch_len = last_off - prefetch_off;
     std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
     auto& file = rep->file;
@@ -2299,7 +2299,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
     }
 
     ReadRequest req;
-    req.len = handle.size() + kBlockTrailerSize;
+    req.len = block_size(handle);
     if (scratch == nullptr) {
       req.scratch = new char[req.len];
     } else {
@@ -2326,11 +2326,11 @@ void BlockBasedTable::RetrieveMultipleBlocks(
     ReadRequest& req = read_reqs[read_req_idx++];
     Status s = req.status;
     if (s.ok()) {
-      if (req.result.size() != handle.size() + kBlockTrailerSize) {
+      if (req.result.size() != req.len) {
        s = Status::Corruption("truncated block read from " +
                               rep_->file->file_name() + " offset " +
                               ToString(handle.offset()) + ", expected " +
-                              ToString(handle.size() + kBlockTrailerSize) +
+                              ToString(req.len) +
                               " bytes, got " + ToString(req.result.size()));
       }
     }
@@ -2368,37 +2368,46 @@ void BlockBasedTable::RetrieveMultipleBlocks(
       // MaybeReadBlockAndLoadToCache will insert into the block caches if
       // necessary. Since we're passing the raw block contents, it will
       // avoid looking up the block cache
-      s = MaybeReadBlockAndLoadToCache(nullptr, options, handle,
-          uncompression_dict, block_entry, BlockType::kData,
-          mget_iter->get_context, &lookup_data_block_context,
-          &raw_block_contents);
-    } else {
-      CompressionType compression_type =
-          raw_block_contents.get_compression_type();
-      BlockContents contents;
-      if (compression_type != kNoCompression) {
-        UncompressionContext context(compression_type);
-        UncompressionInfo info(context, uncompression_dict, compression_type);
-        s = UncompressBlockContents(info, req.result.data(), handle.size(),
-                                    &contents, footer.version(), rep_->ioptions,
-                                    memory_allocator);
-      } else {
-        if (scratch != nullptr) {
-          // If we used the scratch buffer, then the contents need to be
-          // copied to heap
-          Slice raw = Slice(req.result.data(), handle.size());
-          contents = BlockContents(CopyBufferToHeap(
-              GetMemoryAllocator(rep_->table_options), raw),
-              handle.size());
-        } else {
-          contents = std::move(raw_block_contents);
-        }
-      }
-      if (s.ok()) {
-        (*results)[idx_in_batch].SetOwnedValue(new Block(std::move(contents),
-            global_seqno, read_amp_bytes_per_bit, ioptions.statistics));
-      }
-    }
+      s = MaybeReadBlockAndLoadToCache(
+          nullptr, options, handle, uncompression_dict, block_entry,
+          BlockType::kData, mget_iter->get_context,
+          &lookup_data_block_context, &raw_block_contents);
+
+      // block_entry value could be null if no block cache is present, i.e
+      // BlockBasedTableOptions::no_block_cache is true and no compressed
+      // block cache is configured. In that case, fall
+      // through and set up the block explicitly
+      if (block_entry->GetValue() != nullptr) {
+        continue;
+      }
+    }
+
+    CompressionType compression_type =
+        raw_block_contents.get_compression_type();
+    BlockContents contents;
+    if (compression_type != kNoCompression) {
+      UncompressionContext context(compression_type);
+      UncompressionInfo info(context, uncompression_dict, compression_type);
+      s = UncompressBlockContents(info, req.result.data(), handle.size(),
+                                  &contents, footer.version(),
+                                  rep_->ioptions, memory_allocator);
+    } else {
+      if (scratch != nullptr) {
+        // If we used the scratch buffer, then the contents need to be
+        // copied to heap
+        Slice raw = Slice(req.result.data(), handle.size());
+        contents = BlockContents(
+            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
+            handle.size());
+      } else {
+        contents = std::move(raw_block_contents);
+      }
+    }
+    if (s.ok()) {
+      (*results)[idx_in_batch].SetOwnedValue(
+          new Block(std::move(contents), global_seqno,
+                    read_amp_bytes_per_bit, ioptions.statistics));
+    }
     (*statuses)[idx_in_batch] = s;
   }
@@ -2706,11 +2715,21 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
     // Index contains the first key of the block, and it's >= target.
     // We can defer reading the block.
     is_at_first_key_from_index_ = true;
+
+    // ResetDataIter() will invalidate block_iter_. Thus, there is no need to
+    // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
+    // as that will be done later when the data block is actually read.
     ResetDataIter();
   } else {
     // Need to use the data block.
     if (!same_block) {
       InitDataBlock();
+    } else {
+      // When the user does a reseek, the iterate_upper_bound might have
+      // changed. CheckDataBlockWithinUpperBound() needs to be called
+      // explicitly if the reseek ends up in the same data block.
+      // If the reseek ends up in a different block, InitDataBlock() will do
+      // the iterator upper bound check.
+      CheckDataBlockWithinUpperBound();
     }
 
     if (target) {
@@ -2721,7 +2740,6 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
       FindKeyForward();
     }
 
-    CheckDataBlockWithinUpperBound();
     CheckOutOfBound();
 
     if (target) {
@@ -2868,8 +2886,7 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
         BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
       if (!rep->file->use_direct_io() &&
           (data_block_handle.offset() +
-               static_cast<size_t>(data_block_handle.size()) +
-               kBlockTrailerSize >
+               static_cast<size_t>(block_size(data_block_handle)) >
            readahead_limit_)) {
         // Buffered I/O
         // Discarding the return status of Prefetch calls intentionally, as
@@ -3367,7 +3384,6 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
     autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
     autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
     autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
-    static const size_t kMultiGetReadStackBufSize = 8192;
     char stack_buf[kMultiGetReadStackBufSize];
     std::unique_ptr<char[]> block_buf;
     {
@@ -3449,7 +3465,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
         block_handles.emplace_back(BlockHandle::NullBlockHandle());
       } else {
         block_handles.emplace_back(handle);
-        total_len += handle.size();
+        total_len += block_size(handle);
       }
     }
@@ -3737,8 +3753,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
   size_t readahead_size = (read_options.readahead_size != 0)
                               ? read_options.readahead_size
                               : kMaxAutoReadaheadSize;
-  FilePrefetchBuffer prefetch_buffer(rep_->file.get(), readahead_size,
-                                     readahead_size);
+  // FilePrefetchBuffer doesn't work in mmap mode and readahead is not
+  // needed there.
+  FilePrefetchBuffer prefetch_buffer(
+      rep_->file.get(), readahead_size /* readadhead_size */,
+      readahead_size /* max_readahead_size */,
+      !rep_->ioptions.allow_mmap_reads /* enable */);
 
   for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
     s = index_iter->status();

View File

@@ -461,8 +461,13 @@ class BlockBasedTable : public TableReader {
   void DumpKeyValue(const Slice& key, const Slice& value,
                     WritableFile* out_file);
 
+  // A cumulative data block file read in MultiGet lower than this size will
+  // use a stack buffer
+  static constexpr size_t kMultiGetReadStackBufSize = 8192;
+
   friend class PartitionedFilterBlockReader;
   friend class PartitionedFilterBlockTest;
+  friend class DBBasicTest_MultiGetIOBufferOverrun_Test;
 };
 
 // Maitaning state of a two-level iteration on a partitioned index structure.
@@ -615,8 +620,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
                          const SliceTransform* prefix_extractor,
                          BlockType block_type, TableReaderCaller caller,
                          size_t compaction_readahead_size = 0)
-      : InternalIteratorBase<TValue>(false),
-        table_(table),
+      : table_(table),
         read_options_(read_options),
         icomp_(icomp),
         user_comparator_(icomp.user_comparator()),

View File

@@ -192,7 +192,12 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
                           index_key_includes_seq(), index_value_is_full());
   iter.Seek(entry);
   if (UNLIKELY(!iter.Valid())) {
-    return BlockHandle(0, 0);
+    // entry is larger than all the keys. However its prefix might still be
+    // present in the last partition. If this is called by PrefixMayMatch this
+    // is necessary for correct behavior. Otherwise it is unnecessary but safe.
+    // Assuming this is an unlikely case for full key search, the performance
+    // overhead should be negligible.
+    iter.SeekToLast();
   }
   assert(iter.Valid());
   BlockHandle fltr_blk_handle = iter.value().handle;

View File

@@ -327,7 +327,7 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
   std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
   std::unique_ptr<PartitionedFilterBlockBuilder> builder(
       NewBuilder(pib.get(), prefix_extractor.get()));
-  const std::string pkeys[3] = {"p-key1", "p-key2", "p-key3"};
+  const std::string pkeys[3] = {"p-key10", "p-key20", "p-key30"};
   builder->Add(pkeys[0]);
   CutABlock(pib.get(), pkeys[0], pkeys[1]);
   builder->Add(pkeys[1]);
@@ -344,6 +344,16 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
         /*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
         /*lookup_context=*/nullptr));
   }
+  // Non-existent keys but with the same prefix
+  const std::string pnonkeys[4] = {"p-key9", "p-key11", "p-key21", "p-key31"};
+  for (auto key : pnonkeys) {
+    auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
+    const Slice ikey_slice = Slice(*ikey.rep());
+    ASSERT_TRUE(reader->PrefixMayMatch(
+        prefix_extractor->Transform(key), prefix_extractor.get(), kNotValid,
+        /*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
+        /*lookup_context=*/nullptr));
+  }
 }
 
 TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {

View File

@@ -220,6 +220,11 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
 // 1-byte type + 32-bit crc
 static const size_t kBlockTrailerSize = 5;
 
+// Make block size calculation for IO less error prone
+inline uint64_t block_size(const BlockHandle& handle) {
+  return handle.size() + kBlockTrailerSize;
+}
+
 inline CompressionType get_block_compression_type(const char* block_data,
                                                   size_t block_size) {
   return static_cast<CompressionType>(block_data[block_size]);

View File

@@ -25,8 +25,8 @@ struct IterateResult {
 template <class TValue>
 class InternalIteratorBase : public Cleanable {
  public:
-  InternalIteratorBase() : is_mutable_(true) {}
-  InternalIteratorBase(bool _is_mutable) : is_mutable_(_is_mutable) {}
+  InternalIteratorBase() {}
 
   // No copying allowed
   InternalIteratorBase(const InternalIteratorBase&) = delete;
   InternalIteratorBase& operator=(const InternalIteratorBase&) = delete;
@@ -148,7 +148,6 @@ class InternalIteratorBase : public Cleanable {
   virtual Status GetProperty(std::string /*prop_name*/, std::string* /*prop*/) {
     return Status::NotSupported("");
   }
-  bool is_mutable() const { return is_mutable_; }
 
  protected:
   void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {

View File

@@ -73,7 +73,6 @@ class IteratorWrapperBase {
   }
   void Prev() { assert(iter_); iter_->Prev(); Update(); }
   void Seek(const Slice& k) {
-    TEST_SYNC_POINT("IteratorWrapper::Seek:0");
     assert(iter_);
     iter_->Seek(k);
     Update();

View File

@@ -127,29 +127,14 @@ class MergingIterator : public InternalIterator {
   }
 
   void Seek(const Slice& target) override {
-    bool is_increasing_reseek = false;
-    if (current_ != nullptr && direction_ == kForward && status_.ok() &&
-        comparator_->Compare(target, key()) >= 0) {
-      is_increasing_reseek = true;
-    }
     ClearHeaps();
     status_ = Status::OK();
     for (auto& child : children_) {
-      // If upper bound never changes, we can skip Seek() for
-      // the !Valid() case too, but people do hack the code to change
-      // upper bound between Seek(), so it's not a good idea to break
-      // the API.
-      // If DBIter is used on top of merging iterator, we probably
-      // can skip mutable child iterators if they are invalid too,
-      // but it's a less clean API. We can optimize for it later if
-      // needed.
-      if (!is_increasing_reseek || !child.Valid() ||
-          comparator_->Compare(target, child.key()) > 0 ||
-          child.iter()->is_mutable()) {
+      {
         PERF_TIMER_GUARD(seek_child_seek_time);
         child.Seek(target);
-        PERF_COUNTER_ADD(seek_child_seek_count, 1);
       }
+      PERF_COUNTER_ADD(seek_child_seek_count, 1);
 
       if (child.Valid()) {
         assert(child.status().ok());
View File

@@ -9,24 +9,10 @@
 #ifndef ROCKSDB_LITE

-#include "rocksdb/utilities/backupable_db.h"
-#include "file/filename.h"
-#include "logging/logging.h"
-#include "port/port.h"
-#include "rocksdb/rate_limiter.h"
-#include "rocksdb/transaction_log.h"
-#include "test_util/sync_point.h"
-#include "util/channel.h"
-#include "util/coding.h"
-#include "util/crc32c.h"
-#include "util/file_reader_writer.h"
-#include "util/string_util.h"
-#include "utilities/checkpoint/checkpoint_impl.h"
-#include <cinttypes>
 #include <stdlib.h>
 #include <algorithm>
 #include <atomic>
+#include <cinttypes>
 #include <functional>
 #include <future>
 #include <limits>
@@ -39,6 +25,20 @@
 #include <unordered_set>
 #include <vector>

+#include "file/filename.h"
+#include "logging/logging.h"
+#include "port/port.h"
+#include "rocksdb/rate_limiter.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/utilities/backupable_db.h"
+#include "test_util/sync_point.h"
+#include "util/channel.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/file_reader_writer.h"
+#include "util/string_util.h"
+#include "utilities/checkpoint/checkpoint_impl.h"
+
 namespace rocksdb {

 void BackupStatistics::IncrementNumberSuccessBackup() {
@@ -120,6 +120,7 @@ class BackupEngineImpl : public BackupEngine {
  private:
   void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
+  Status DeleteBackupInternal(BackupID backup_id);

   // Extends the "result" map with pathname->size mappings for the contents of
   // "dir" in "env". Pathnames are prefixed with "dir".
@@ -456,6 +457,10 @@ class BackupEngineImpl : public BackupEngine {
   std::mutex byte_report_mutex_;
   channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
   std::vector<port::Thread> threads_;
+  // Certain operations like PurgeOldBackups and DeleteBackup will trigger
+  // automatic GarbageCollect (true) unless we've already done one in this
+  // session and have not failed to delete backup files since then (false).
+  bool might_need_garbage_collect_ = true;

   // Adds a file to the backup work queue to be copied or created if it doesn't
   // already exist.
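Paraphrasing the comment above, the flag acts as a small per-session state machine: it starts out set, is cleared when a garbage collection begins, and is set again whenever a deletion fails. A condensed sketch of that lifecycle (names other than the member itself are illustrative, not part of the real `BackupEngineImpl`):

```
// Sketch of the intended flag lifecycle only.
struct AutoGcFlagSketch {
  bool might_need_garbage_collect_ = true;  // conservative default at startup

  void OnGarbageCollectStart() {
    // If everything succeeds, no further auto-GC is needed this session;
    // individual failures below will set the flag again.
    might_need_garbage_collect_ = false;
  }
  void OnDeleteFailure() { might_need_garbage_collect_ = true; }
  bool ShouldAutoGc() const { return might_need_garbage_collect_; }
};
```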
@@ -559,6 +564,9 @@ Status BackupEngineImpl::Initialize() {
   options_.Dump(options_.info_log);

   if (!read_only_) {
+    // we might need to clean up from previous crash or I/O errors
+    might_need_garbage_collect_ = true;
+
     // gather the list of directories that we need to create
     std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
         directories;
@@ -755,7 +763,11 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   if (s.ok()) {
     // maybe last backup failed and left partial state behind, clean it up.
     // need to do this before updating backups_ such that a private dir
-    // named after new_backup_id will be cleaned up
+    // named after new_backup_id will be cleaned up.
+    // (If an incomplete new backup is followed by an incomplete delete
+    // of the latest full backup, then there could be more than one next
+    // id with a private dir, the last thing to be deleted in delete
+    // backup, but all will be cleaned up with a GarbageCollect.)
     s = GarbageCollect();
   } else if (s.IsNotFound()) {
     // normal case, the new backup's private dir doesn't exist yet
@@ -928,8 +940,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
     ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                    backup_statistics_.ToString().c_str());
     // delete files that we might have already written
+    might_need_garbage_collect_ = true;
     DeleteBackup(new_backup_id);
-    GarbageCollect();
     return s;
   }
@@ -957,6 +969,10 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
   assert(initialized_);
   assert(!read_only_);
+
+  // Best effort deletion even with errors
+  Status overall_status = Status::OK();
+
   ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
                  num_backups_to_keep);
   std::vector<BackupID> to_delete;
@@ -966,17 +982,44 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
     itr++;
   }
   for (auto backup_id : to_delete) {
-    auto s = DeleteBackup(backup_id);
+    auto s = DeleteBackupInternal(backup_id);
     if (!s.ok()) {
-      return s;
+      overall_status = s;
     }
   }
-  return Status::OK();
+  // Clean up after any incomplete backup deletion, potentially from
+  // earlier session.
+  if (might_need_garbage_collect_) {
+    auto s = GarbageCollect();
+    if (!s.ok() && overall_status.ok()) {
+      overall_status = s;
+    }
+  }
+  return overall_status;
 }

 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
+  auto s1 = DeleteBackupInternal(backup_id);
+  auto s2 = Status::OK();
+
+  // Clean up after any incomplete backup deletion, potentially from
+  // earlier session.
+  if (might_need_garbage_collect_) {
+    s2 = GarbageCollect();
+  }
+
+  if (!s1.ok()) {
+    return s1;
+  } else {
+    return s2;
+  }
+}
+
+// Does not auto-GarbageCollect
+Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
   assert(initialized_);
   assert(!read_only_);
   ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);

   auto backup = backups_.find(backup_id);
   if (backup != backups_.end()) {
@@ -997,6 +1040,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
     corrupt_backups_.erase(corrupt);
   }

+  // After removing meta file, best effort deletion even with errors.
+  // (Don't delete other files if we can't delete the meta file right
+  // now.)
+
   if (options_.max_valid_backups_to_open == port::kMaxInt32) {
     std::vector<std::string> to_delete;
     for (auto& itr : backuped_file_infos_) {
@@ -1005,6 +1052,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                        itr.first.c_str(), s.ToString().c_str());
         to_delete.push_back(itr.first);
+        if (!s.ok()) {
+          // Trying again later might work
+          might_need_garbage_collect_ = true;
+        }
       }
     }
     for (auto& td : to_delete) {
@@ -1023,6 +1074,10 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
   Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
   ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
                  private_dir.c_str(), s.ToString().c_str());
+  if (!s.ok()) {
+    // Full gc or trying again later might work
+    might_need_garbage_collect_ = true;
+  }
   return Status::OK();
 }
@@ -1505,54 +1560,71 @@ Status BackupEngineImpl::InsertPathnameToSizeBytes(
 Status BackupEngineImpl::GarbageCollect() {
   assert(!read_only_);
+
+  // We will make a best effort to remove all garbage even in the presence
+  // of inconsistencies or I/O failures that inhibit finding garbage.
+  Status overall_status = Status::OK();
+  // If all goes well, we don't need another auto-GC this session
+  might_need_garbage_collect_ = false;
+
   ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
-  if (options_.max_valid_backups_to_open == port::kMaxInt32) {
+  if (options_.max_valid_backups_to_open != port::kMaxInt32) {
     ROCKS_LOG_WARN(
         options_.info_log,
         "Garbage collection is limited since `max_valid_backups_to_open` "
         "constrains how many backups the engine knows about");
   }
-  if (options_.share_table_files &&
-      options_.max_valid_backups_to_open == port::kMaxInt32) {
+  if (options_.max_valid_backups_to_open == port::kMaxInt32) {
     // delete obsolete shared files
     // we cannot do this when BackupEngine has `max_valid_backups_to_open` set
     // as those engines don't know about all shared files.
-    std::vector<std::string> shared_children;
-    {
-      std::string shared_path;
-      if (options_.share_files_with_checksum) {
-        shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
-      } else {
-        shared_path = GetAbsolutePath(GetSharedFileRel());
-      }
-      auto s = backup_env_->FileExists(shared_path);
-      if (s.ok()) {
-        s = backup_env_->GetChildren(shared_path, &shared_children);
-      } else if (s.IsNotFound()) {
-        s = Status::OK();
-      }
-      if (!s.ok()) {
-        return s;
-      }
-    }
-    for (auto& child : shared_children) {
-      std::string rel_fname;
-      if (options_.share_files_with_checksum) {
-        rel_fname = GetSharedFileWithChecksumRel(child);
-      } else {
-        rel_fname = GetSharedFileRel(child);
-      }
-      auto child_itr = backuped_file_infos_.find(rel_fname);
-      // if it's not refcounted, delete it
-      if (child_itr == backuped_file_infos_.end() ||
-          child_itr->second->refs == 0) {
-        // this might be a directory, but DeleteFile will just fail in that
-        // case, so we're good
-        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
-        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
-                       rel_fname.c_str(), s.ToString().c_str());
-        backuped_file_infos_.erase(rel_fname);
-      }
-    }
+    for (bool with_checksum : {false, true}) {
+      std::vector<std::string> shared_children;
+      {
+        std::string shared_path;
+        if (with_checksum) {
+          shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
+        } else {
+          shared_path = GetAbsolutePath(GetSharedFileRel());
+        }
+        auto s = backup_env_->FileExists(shared_path);
+        if (s.ok()) {
+          s = backup_env_->GetChildren(shared_path, &shared_children);
+        } else if (s.IsNotFound()) {
+          s = Status::OK();
+        }
+        if (!s.ok()) {
+          overall_status = s;
+          // Trying again later might work
+          might_need_garbage_collect_ = true;
+        }
+      }
+      for (auto& child : shared_children) {
+        if (child == "." || child == "..") {
+          continue;
+        }
+        std::string rel_fname;
+        if (with_checksum) {
+          rel_fname = GetSharedFileWithChecksumRel(child);
+        } else {
+          rel_fname = GetSharedFileRel(child);
+        }
+        auto child_itr = backuped_file_infos_.find(rel_fname);
+        // if it's not refcounted, delete it
+        if (child_itr == backuped_file_infos_.end() ||
+            child_itr->second->refs == 0) {
+          // this might be a directory, but DeleteFile will just fail in that
+          // case, so we're good
+          Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
+          ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
+                         rel_fname.c_str(), s.ToString().c_str());
+          backuped_file_infos_.erase(rel_fname);
+          if (!s.ok()) {
+            // Trying again later might work
+            might_need_garbage_collect_ = true;
+          }
+        }
+      }
+    }
   }
@@ -1563,10 +1635,15 @@ Status BackupEngineImpl::GarbageCollect() {
     auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                                       &private_children);
     if (!s.ok()) {
-      return s;
+      overall_status = s;
+      // Trying again later might work
+      might_need_garbage_collect_ = true;
     }
   }
   for (auto& child : private_children) {
+    if (child == "." || child == "..") {
+      continue;
+    }
     // it's ok to do this when BackupEngine has `max_valid_backups_to_open` set
     // as the engine always knows all valid backup numbers.
     BackupID backup_id = 0;
@@ -1583,18 +1660,30 @@ Status BackupEngineImpl::GarbageCollect() {
     std::vector<std::string> subchildren;
     backup_env_->GetChildren(full_private_path, &subchildren);
     for (auto& subchild : subchildren) {
+      if (subchild == "." || subchild == "..") {
+        continue;
+      }
       Status s = backup_env_->DeleteFile(full_private_path + subchild);
       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                      (full_private_path + subchild).c_str(),
                      s.ToString().c_str());
+      if (!s.ok()) {
+        // Trying again later might work
+        might_need_garbage_collect_ = true;
+      }
     }
     // finally delete the private dir
     Status s = backup_env_->DeleteDir(full_private_path);
     ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
                    full_private_path.c_str(), s.ToString().c_str());
+    if (!s.ok()) {
+      // Trying again later might work
+      might_need_garbage_collect_ = true;
+    }
   }
-  return Status::OK();
+
+  assert(overall_status.ok() || might_need_garbage_collect_);
+  return overall_status;
 }

 // ------- BackupMeta class --------
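From a caller's point of view, the practical consequence of the changes in this file is that an explicit `GarbageCollect()` after deleting backups is normally no longer required. A minimal usage sketch, assuming an already-opened, writable `BackupEngine*` (setup and error handling trimmed):

```
// Sketch only; `engine` is assumed to be a valid BackupEngine.
#include <cassert>

#include "rocksdb/utilities/backupable_db.h"

void TrimBackups(rocksdb::BackupEngine* engine) {
  // PurgeOldBackups (and DeleteBackup) now trigger GarbageCollect()
  // automatically whenever an earlier deletion may have left files behind.
  rocksdb::Status s = engine->PurgeOldBackups(2 /* num_backups_to_keep */);
  assert(s.ok());
}
```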

View File

@@ -10,7 +10,9 @@
 #if !defined(ROCKSDB_LITE) && !defined(OS_WIN)

 #include <algorithm>
+#include <limits>
 #include <string>
+#include <utility>

 #include "db/db_impl/db_impl.h"
 #include "env/env_chroot.h"
@@ -516,6 +518,15 @@ static void AssertEmpty(DB* db, int from, int to) {
 class BackupableDBTest : public testing::Test {
  public:
+  enum ShareOption {
+    kNoShare,
+    kShareNoChecksum,
+    kShareWithChecksum,
+  };
+
+  const std::vector<ShareOption> kAllShareOptions = {kNoShare, kShareNoChecksum,
+                                                     kShareWithChecksum};
+
   BackupableDBTest() {
     // set up files
     std::string db_chroot = test::PerThreadDBPath("backupable_db");
@@ -561,15 +572,8 @@ class BackupableDBTest : public testing::Test {
     return db;
   }

-  void OpenDBAndBackupEngineShareWithChecksum(
-      bool destroy_old_data = false, bool dummy = false,
-      bool /*share_table_files*/ = true, bool share_with_checksums = false) {
-    backupable_options_->share_files_with_checksum = share_with_checksums;
-    OpenDBAndBackupEngine(destroy_old_data, dummy, share_with_checksums);
-  }
-
   void OpenDBAndBackupEngine(bool destroy_old_data = false, bool dummy = false,
-                             bool share_table_files = true) {
+                             ShareOption shared_option = kShareNoChecksum) {
     // reset all the defaults
     test_backup_env_->SetLimitWrittenFiles(1000000);
     test_db_env_->SetLimitWrittenFiles(1000000);
@@ -584,7 +588,9 @@ class BackupableDBTest : public testing::Test {
     }
     db_.reset(db);
     backupable_options_->destroy_old_data = destroy_old_data;
-    backupable_options_->share_table_files = share_table_files;
+    backupable_options_->share_table_files = shared_option != kNoShare;
+    backupable_options_->share_files_with_checksum =
+        shared_option == kShareWithChecksum;
     BackupEngine* backup_engine;
     ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *backupable_options_,
                                  &backup_engine));
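The `ShareOption` enumerator is what the reworked helper translates into the two backup options; a small self-contained sketch of that mapping (the free function and `opts` are illustrative, while the two `BackupableDBOptions` fields are the real ones set above):

```
// Sketch of the mapping performed inside OpenDBAndBackupEngine.
#include "rocksdb/utilities/backupable_db.h"

enum ShareOption { kNoShare, kShareNoChecksum, kShareWithChecksum };

void ApplyShareOption(rocksdb::BackupableDBOptions* opts, ShareOption opt) {
  opts->share_table_files = (opt != kNoShare);
  opts->share_files_with_checksum = (opt == kShareWithChecksum);
}
```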
@@ -839,7 +845,7 @@ INSTANTIATE_TEST_CASE_P(BackupableDBTestWithParam, BackupableDBTestWithParam,
                         ::testing::Bool());

 // this will make sure that backup does not copy the same file twice
-TEST_F(BackupableDBTest, NoDoubleCopy) {
+TEST_F(BackupableDBTest, NoDoubleCopy_And_AutoGC) {
   OpenDBAndBackupEngine(true, true);

   // should write 5 DB files + one meta file
@@ -857,23 +863,30 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
   AppendPath(backupdir_, should_have_written);
   test_backup_env_->AssertWrittenFiles(should_have_written);

-  // should write 4 new DB files + one meta file
-  // should not write/copy 00010.sst, since it's already there!
-  test_backup_env_->SetLimitWrittenFiles(6);
-  test_backup_env_->ClearWrittenFiles();
-
-  dummy_db_->live_files_ = {"/00010.sst", "/00015.sst", "/CURRENT",
-                            "/MANIFEST-01"};
-  dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
-  test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
-  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
-  // should not open 00010.sst - it's already there
-
-  should_have_written = {"/shared/.00015.sst.tmp", "/private/2/CURRENT",
-                         "/private/2/MANIFEST-01", "/private/2/00011.log",
-                         "/meta/.2.tmp"};
-  AppendPath(backupdir_, should_have_written);
-  test_backup_env_->AssertWrittenFiles(should_have_written);
+  char db_number = '1';
+
+  for (std::string other_sst : {"00015.sst", "00017.sst", "00019.sst"}) {
+    // should write 4 new DB files + one meta file
+    // should not write/copy 00010.sst, since it's already there!
+    test_backup_env_->SetLimitWrittenFiles(6);
+    test_backup_env_->ClearWrittenFiles();
+
+    dummy_db_->live_files_ = {"/00010.sst", "/" + other_sst, "/CURRENT",
+                              "/MANIFEST-01"};
+    dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
+    test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
+    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+    // should not open 00010.sst - it's already there
+
+    ++db_number;
+    std::string private_dir = std::string("/private/") + db_number;
+    should_have_written = {
+        "/shared/." + other_sst + ".tmp", private_dir + "/CURRENT",
+        private_dir + "/MANIFEST-01", private_dir + "/00011.log",
+        std::string("/meta/.") + db_number + ".tmp"};
+    AppendPath(backupdir_, should_have_written);
+    test_backup_env_->AssertWrittenFiles(should_have_written);
+  }

   ASSERT_OK(backup_engine_->DeleteBackup(1));
   ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst"));
@@ -890,6 +903,42 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
   test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size);
   ASSERT_EQ(200UL, size);

+  CloseBackupEngine();
+
+  //
+  // Now simulate incomplete delete by removing just meta
+  //
+  ASSERT_OK(test_backup_env_->DeleteFile(backupdir_ + "/meta/2"));
+
+  OpenBackupEngine();
+
+  // 1 appears to be removed, so
+  // 2 non-corrupt and 0 corrupt seen
+  std::vector<BackupInfo> backup_info;
+  std::vector<BackupID> corrupt_backup_ids;
+  backup_engine_->GetBackupInfo(&backup_info);
+  backup_engine_->GetCorruptedBackups(&corrupt_backup_ids);
+  ASSERT_EQ(2UL, backup_info.size());
+  ASSERT_EQ(0UL, corrupt_backup_ids.size());
+
+  // Keep the two we see, but this should suffice to purge unreferenced
+  // shared files from incomplete delete.
+  ASSERT_OK(backup_engine_->PurgeOldBackups(2));
+
+  // Make sure dangling sst file has been removed (somewhere along this
+  // process). GarbageCollect should not be needed.
+  ASSERT_EQ(Status::NotFound(),
+            test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst"));
+  ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
+  ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
+
+  // Now actually purge a good one
+  ASSERT_OK(backup_engine_->PurgeOldBackups(1));
+  ASSERT_EQ(Status::NotFound(),
+            test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
+  ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
+
   CloseDBAndBackupEngine();
 }
@@ -972,7 +1021,8 @@ TEST_F(BackupableDBTest, CorruptionsTest) {
   ASSERT_OK(backup_engine_->DeleteBackup(4));
   ASSERT_OK(backup_engine_->DeleteBackup(3));
   ASSERT_OK(backup_engine_->DeleteBackup(2));
-  (void)backup_engine_->GarbageCollect();
+  // Should not be needed anymore with auto-GC on DeleteBackup
+  //(void)backup_engine_->GarbageCollect();
   ASSERT_EQ(Status::NotFound(),
             file_manager_->FileExists(backupdir_ + "/meta/5"));
   ASSERT_EQ(Status::NotFound(),
@@ -1161,7 +1211,7 @@ TEST_F(BackupableDBTest, FailOverwritingBackups) {
 TEST_F(BackupableDBTest, NoShareTableFiles) {
   const int keys_iteration = 5000;
-  OpenDBAndBackupEngine(true, false, false);
+  OpenDBAndBackupEngine(true, false, kNoShare);
   for (int i = 0; i < 5; ++i) {
     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2)));
@@ -1177,7 +1227,7 @@ TEST_F(BackupableDBTest, NoShareTableFiles) {
 // Verify that you can backup and restore with share_files_with_checksum on
 TEST_F(BackupableDBTest, ShareTableFilesWithChecksums) {
   const int keys_iteration = 5000;
-  OpenDBAndBackupEngineShareWithChecksum(true, false, true, true);
+  OpenDBAndBackupEngine(true, false, kShareWithChecksum);
   for (int i = 0; i < 5; ++i) {
     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2)));
@@ -1195,7 +1245,7 @@ TEST_F(BackupableDBTest, ShareTableFilesWithChecksums) {
 TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) {
   const int keys_iteration = 5000;
   // set share_files_with_checksum to false
-  OpenDBAndBackupEngineShareWithChecksum(true, false, true, false);
+  OpenDBAndBackupEngine(true, false, kShareNoChecksum);
   for (int i = 0; i < 5; ++i) {
     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
@@ -1208,55 +1258,108 @@ TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) {
   }

   // set share_files_with_checksum to true and do some more backups
-  OpenDBAndBackupEngineShareWithChecksum(true, false, true, true);
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                        kShareWithChecksum);
   for (int i = 5; i < 10; ++i) {
     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
   }
   CloseDBAndBackupEngine();

-  for (int i = 0; i < 5; ++i) {
-    AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 5 + 1),
+  // Verify first (about to delete)
+  AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11);
+
+  // For an extra challenge, make sure that GarbageCollect / DeleteBackup
+  // is OK even if we open without share_table_files
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+  backup_engine_->DeleteBackup(1);
+  backup_engine_->GarbageCollect();
+  CloseDBAndBackupEngine();
+
+  // Verify rest (not deleted)
+  for (int i = 1; i < 10; ++i) {
+    AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1),
                             keys_iteration * 11);
   }
 }
+// This test simulates cleaning up after aborted or incomplete creation
+// of a new backup.
 TEST_F(BackupableDBTest, DeleteTmpFiles) {
-  for (bool shared_checksum : {false, true}) {
-    if (shared_checksum) {
-      OpenDBAndBackupEngineShareWithChecksum(
-          false /* destroy_old_data */, false /* dummy */,
-          true /* share_table_files */, true /* share_with_checksums */);
-    } else {
-      OpenDBAndBackupEngine();
-    }
-    CloseDBAndBackupEngine();
-    std::string shared_tmp = backupdir_;
-    if (shared_checksum) {
-      shared_tmp += "/shared_checksum";
-    } else {
-      shared_tmp += "/shared";
-    }
-    shared_tmp += "/.00006.sst.tmp";
-    std::string private_tmp_dir = backupdir_ + "/private/10";
-    std::string private_tmp_file = private_tmp_dir + "/00003.sst";
-    file_manager_->WriteToFile(shared_tmp, "tmp");
-    file_manager_->CreateDir(private_tmp_dir);
-    file_manager_->WriteToFile(private_tmp_file, "tmp");
-    ASSERT_OK(file_manager_->FileExists(private_tmp_dir));
-    if (shared_checksum) {
-      OpenDBAndBackupEngineShareWithChecksum(
-          false /* destroy_old_data */, false /* dummy */,
-          true /* share_table_files */, true /* share_with_checksums */);
-    } else {
-      OpenDBAndBackupEngine();
-    }
-    // Need to call this explicitly to delete tmp files
-    (void)backup_engine_->GarbageCollect();
-    CloseDBAndBackupEngine();
-    ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(shared_tmp));
-    ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(private_tmp_file));
-    ASSERT_EQ(Status::NotFound(), file_manager_->FileExists(private_tmp_dir));
+  for (int cleanup_fn : {1, 2, 3, 4}) {
+    for (ShareOption shared_option : kAllShareOptions) {
+      OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
+                            shared_option);
+      ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+      BackupID next_id = 1;
+      BackupID oldest_id = std::numeric_limits<BackupID>::max();
+      {
+        std::vector<BackupInfo> backup_info;
+        backup_engine_->GetBackupInfo(&backup_info);
+        for (const auto& bi : backup_info) {
+          next_id = std::max(next_id, bi.backup_id + 1);
+          oldest_id = std::min(oldest_id, bi.backup_id);
+        }
+      }
+      CloseDBAndBackupEngine();
+
+      // An aborted or incomplete new backup will always be in the next
+      // id (maybe more)
+      std::string next_private = "private/" + std::to_string(next_id);
+
+      // NOTE: both shared and shared_checksum should be cleaned up
+      // regardless of how the backup engine is opened.
+      std::vector<std::string> tmp_files_and_dirs;
+      for (const auto& dir_and_file : {
+               std::make_pair(std::string("shared"),
+                              std::string(".00006.sst.tmp")),
+               std::make_pair(std::string("shared_checksum"),
+                              std::string(".00007.sst.tmp")),
+               std::make_pair(next_private, std::string("00003.sst")),
+           }) {
+        std::string dir = backupdir_ + "/" + dir_and_file.first;
+        file_manager_->CreateDir(dir);
+        ASSERT_OK(file_manager_->FileExists(dir));
+
+        std::string file = dir + "/" + dir_and_file.second;
+        file_manager_->WriteToFile(file, "tmp");
+        ASSERT_OK(file_manager_->FileExists(file));
+
+        tmp_files_and_dirs.push_back(file);
+      }
+      if (cleanup_fn != /*CreateNewBackup*/ 4) {
+        // This exists after CreateNewBackup because it's deleted then
+        // re-created.
+        tmp_files_and_dirs.push_back(backupdir_ + "/" + next_private);
+      }
+
+      OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
+                            shared_option);
+      // Need to call one of these explicitly to delete tmp files
+      switch (cleanup_fn) {
+        case 1:
+          ASSERT_OK(backup_engine_->GarbageCollect());
+          break;
+        case 2:
+          ASSERT_OK(backup_engine_->DeleteBackup(oldest_id));
+          break;
+        case 3:
+          ASSERT_OK(backup_engine_->PurgeOldBackups(1));
+          break;
+        case 4:
+          // Does a garbage collect if it sees that next private dir exists
+          ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+          break;
+        default:
+          assert(false);
+      }
+      CloseDBAndBackupEngine();
+      for (std::string file_or_dir : tmp_files_and_dirs) {
+        if (file_manager_->FileExists(file_or_dir) != Status::NotFound()) {
+          FAIL() << file_or_dir << " was expected to be deleted." << cleanup_fn;
+        }
+      }
+    }
   }
 }