Compare commits

...

14 Commits

Author SHA1 Message Date
Peter Dillinger
3e0ff19b1b Update version and HISTORY for 6.22.3 2021-07-19 08:42:01 -07:00
Peter Dillinger
414de699b6 Don't hold DB mutex for block cache entry stat scans (#8538)
Summary:
I previously didn't notice that the DB mutex was being held during
block cache entry stat scans, probably because I primarily checked for
read performance regressions, since reads require the block cache and
are traditionally latency-sensitive.

This change does some refactoring to avoid holding DB mutex and to
avoid triggering and waiting for a scan in GetProperty("rocksdb.cfstats").
Some tests have to be updated because now the stats collector is
populated in the Cache aggressively on DB startup rather than lazily.
(I hope to clean up some of this added complexity in the future.)

This change also ensures proper treatment of need_out_of_mutex for
non-int DB properties.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8538

Test Plan:
Added unit test logic that uses sync points to fail if the DB mutex
is held during a scan, covering the various ways that a scan might be
triggered.

Performance test - the known impact of holding the DB mutex is on
TransactionDB, and the easiest way to see the impact is to hack the
scan code to almost always miss and take an artificially long time
scanning. Here I've injected an unconditional 5s sleep at the call to
ApplyToAllEntries.

Before (hacked):

    $ TEST_TMPDIR=/dev/shm ./db_bench.base_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     433.219 micros/op 2308 ops/sec;    0.1 MB/s ( transactions:78999 aborts:0)
    rocksdb.db.write.micros P50 : 16.135883 P95 : 36.622503 P99 : 66.036115 P100 : 5000614.000000 COUNT : 149677 SUM : 8364856
    $ TEST_TMPDIR=/dev/shm ./db_bench.base_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     448.802 micros/op 2228 ops/sec;    0.1 MB/s ( transactions:75999 aborts:0)
    rocksdb.db.write.micros P50 : 16.629221 P95 : 37.320607 P99 : 72.144341 P100 : 5000871.000000 COUNT : 143995 SUM : 13472323

Notice the 5s P100 write time.

After (hacked):

    $ TEST_TMPDIR=/dev/shm ./db_bench.new_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     303.645 micros/op 3293 ops/sec;    0.1 MB/s ( transactions:98999 aborts:0)
    rocksdb.db.write.micros P50 : 16.061871 P95 : 33.978834 P99 : 60.018017 P100 : 616315.000000 COUNT : 187619 SUM : 4097407
    $ TEST_TMPDIR=/dev/shm ./db_bench.new_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     310.383 micros/op 3221 ops/sec;    0.1 MB/s ( transactions:96999 aborts:0)
    rocksdb.db.write.micros P50 : 16.270026 P95 : 35.786844 P99 : 64.302878 P100 : 603088.000000 COUNT : 183819 SUM : 4095918

P100 write is now ~0.6s. Not good, but it's the same even if I completely bypass all the scanning code:

    $ TEST_TMPDIR=/dev/shm ./db_bench.new_skip -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     311.365 micros/op 3211 ops/sec;    0.1 MB/s ( transactions:96999 aborts:0)
    rocksdb.db.write.micros P50 : 16.274362 P95 : 36.221184 P99 : 68.809783 P100 : 649808.000000 COUNT : 183819 SUM : 4156767
    $ TEST_TMPDIR=/dev/shm ./db_bench.new_skip -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op'
    randomtransaction :     308.395 micros/op 3242 ops/sec;    0.1 MB/s ( transactions:97999 aborts:0)
    rocksdb.db.write.micros P50 : 16.106222 P95 : 37.202403 P99 : 67.081875 P100 : 598091.000000 COUNT : 185714 SUM : 4098832

No substantial difference.

Reviewed By: siying

Differential Revision: D29738847

Pulled By: pdillinger

fbshipit-source-id: 1c5c155f5a1b62e4fea0fd4eeb515a8b7474027b
2021-07-19 08:37:38 -07:00
Peter Dillinger
300b2249dc Standardize on GCC for TSAN conditional compilation (#8543)
Summary:
In https://github.com/facebook/rocksdb/issues/8539 I accidentally only checked for GCC TSAN, which is
what I tested locally, while CircleCI and FB CI use clang TSAN. Related:
other existing code, such as stack_trace.cc, only checks for clang TSAN.

I've now standardized these to the GCC convention in port/lang.h, so now

    #ifdef __SANITIZE_THREAD__

can check for any TSAN build (assuming lang.h is included).
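
For illustration, a minimal sketch of the kind of bridge described (hedged; this is not the verbatim port/lang.h contents): GCC defines __SANITIZE_THREAD__ itself under -fsanitize=thread, while clang exposes __has_feature(thread_sanitizer), so a shared header can define the GCC-style macro for clang builds:

    // Hedged sketch of the GCC-convention bridge; the real port/lang.h may differ.
    #ifdef __clang__
    #if defined(__has_feature)
    #if __has_feature(thread_sanitizer)
    #define __SANITIZE_THREAD__ 1
    #endif  // __has_feature(thread_sanitizer)
    #endif  // defined(__has_feature)
    #endif  // __clang__
    // GCC already defines __SANITIZE_THREAD__ under -fsanitize=thread, so any
    // file including this header can simply test:
    //   #ifdef __SANITIZE_THREAD__

With that in place, a single preprocessor check covers TSAN builds from either compiler.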

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8543

Test Plan:
Put an assert(false) in slice_test and look for the NOTE
about "signal-unsafe call", both GCC and clang. Eventually, CircleCI
TSAN in https://github.com/facebook/rocksdb/issues/8538

Reviewed By: zhichao-cao

Differential Revision: D29728483

Pulled By: pdillinger

fbshipit-source-id: 8a3b8015c2ed48078214c3ee17146a2c3f11c9f7
2021-07-19 08:37:07 -07:00
Peter Dillinger
a28f13c937 Work around falsely reported data race on LRUHandle::flags (#8539)
Summary:
Some bits are mutated and read while holding a lock, while other,
immutable bits (esp. secondary cache compatibility) can be read by
arbitrary threads without holding a lock. AFAIK, this doesn't cause an
issue on any architecture we care about, because you will get some
legitimate version of the value that includes the initialization, as
long as synchronization guarantees the initialization happens before the
read.
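
To make the reported pattern concrete, here is a small self-contained illustration (not RocksDB code; the type name and bit layout are invented for the example) of a lock-protected write to one bit of a flags byte racing, in TSAN's view, with an unlocked read of a different, never-mutated bit:

    #include <cstdint>
    #include <mutex>
    #include <thread>

    struct Handle {
      // Bit 0 is set at construction and never changed; bit 1 is mutable.
      uint8_t flags = 0x1;
    };

    int main() {
      Handle h;
      std::mutex mu;
      std::thread writer([&] {
        std::lock_guard<std::mutex> lock(mu);
        h.flags |= 0x2;  // mutate the mutable bit under the lock
      });
      std::thread reader([&] {
        // Unlocked read of the immutable bit: logically benign, but TSAN sees
        // an unsynchronized read/write pair on the same byte and reports a race.
        bool compatible = (h.flags & 0x1) != 0;
        (void)compatible;
      });
      writer.join();
      reader.join();
      return 0;
    }

Building this with -fsanitize=thread and running it is enough to provoke that kind of report; the workaround in this commit sidesteps it by giving the immutable bit its own field in TSAN builds, as the diff below shows.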

I've only seen this in https://github.com/facebook/rocksdb/issues/8538 so far, but it should be fixed regardless.
Otherwise, we'll surely get these false reports again some time.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8539

Test Plan: some local TSAN test runs and in CircleCI

Reviewed By: zhichao-cao

Differential Revision: D29720262

Pulled By: pdillinger

fbshipit-source-id: 365fd7e565577c648815161f71b339bcb5ce12d5
2021-07-19 08:36:55 -07:00
anand76
de97dddf61 Update HISTORY.md and version.h to 6.22.2 2021-07-14 22:08:10 -07:00
anand76
05835f4071 Avoid passing existing BG error to WriteStatusCheck (#8511)
Summary:
In ```DBImpl::WriteImpl()```, we call ```PreprocessWrite()``` which, among other things, checks the BG error and returns with it set. This return status is later passed to ```WriteStatusCheck()```, which calls ```SetBGError()```. This results in a spurious call, and info logs, on every user write request. We should avoid passing the ```PreprocessWrite()``` return status to ```WriteStatusCheck()```, as the former would have called ```SetBGError()``` already if it encountered any new errors, such as an error when creating a new WAL file.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8511

Test Plan: Run existing tests

Reviewed By: zhichao-cao

Differential Revision: D29639917

Pulled By: anand1976

fbshipit-source-id: 19234163969e1645dbeb273712aaf5cd9ea2b182
2021-07-14 21:53:00 -07:00
Andrew Kryczka
51b540921d Update HISTORY.md and version.h for 6.22.1 2021-06-25 14:15:04 -07:00
Andrew Kryczka
e8b841eb47 add missing fields to GetLiveFilesMetaData() (#8460)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8460

Reviewed By: jay-zhuang

Differential Revision: D29381865

Pulled By: ajkr

fbshipit-source-id: 47ba54c25f3cc039d72ea32e1df20875795683b3
2021-06-25 14:14:22 -07:00
Andrew Kryczka
6ee0acdbb4 Revert "Make Comparator into a Customizable Object (#8336)"
This reverts commit 6ad0810393.
2021-06-23 15:02:20 -07:00
Andrew Kryczka
6fc2818ef8 Fixup HISTORY.md for 6.22 release (#8441)
Summary:
`git diff origin/6.21.fb origin/6.22.fb -- HISTORY.md` looked odd.
This PR fixes it up by moving items from 6.21.0 to 6.22.0 that were
never in any 6.21 release. Also mentioned the background stat collection
fix under 6.22 (previously it was mentioned under 6.21 patch releases
only).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8441

Reviewed By: jay-zhuang

Differential Revision: D29304812

Pulled By: ajkr

fbshipit-source-id: 2a928a9518a1d6615321d5c2d1e22b17cbb59093
2021-06-22 10:50:59 -07:00
Jay Zhuang
5881f4ac50 Fix DeleteFilesInRange may cause inconsistent compaction error (#8434)
Summary:
`DeleteFilesInRange()` marks the files it is deleting as `being_compacted`
before deleting them, which may cause ongoing compactions to report a
corruption exception, or an assertion failure in debug builds.

This change adds the missing `ComputeCompactionScore()` call when `being_compacted` is set.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8434

Test Plan: Unittest

Reviewed By: ajkr

Differential Revision: D29276127

Pulled By: jay-zhuang

fbshipit-source-id: f5b223e3c1fc6d821e100e3f3442bc70c1d50cf7
2021-06-22 09:26:15 -07:00
anand76
28aa6d4e76 Fix a tsan warning due to reading flags in LRUHandle without holding a mutex (#8433)
Summary:
Tsan complains due to a perceived race condition in accessing LRUHandle flags. One thread calls ```LRUHandle::SetHit()``` from ```LRUCacheShard::Lookup()```, while another thread calls ```LRUHandle::IsPending()``` from ```LRUCacheShard::IsReady()```. The latter call is from ```MultiGet```. It doesn't actually have to call ```IsReady``` since a null value indicates the cache handle is not ready, so it's sufficient to check for a null value.

Also modify ```IsReady``` to acquire the LRU shard mutex.

Tests:
1. make check
2. Run tsan_crash

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8433

Reviewed By: zhichao-cao

Differential Revision: D29278030

Pulled By: anand1976

fbshipit-source-id: 0c9fed56d12eda853e72dadebe75038361bd257f
2021-06-21 22:48:01 -07:00
Andrew Kryczka
9d7de9605a Skip c_test and env_test when ASSERT_STATUS_CHECKED=1 (#8430)
Summary:
- `c_test` fails because `rocksdb_compact_range()` swallows a `Status`.
- `env_test` fails because `ReadRequest`s to `MultiRead()` do not have their `Status`es checked.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8430

Test Plan: `ASSERT_STATUS_CHECKED=1 make -j48 check`

Reviewed By: jay-zhuang

Differential Revision: D29257473

Pulled By: ajkr

fbshipit-source-id: e02127f971703744be7de85f0a028e4664c79577
2021-06-21 22:47:55 -07:00
Andrew Kryczka
f64f08e0bc Update HISTORY.md and version.h 6.22 release (#8427)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8427

Reviewed By: zhichao-cao

Differential Revision: D29246916

Pulled By: ajkr

fbshipit-source-id: ccd44ca1a6dd5101dc37f19b8e1fe6c0e3883e0a
2021-06-21 22:47:41 -07:00
29 changed files with 423 additions and 295 deletions

HISTORY.md

@@ -1,5 +1,17 @@
 # Rocksdb Change Log
-## Unreleased
+## 6.22.3 (2021-07-19)
+### Bug Fixes
+* Fixed block cache entry stat scans not to hold the DB mutex, which was a serious performance bug for tail latencies in TransactionDB and likely elsewhere.
+
+## 6.22.2 (2021-07-14)
+### Bug Fixes
+* Fix continuous logging of an existing background error on every user write
+
+## 6.22.1 (2021-06-25)
+### Bug Fixes
+* `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown.
+
+## 6.22.0 (2021-06-18)
 ### Behavior Changes
 * Added two additional tickers, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH and MEMTABLE_GARBAGE_BYTES_AT_FLUSH. These stats can be used to estimate the ratio of "garbage" (outdated) bytes in the memtable that are discarded at flush time.
 * Added API comments clarifying safe usage of Disable/EnableManualCompaction and EventListener callbacks for compaction.
@@ -7,11 +19,15 @@
 ### Bug Fixes
 * fs_posix.cc GetFreeSpace() always report disk space available to root even when running as non-root. Linux defaults often have disk mounts with 5 to 10 percent of total space reserved only for root. Out of space could result for non-root users.
 * Subcompactions are now disabled when user-defined timestamps are used, since the subcompaction boundary picking logic is currently not timestamp-aware, which could lead to incorrect results when different subcompactions process keys that only differ by timestamp.
+* Fix an issue that `DeleteFilesInRange()` may cause ongoing compaction reports corruption exception, or ASSERT for debug build. There's no actual data loss or corruption that we find.
+* Fixed confusingly duplicated output in LOG for periodic stats ("DUMPING STATS"), including "Compaction Stats" and "File Read Latency Histogram By Level".
+* Fixed performance bugs in background gathering of block cache entry statistics, that could consume a lot of CPU when there are many column families with a shared block cache.
 ### New Features
 * Marked the Ribbon filter and optimize_filters_for_memory features as production-ready, each enabling memory savings for Bloom-like filters. Use `NewRibbonFilterPolicy` in place of `NewBloomFilterPolicy` to use Ribbon filters instead of Bloom, or `ribbonfilter` in place of `bloomfilter` in configuration string.
 * Allow `DBWithTTL` to use `DeleteRange` api just like other DBs. `DeleteRangeCF()` which executes `WriteBatchInternal::DeleteRange()` has been added to the handler in `DBWithTTLImpl::Write()` to implement it.
 * Add BlockBasedTableOptions.prepopulate_block_cache. If enabled, it prepopulate warm/hot data blocks which are already in memory into block cache at the time of flush. On a flush, the data block that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this data back into memory again, which is avoided by enabling this option and it also helps with Distributed FileSystem. More details in include/rocksdb/table.h.
+* Added a `cancel` field to `CompactRangeOptions`, allowing individual in-process manual range compactions to be cancelled.

 ## 6.21.0 (2021-05-21)
 ### Bug Fixes
@@ -23,7 +39,6 @@
 * Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe().
 * In the IngestExternalFile() API, only try to sync the ingested file if the file is linked and the FileSystem/Env supports reopening a writable file.
 * Fixed a bug that `AdvancedColumnFamilyOptions.max_compaction_bytes` is under-calculated for manual compaction (`CompactRange()`). Manual compaction is split to multiple compactions if the compaction size exceed the `max_compaction_bytes`. The bug creates much larger compaction which size exceed the user setting. On the other hand, larger manual compaction size can increase the subcompaction parallelism, you can tune that by setting `max_compaction_bytes`.
-* Fixed confusingly duplicated output in LOG for periodic stats ("DUMPING STATS"), including "Compaction Stats" and "File Read Latency Histogram By Level".
 ### Behavior Changes
 * Due to the fix of false-postive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last a few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status.
@@ -36,7 +51,6 @@
 * Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support.
 * RocksDB would validate total entries read in flush, and compare with counter inserted into it. If flush_verify_memtable_count = true (default), flush will fail. Otherwise, only log to info logs.
 * Add `TableProperties::num_filter_entries`, which can be used with `TableProperties::filter_size` to calculate the effective bits per filter entry (unique user key or prefix) for a table file.
-* Added a `cancel` field to `CompactRangeOptions`, allowing individual in-process manual range compactions to be cancelled.
 ### Performance Improvements
 * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches.

Makefile

@@ -516,8 +516,10 @@ endif
 ifdef ASSERT_STATUS_CHECKED
 # TODO: finish fixing all tests to pass this check
 TESTS_FAILING_ASC = \
+  c_test \
   db_test \
   db_test2 \
+  env_test \
   range_locking_test \
   testutil_test \

cache/cache_entry_stats.h

@@ -51,7 +51,8 @@ namespace ROCKSDB_NAMESPACE {
 template <class Stats>
 class CacheEntryStatsCollector {
  public:
-  // Gathers stats and saves results into `stats`
+  // Gather and save stats if saved stats are too old. (Use GetStats() to
+  // read saved stats.)
   //
   // Maximum allowed age for a "hit" on saved results is determined by the
   // two interval parameters. Both set to 0 forces a re-scan. For example
@@ -61,10 +62,9 @@ class CacheEntryStatsCollector {
   // Justification: scans can vary wildly in duration, e.g. from 0.02 sec
   // to as much as 20 seconds, so we want to be able to cap the absolute
   // and relative frequency of scans.
-  void GetStats(Stats *stats, int min_interval_seconds,
-                int min_interval_factor) {
+  void CollectStats(int min_interval_seconds, int min_interval_factor) {
     // Waits for any pending reader or writer (collector)
-    std::lock_guard<std::mutex> lock(mutex_);
+    std::lock_guard<std::mutex> lock(working_mutex_);
     uint64_t max_age_micros =
         static_cast<uint64_t>(std::max(min_interval_seconds, 0)) * 1000000U;
@@ -79,19 +79,28 @@ class CacheEntryStatsCollector {
     uint64_t start_time_micros = clock_->NowMicros();
     if ((start_time_micros - last_end_time_micros_) > max_age_micros) {
       last_start_time_micros_ = start_time_micros;
-      saved_stats_.BeginCollection(cache_, clock_, start_time_micros);
-      cache_->ApplyToAllEntries(saved_stats_.GetEntryCallback(), {});
+      working_stats_.BeginCollection(cache_, clock_, start_time_micros);
+      cache_->ApplyToAllEntries(working_stats_.GetEntryCallback(), {});
       TEST_SYNC_POINT_CALLBACK(
           "CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries", nullptr);
       uint64_t end_time_micros = clock_->NowMicros();
       last_end_time_micros_ = end_time_micros;
-      saved_stats_.EndCollection(cache_, clock_, end_time_micros);
+      working_stats_.EndCollection(cache_, clock_, end_time_micros);
     } else {
-      saved_stats_.SkippedCollection();
+      working_stats_.SkippedCollection();
     }
-    // Copy to caller
+
+    // Save so that we don't need to wait for an outstanding collection in
+    // order to make of copy of the last saved stats
+    std::lock_guard<std::mutex> lock2(saved_mutex_);
+    saved_stats_ = working_stats_;
+  }
+
+  // Gets saved stats, regardless of age
+  void GetStats(Stats *stats) {
+    std::lock_guard<std::mutex> lock(saved_mutex_);
     *stats = saved_stats_;
   }
@@ -129,6 +138,7 @@ class CacheEntryStatsCollector {
                              Cache::Priority::HIGH);
     if (!s.ok()) {
       assert(h == nullptr);
+      delete new_ptr;
       return s;
     }
   }
@@ -145,6 +155,7 @@ class CacheEntryStatsCollector {
  private:
  explicit CacheEntryStatsCollector(Cache *cache, SystemClock *clock)
       : saved_stats_(),
+        working_stats_(),
         last_start_time_micros_(0),
         last_end_time_micros_(/*pessimistic*/ 10000000),
         cache_(cache),
@@ -154,10 +165,14 @@ class CacheEntryStatsCollector {
     delete static_cast<CacheEntryStatsCollector *>(value);
   }
-  std::mutex mutex_;
+  std::mutex saved_mutex_;
   Stats saved_stats_;
+  std::mutex working_mutex_;
+  Stats working_stats_;
   uint64_t last_start_time_micros_;
   uint64_t last_end_time_micros_;
   Cache *const cache_;
   SystemClock *const clock_;
 };

cache/lru_cache.cc

@@ -553,6 +553,9 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
     e->SetSecondaryCacheCompatible(true);
     e->info_.helper = helper;
   } else {
+#ifdef __SANITIZE_THREAD__
+    e->is_secondary_cache_compatible_for_tsan = false;
+#endif  // __SANITIZE_THREAD__
     e->info_.deleter = deleter;
   }
   e->charge = charge;
@@ -596,15 +599,12 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
 bool LRUCacheShard::IsReady(Cache::Handle* handle) {
   LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  MutexLock l(&mutex_);
   bool ready = true;
   if (e->IsPending()) {
     assert(secondary_cache_);
     assert(e->sec_handle);
-    if (e->sec_handle->IsReady()) {
-      Promote(e);
-    } else {
-      ready = false;
-    }
+    ready = e->sec_handle->IsReady();
   }
   return ready;
 }

cache/lru_cache.h

@@ -12,6 +12,7 @@
 #include <string>
 #include "cache/sharded_cache.h"
+#include "port/lang.h"
 #include "port/malloc.h"
 #include "port/port.h"
 #include "rocksdb/secondary_cache.h"
@@ -80,8 +81,8 @@ struct LRUHandle {
     IN_HIGH_PRI_POOL = (1 << 2),
     // Whether this entry has had any lookups (hits).
     HAS_HIT = (1 << 3),
-    // Can this be inserted into the tiered cache
-    IS_TIERED_CACHE_COMPATIBLE = (1 << 4),
+    // Can this be inserted into the secondary cache
+    IS_SECONDARY_CACHE_COMPATIBLE = (1 << 4),
     // Is the handle still being read from a lower tier
     IS_PENDING = (1 << 5),
     // Has the item been promoted from a lower tier
@@ -90,6 +91,14 @@ struct LRUHandle {
   uint8_t flags;

+#ifdef __SANITIZE_THREAD__
+  // TSAN can report a false data race on flags, where one thread is writing
+  // to one of the mutable bits and another thread is reading this immutable
+  // bit. So precisely suppress that TSAN warning, we separate out this bit
+  // during TSAN runs.
+  bool is_secondary_cache_compatible_for_tsan;
+#endif  // __SANITIZE_THREAD__
+
   // Beginning of the key (MUST BE THE LAST FIELD IN THIS STRUCT!)
   char key_data[1];
@@ -113,7 +122,11 @@ struct LRUHandle {
   bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; }
   bool HasHit() const { return flags & HAS_HIT; }
   bool IsSecondaryCacheCompatible() const {
-    return flags & IS_TIERED_CACHE_COMPATIBLE;
+#ifdef __SANITIZE_THREAD__
+    return is_secondary_cache_compatible_for_tsan;
+#else
+    return flags & IS_SECONDARY_CACHE_COMPATIBLE;
+#endif  // __SANITIZE_THREAD__
   }
   bool IsPending() const { return flags & IS_PENDING; }
   bool IsPromoted() const { return flags & IS_PROMOTED; }
@@ -144,12 +157,15 @@ struct LRUHandle {
   void SetHit() { flags |= HAS_HIT; }
-  void SetSecondaryCacheCompatible(bool tiered) {
-    if (tiered) {
-      flags |= IS_TIERED_CACHE_COMPATIBLE;
+  void SetSecondaryCacheCompatible(bool compat) {
+    if (compat) {
+      flags |= IS_SECONDARY_CACHE_COMPATIBLE;
     } else {
-      flags &= ~IS_TIERED_CACHE_COMPATIBLE;
+      flags &= ~IS_SECONDARY_CACHE_COMPATIBLE;
     }
+#ifdef __SANITIZE_THREAD__
+    is_secondary_cache_compatible_for_tsan = compat;
+#endif  // __SANITIZE_THREAD__
   }
   void SetIncomplete(bool incomp) {
@@ -170,6 +186,11 @@ struct LRUHandle {
   void Free() {
     assert(refs == 0);
+#ifdef __SANITIZE_THREAD__
+    // Here we can safely assert they are the same without a data race reported
+    assert(((flags & IS_SECONDARY_CACHE_COMPATIBLE) != 0) ==
+           is_secondary_cache_compatible_for_tsan);
+#endif  // __SANITIZE_THREAD__
     if (!IsSecondaryCacheCompatible() && info_.deleter) {
       (*info_.deleter)(key(), value);
     } else if (IsSecondaryCacheCompatible()) {

db/db_block_cache_test.cc

@@ -11,6 +11,7 @@
 #include "cache/cache_entry_roles.h"
 #include "cache/lru_cache.h"
+#include "db/column_family.h"
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
 #include "rocksdb/table.h"
@@ -152,13 +153,15 @@ class DBBlockCacheTest : public DBTestBase {
   }

 #ifndef ROCKSDB_LITE
-  const std::array<size_t, kNumCacheEntryRoles>& GetCacheEntryRoleCountsBg() {
+  const std::array<size_t, kNumCacheEntryRoles> GetCacheEntryRoleCountsBg() {
     // Verify in cache entry role stats
     ColumnFamilyHandleImpl* cfh =
         static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
     InternalStats* internal_stats_ptr = cfh->cfd()->internal_stats();
-    return internal_stats_ptr->TEST_GetCacheEntryRoleStats(/*foreground=*/false)
-        .entry_counts;
+    InternalStats::CacheEntryRoleStats stats;
+    internal_stats_ptr->TEST_GetCacheEntryRoleStats(&stats,
+                                                    /*foreground=*/false);
+    return stats.entry_counts;
   }
 #endif  // ROCKSDB_LITE
 };
@@ -170,7 +173,13 @@ TEST_F(DBBlockCacheTest, IteratorBlockCacheUsage) {
   auto options = GetOptions(table_options);
   InitTable(options);

-  std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed not to count entry stats collector
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
   table_options.block_cache = cache;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
   Reopen(options);
@@ -194,7 +203,13 @@ TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
   auto options = GetOptions(table_options);
   InitTable(options);

-  std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed not to count entry stats collector
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
   table_options.block_cache = cache;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
   Reopen(options);
@@ -265,7 +280,13 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
   ReadOptions read_options;
   std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
-  std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+  LRUCacheOptions co;
+  co.capacity = 0;
+  co.num_shard_bits = 0;
+  co.strict_capacity_limit = false;
+  // Needed not to count entry stats collector
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  std::shared_ptr<Cache> cache = NewLRUCache(co);
   table_options.block_cache = cache;
   table_options.no_block_cache = false;
   table_options.block_cache_compressed = compressed_cache;
@@ -944,10 +965,15 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
 }

 static void ClearCache(Cache* cache) {
+  auto roles = CopyCacheDeleterRoleMap();
   std::deque<std::string> keys;
   Cache::ApplyToAllEntriesOptions opts;
   auto callback = [&](const Slice& key, void* /*value*/, size_t /*charge*/,
-                      Cache::DeleterFn /*deleter*/) {
+                      Cache::DeleterFn deleter) {
+    if (roles.find(deleter) == roles.end()) {
+      // Keep the stats collector
+      return;
+    }
     keys.push_back(key.ToString());
   };
   cache->ApplyToAllEntries(callback, opts);
@@ -1126,6 +1152,9 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
                                 &h, Cache::Priority::HIGH));
       ASSERT_GT(cache->GetUsage(), cache->GetCapacity());
       expected = {};
+      // For CacheEntryStatsCollector
+      expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
+      // For Fill-it-up
       expected[static_cast<size_t>(CacheEntryRole::kMisc)]++;
       // Still able to hit on saved stats
       EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
@@ -1134,6 +1163,48 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
       EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
       cache->Release(h);

+      // Now we test that the DB mutex is not held during scans, for the ways
+      // we know how to (possibly) trigger them. Without a better good way to
+      // check this, we simply inject an acquire & release of the DB mutex
+      // deep in the stat collection code. If we were already holding the
+      // mutex, that is UB that would at least be found by TSAN.
+      int scan_count = 0;
+      SyncPoint::GetInstance()->SetCallBack(
+          "CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
+          [this, &scan_count](void*) {
+            dbfull()->TEST_LockMutex();
+            dbfull()->TEST_UnlockMutex();
+            ++scan_count;
+          });
+      SyncPoint::GetInstance()->EnableProcessing();
+
+      // Different things that might trigger a scan, with mock sleeps to
+      // force a miss.
+      env_->MockSleepForSeconds(10000);
+      dbfull()->DumpStats();
+      ASSERT_EQ(scan_count, 1);
+
+      env_->MockSleepForSeconds(10000);
+      ASSERT_TRUE(
+          db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats, &values));
+      ASSERT_EQ(scan_count, 2);
+
+      env_->MockSleepForSeconds(10000);
+      std::string value_str;
+      ASSERT_TRUE(
+          db_->GetProperty(DB::Properties::kBlockCacheEntryStats, &value_str));
+      ASSERT_EQ(scan_count, 3);
+
+      env_->MockSleepForSeconds(10000);
+      ASSERT_TRUE(db_->GetProperty(DB::Properties::kCFStats, &value_str));
+      // To match historical speed, querying this property no longer triggers
+      // a scan, even if results are old. But periodic dump stats should keep
+      // things reasonably updated.
+      ASSERT_EQ(scan_count, /*unchanged*/ 3);
+
+      SyncPoint::GetInstance()->DisableProcessing();
+      SyncPoint::GetInstance()->ClearAllCallBacks();
     }
     EXPECT_GE(iterations_tested, 1);
   }

db/db_compaction_test.cc

@@ -3600,6 +3600,41 @@ TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
   ASSERT_EQ("new_val", Get(Key(0)));
 }

+TEST_F(DBCompactionTest, DeleteFilesInRangeConflictWithCompaction) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  const Snapshot* snapshot = nullptr;
+  const int kMaxKey = 10;
+  for (int i = 0; i < kMaxKey; i++) {
+    ASSERT_OK(Put(Key(i), Key(i)));
+    ASSERT_OK(Delete(Key(i)));
+    if (!snapshot) {
+      snapshot = db_->GetSnapshot();
+    }
+  }
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(Put(Key(kMaxKey), Key(kMaxKey)));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // test DeleteFilesInRange() deletes the files already picked for compaction
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"VersionSet::LogAndApply:WriteManifestStart",
+        "BackgroundCallCompaction:0"},
+       {"DBImpl::BackgroundCompaction:Finish",
+        "VersionSet::LogAndApply:WriteManifestDone"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // release snapshot which mark bottommost file for compaction
+  db_->ReleaseSnapshot(snapshot);
+  std::string begin_string = Key(0);
+  std::string end_string = Key(kMaxKey + 1);
+  Slice begin(begin_string);
+  Slice end(end_string);
+  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
   // bottom-level files may contain deletions due to snapshots protecting the
   // deleted keys. Once the snapshot is released, we should see files with many

db/db_impl/db_impl.cc

@@ -63,6 +63,7 @@
 #include "memtable/hash_linklist_rep.h"
 #include "memtable/hash_skiplist_rep.h"
 #include "monitoring/in_memory_stats_history.h"
+#include "monitoring/instrumented_mutex.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/persistent_stats_history.h"
@@ -911,18 +912,31 @@ void DBImpl::DumpStats() {
   if (shutdown_initiated_) {
     return;
   }
   TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
   {
     InstrumentedMutexLock l(&mutex_);
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      if (cfd->initialized()) {
+        // Release DB mutex for gathering cache entry stats. Pass over all
+        // column families for this first so that other stats are dumped
+        // near-atomically.
+        InstrumentedMutexUnlock u(&mutex_);
+        cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
+      }
+    }
+
     const std::string* property = &DB::Properties::kDBStats;
     const DBPropertyInfo* property_info = GetPropertyInfo(*property);
     assert(property_info != nullptr);
+    assert(!property_info->need_out_of_mutex);
     default_cf_internal_stats_->GetStringProperty(*property_info, *property,
                                                   &stats);

     property = &DB::Properties::kCFStatsNoFileHistogram;
     property_info = GetPropertyInfo(*property);
     assert(property_info != nullptr);
+    assert(!property_info->need_out_of_mutex);
     for (auto cfd : *versions_->GetColumnFamilySet()) {
       if (cfd->initialized()) {
         cfd->internal_stats()->GetStringProperty(*property_info, *property,
@@ -933,6 +947,7 @@ void DBImpl::DumpStats() {
     property = &DB::Properties::kCFFileHistogram;
     property_info = GetPropertyInfo(*property);
     assert(property_info != nullptr);
+    assert(!property_info->need_out_of_mutex);
     for (auto cfd : *versions_->GetColumnFamilySet()) {
       if (cfd->initialized()) {
         cfd->internal_stats()->GetStringProperty(*property_info, *property,
@@ -3231,16 +3246,21 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
     }
     return ret_value;
   } else if (property_info->handle_string) {
-    InstrumentedMutexLock l(&mutex_);
-    return cfd->internal_stats()->GetStringProperty(*property_info, property,
-                                                    value);
-  } else if (property_info->handle_string_dbimpl) {
-    std::string tmp_value;
-    bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
-    if (ret_value) {
-      *value = tmp_value;
+    if (property_info->need_out_of_mutex) {
+      return cfd->internal_stats()->GetStringProperty(*property_info, property,
+                                                      value);
+    } else {
+      InstrumentedMutexLock l(&mutex_);
+      return cfd->internal_stats()->GetStringProperty(*property_info, property,
+                                                      value);
+    }
+  } else if (property_info->handle_string_dbimpl) {
+    if (property_info->need_out_of_mutex) {
+      return (this->*(property_info->handle_string_dbimpl))(value);
+    } else {
+      InstrumentedMutexLock l(&mutex_);
+      return (this->*(property_info->handle_string_dbimpl))(value);
     }
-    return ret_value;
   }
   // Shouldn't reach here since exactly one of handle_string and handle_int
   // should be non-nullptr.
@@ -3258,9 +3278,14 @@ bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
   if (property_info == nullptr) {
     return false;
   } else if (property_info->handle_map) {
-    InstrumentedMutexLock l(&mutex_);
-    return cfd->internal_stats()->GetMapProperty(*property_info, property,
-                                                 value);
+    if (property_info->need_out_of_mutex) {
+      return cfd->internal_stats()->GetMapProperty(*property_info, property,
+                                                   value);
+    } else {
+      InstrumentedMutexLock l(&mutex_);
+      return cfd->internal_stats()->GetMapProperty(*property_info, property,
+                                                   value);
+    }
   }
   // If we reach this point it means that handle_map is not provided for the
   // requested property
@@ -3709,6 +3734,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
         deleted_files.insert(level_file);
         level_file->being_compacted = true;
       }
+      vstorage->ComputeCompactionScore(*cfd->ioptions(),
+                                       *cfd->GetLatestMutableCFOptions());
     }
   }
   if (edit.GetDeletedFiles().empty()) {

db/db_impl/db_impl_write.cc

@@ -251,6 +251,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

     IOStatus io_s;
+    Status pre_release_cb_status;
     if (status.ok()) {
       // Rules for when we can update the memtable concurrently
       // 1. supported by memtable
@@ -361,7 +362,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
               writer->sequence, disable_memtable, writer->log_used, index++,
               pre_release_callback_cnt);
           if (!ws.ok()) {
-            status = ws;
+            status = pre_release_cb_status = ws;
             break;
           }
         }
@@ -414,10 +415,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

   if (!w.CallbackFailed()) {
     if (!io_s.ok()) {
+      assert(pre_release_cb_status.ok());
       IOStatusCheck(io_s);
     } else {
-      WriteStatusCheck(status);
+      WriteStatusCheck(pre_release_cb_status);
     }
+  } else {
+    assert(io_s.ok() && pre_release_cb_status.ok());
   }

   if (need_log_sync) {

db/internal_stats.cc

@@ -394,7 +394,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
         {DB::Properties::kDBStats,
          {false, &InternalStats::HandleDBStats, nullptr, nullptr, nullptr}},
        {DB::Properties::kBlockCacheEntryStats,
-        {false, &InternalStats::HandleBlockCacheEntryStats, nullptr,
+        {true, &InternalStats::HandleBlockCacheEntryStats, nullptr,
          &InternalStats::HandleBlockCacheEntryStatsMap, nullptr}},
        {DB::Properties::kSSTables,
         {false, &InternalStats::HandleSsTables, nullptr, nullptr, nullptr}},
@@ -510,7 +510,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
         {false, nullptr, &InternalStats::HandleBlockCachePinnedUsage, nullptr,
          nullptr}},
        {DB::Properties::kOptionsStatistics,
-        {false, nullptr, nullptr, nullptr,
+        {true, nullptr, nullptr, nullptr,
          &DBImpl::GetPropertyHandleOptionsStatistics}},
 };
@@ -526,29 +526,41 @@ InternalStats::InternalStats(int num_levels, SystemClock* clock,
       number_levels_(num_levels),
       clock_(clock),
       cfd_(cfd),
-      started_at_(clock->NowMicros()) {}
-
-Status InternalStats::CollectCacheEntryStats(bool foreground) {
-  // Lazy initialize/reference the collector. It is pinned in cache (through
-  // a shared_ptr) so that it does not get immediately ejected from a full
-  // cache, which would force a re-scan on the next GetStats.
-  if (!cache_entry_stats_collector_) {
-    Cache* block_cache;
-    bool ok = HandleBlockCacheStat(&block_cache);
-    if (ok) {
-      // Extract or create stats collector.
-      Status s = CacheEntryStatsCollector<CacheEntryRoleStats>::GetShared(
-          block_cache, clock_, &cache_entry_stats_collector_);
-      if (!s.ok()) {
-        // Block cache likely under pressure. Scanning could make it worse,
-        // so skip.
-        return s;
-      }
+      started_at_(clock->NowMicros()) {
+  Cache* block_cache = nullptr;
+  bool ok = GetBlockCacheForStats(&block_cache);
+  if (ok) {
+    assert(block_cache);
+    // Extract or create stats collector. Could fail in rare cases.
+    Status s = CacheEntryStatsCollector<CacheEntryRoleStats>::GetShared(
+        block_cache, clock_, &cache_entry_stats_collector_);
+    if (s.ok()) {
+      assert(cache_entry_stats_collector_);
     } else {
-      return Status::NotFound("block cache not configured");
+      assert(!cache_entry_stats_collector_);
     }
+  } else {
+    assert(!block_cache);
   }
-  assert(cache_entry_stats_collector_);
+}
+
+void InternalStats::TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats,
+                                                bool foreground) {
+  CollectCacheEntryStats(foreground);
+  if (cache_entry_stats_collector_) {
+    cache_entry_stats_collector_->GetStats(stats);
+  }
+}
+
+void InternalStats::CollectCacheEntryStats(bool foreground) {
+  // This function is safe to call from any thread because
+  // cache_entry_stats_collector_ field is const after constructor
+  // and ->GetStats does its own synchronization, which also suffices for
+  // cache_entry_stats_.
+  if (!cache_entry_stats_collector_) {
+    return;  // nothing to do (e.g. no block cache)
+  }

   // For "background" collections, strictly cap the collection time by
   // expanding effective cache TTL. For foreground, be more aggressive about
@@ -556,9 +568,8 @@ Status InternalStats::CollectCacheEntryStats(bool foreground) {
   int min_interval_seconds = foreground ? 10 : 180;
   // 1/500 = max of 0.2% of one CPU thread
   int min_interval_factor = foreground ? 10 : 500;
-  cache_entry_stats_collector_->GetStats(
-      &cache_entry_stats_, min_interval_seconds, min_interval_factor);
-  return Status::OK();
+  cache_entry_stats_collector_->CollectStats(min_interval_seconds,
+                                             min_interval_factor);
 }

 std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
@@ -649,21 +660,25 @@ void InternalStats::CacheEntryRoleStats::ToMap(
 bool InternalStats::HandleBlockCacheEntryStats(std::string* value,
                                                Slice /*suffix*/) {
-  Status s = CollectCacheEntryStats(/*foreground*/ true);
-  if (!s.ok()) {
+  if (!cache_entry_stats_collector_) {
     return false;
   }
-  *value = cache_entry_stats_.ToString(clock_);
+  CollectCacheEntryStats(/*foreground*/ true);
+  CacheEntryRoleStats stats;
+  cache_entry_stats_collector_->GetStats(&stats);
+  *value = stats.ToString(clock_);
   return true;
 }

 bool InternalStats::HandleBlockCacheEntryStatsMap(
     std::map<std::string, std::string>* values, Slice /*suffix*/) {
-  Status s = CollectCacheEntryStats(/*foreground*/ true);
-  if (!s.ok()) {
+  if (!cache_entry_stats_collector_) {
     return false;
   }
-  cache_entry_stats_.ToMap(values, clock_);
+  CollectCacheEntryStats(/*foreground*/ true);
+  CacheEntryRoleStats stats;
+  cache_entry_stats_collector_->GetStats(&stats);
+  stats.ToMap(values, clock_);
   return true;
 }
@@ -1123,7 +1138,7 @@ bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
   return *value > 0 && *value < std::numeric_limits<uint64_t>::max();
 }

-bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
+bool InternalStats::GetBlockCacheForStats(Cache** block_cache) {
   assert(block_cache != nullptr);
   auto* table_factory = cfd_->ioptions()->table_factory.get();
   assert(table_factory != nullptr);
@@ -1135,7 +1150,7 @@ bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
 bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
                                              Version* /*version*/) {
   Cache* block_cache;
-  bool ok = HandleBlockCacheStat(&block_cache);
+  bool ok = GetBlockCacheForStats(&block_cache);
   if (!ok) {
     return false;
   }
@@ -1146,7 +1161,7 @@ bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
 bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
                                           Version* /*version*/) {
   Cache* block_cache;
-  bool ok = HandleBlockCacheStat(&block_cache);
+  bool ok = GetBlockCacheForStats(&block_cache);
   if (!ok) {
     return false;
   }
@@ -1157,7 +1172,7 @@ bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
 bool InternalStats::HandleBlockCachePinnedUsage(uint64_t* value, DBImpl* /*db*/,
                                                 Version* /*version*/) {
   Cache* block_cache;
-  bool ok = HandleBlockCacheStat(&block_cache);
+  bool ok = GetBlockCacheForStats(&block_cache);
   if (!ok) {
     return false;
   }
@@ -1504,7 +1519,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
            vstorage->GetTotalBlobFileSize() / kGB);
   value->append(buf);

-  double seconds_up = (clock_->NowMicros() - started_at_ + 1) / kMicrosInSec;
+  uint64_t now_micros = clock_->NowMicros();
+  double seconds_up = (now_micros - started_at_ + 1) / kMicrosInSec;
   double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up;
   snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
            seconds_up, interval_seconds_up);
@@ -1619,14 +1635,20 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
   cf_stats_snapshot_.comp_stats = compaction_stats_sum;
   cf_stats_snapshot_.stall_count = total_stall_count;

-  // Always treat CFStats context as "background"
-  Status s = CollectCacheEntryStats(/*foreground=*/false);
-  if (s.ok()) {
-    value->append(cache_entry_stats_.ToString(clock_));
-  } else {
-    value->append("Block cache: ");
-    value->append(s.ToString());
-    value->append("\n");
+  // Do not gather cache entry stats during CFStats because DB
+  // mutex is held. Only dump last cached collection (rely on DB
+  // periodic stats dump to update)
+  if (cache_entry_stats_collector_) {
+    CacheEntryRoleStats stats;
+    // thread safe
+    cache_entry_stats_collector_->GetStats(&stats);
+
+    constexpr uint64_t kDayInMicros = uint64_t{86400} * 1000000U;
+
+    // Skip if stats are extremely old (> 1 day, incl not yet populated)
+    if (now_micros - stats.last_end_time_micros_ < kDayInMicros) {
+      value->append(stats.ToString(clock_));
+    }
   }
 }

View File

@@ -392,7 +392,6 @@ class InternalStats {
       cf_stats_count_[i] = 0;
       cf_stats_value_[i] = 0;
     }
-    cache_entry_stats_.Clear();
     for (auto& comp_stat : comp_stats_) {
       comp_stat.Clear();
     }
@@ -459,20 +458,20 @@ class InternalStats {
   bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info,
                                 Version* version, uint64_t* value);
-  // Unless there is a recent enough collection of the stats, collect and
-  // saved new cache entry stats. If `foreground`, require data to be more
-  // recent to skip re-collection.
-  //
-  // This should only be called while NOT holding the DB mutex.
-  void CollectCacheEntryStats(bool foreground);
   const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; }
   const std::vector<CompactionStats>& TEST_GetCompactionStats() const {
     return comp_stats_;
   }
-  const CacheEntryRoleStats& TEST_GetCacheEntryRoleStats(bool foreground) {
-    Status s = CollectCacheEntryStats(foreground);
-    if (!s.ok()) {
-      assert(false);
-      cache_entry_stats_.Clear();
-    }
-    return cache_entry_stats_;
-  }
+  void TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats, bool foreground);
   // Store a mapping from the user-facing DB::Properties string to our
   // DBPropertyInfo struct used internally for retrieving properties.
@@ -492,16 +491,18 @@ class InternalStats {
   void DumpCFStatsNoFileHistogram(std::string* value);
   void DumpCFFileHistogram(std::string* value);
-  bool HandleBlockCacheStat(Cache** block_cache);
+  bool GetBlockCacheForStats(Cache** block_cache);
+  Status CollectCacheEntryStats(bool foreground);
   // Per-DB stats
   std::atomic<uint64_t> db_stats_[kIntStatsNumMax];
   // Per-ColumnFamily stats
   uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX];
   uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
-  CacheEntryRoleStats cache_entry_stats_;
+  // Initialize/reference the collector in constructor so that we don't need
+  // additional synchronization in InternalStats, relying on synchronization
+  // in CacheEntryStatsCollector::GetStats. This collector is pinned in cache
+  // (through a shared_ptr) so that it does not get immediately ejected from
+  // a full cache, which would force a re-scan on the next GetStats.
   std::shared_ptr<CacheEntryStatsCollector<CacheEntryRoleStats>>
       cache_entry_stats_collector_;
   // Per-ColumnFamily/level compaction stats
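
The comment block above describes the design: one collector object, created with the DB and pinned through a shared_ptr, owns the scan results and its own synchronization, so InternalStats needs no extra locking. A simplified, generic sketch of that idea; the class, its names, and the time source are assumptions, not the actual CacheEntryStatsCollector interface:

#include <cstdint>
#include <functional>
#include <mutex>

// Simplified sketch: one shared object owns the last scan result and its own
// mutex, so readers never need the DB mutex, and a re-scan happens at most
// once per max_age_micros.
template <typename Stats>
class SharedStatsCollector {
 public:
  SharedStatsCollector(std::function<Stats()> scan_fn,
                       std::function<uint64_t()> now_micros_fn)
      : scan_fn_(std::move(scan_fn)),
        now_micros_fn_(std::move(now_micros_fn)) {}

  // Thread-safe: re-scan only if the cached result is older than
  // max_age_micros, otherwise hand back the cached copy.
  void GetStats(Stats* out, uint64_t max_age_micros) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (now_micros_fn_() - last_end_time_micros_ > max_age_micros) {
      cached_ = scan_fn_();  // potentially slow scan, not under the DB mutex
      last_end_time_micros_ = now_micros_fn_();
    }
    *out = cached_;
  }

 private:
  std::mutex mutex_;
  Stats cached_{};
  uint64_t last_end_time_micros_ = 0;
  std::function<Stats()> scan_fn_;
  std::function<uint64_t()> now_micros_fn_;
};

Holding a std::shared_ptr to such an object for the DB's lifetime mirrors how cache_entry_stats_collector_ is pinned, so a full cache cannot evict it and force an immediate re-scan.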

View File

@@ -4092,7 +4092,7 @@ Status VersionSet::ProcessManifestWrites(
   {
     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
     mu->Unlock();
+    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
     TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
@@ -5590,6 +5590,9 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
       filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
       filemetadata.file_checksum = file->file_checksum;
       filemetadata.file_checksum_func_name = file->file_checksum_func_name;
+      filemetadata.temperature = file->temperature;
+      filemetadata.oldest_ancester_time = file->TryGetOldestAncesterTime();
+      filemetadata.file_creation_time = file->TryGetFileCreationTime();
       metadata->push_back(filemetadata);
     }
   }
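
The second hunk above populates three more LiveFileMetaData fields. A minimal sketch of reading them back through the public GetLiveFilesMetaData API, assuming a DB that is already open; the helper name and output formatting are illustrative:

#include <cstdio>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/metadata.h"

// List per-file temperature and creation/ancestor times for an open DB.
// Error handling and DB setup are omitted.
void PrintLiveFileInfo(rocksdb::DB* db) {
  std::vector<rocksdb::LiveFileMetaData> files;
  db->GetLiveFilesMetaData(&files);
  for (const auto& f : files) {
    std::printf("%s level=%d temperature=%d oldest_ancester_time=%llu "
                "file_creation_time=%llu\n",
                f.name.c_str(), f.level, static_cast<int>(f.temperature),
                static_cast<unsigned long long>(f.oldest_ancester_time),
                static_cast<unsigned long long>(f.file_creation_time));
  }
}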

View File

@@ -10,7 +10,6 @@
 #include <string>
-#include "rocksdb/customizable.h"
 #include "rocksdb/rocksdb_namespace.h"
 namespace ROCKSDB_NAMESPACE {
@@ -21,7 +20,7 @@ class Slice;
 // used as keys in an sstable or a database. A Comparator implementation
 // must be thread-safe since rocksdb may invoke its methods concurrently
 // from multiple threads.
-class Comparator : public Customizable {
+class Comparator {
  public:
   Comparator() : timestamp_size_(0) {}
@@ -38,11 +37,7 @@ class Comparator : public Customizable {
   virtual ~Comparator() {}
-  static Status CreateFromString(const ConfigOptions& opts,
-                                 const std::string& id,
-                                 const Comparator** comp);
   static const char* Type() { return "Comparator"; }
   // Three-way comparison. Returns value:
   //   < 0 iff "a" < "b",
   //   == 0 iff "a" == "b",

View File

@@ -8,7 +8,6 @@
 #pragma once
-#include <atomic>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -270,7 +269,7 @@ class Configurable {
  protected:
   // True once the object is prepared. Once the object is prepared, only
   // mutable options can be configured.
-  std::atomic<bool> prepared_;
+  bool prepared_;
   // Returns the raw pointer for the associated named option.
   // The name is typically the name of an option registered via the

View File

@@ -578,7 +578,7 @@ class Statistics {
   // Resets all ticker and histogram stats
   virtual Status Reset() { return Status::NotSupported("Not implemented"); }
-  // String representation of the statistic object.
+  // String representation of the statistic object. Must be thread-safe.
   virtual std::string ToString() const {
     // Do nothing by default
     return std::string("ToString(): not implemented");

View File

@@ -35,6 +35,7 @@ enum class OptionType {
   kCompactionPri,
   kSliceTransform,
   kCompressionType,
+  kComparator,
   kCompactionFilter,
   kCompactionFilterFactory,
   kCompactionStopStyle,

View File

@@ -10,8 +10,8 @@
 #include "rocksdb/rocksdb_namespace.h"
 #define ROCKSDB_MAJOR 6
-#define ROCKSDB_MINOR 21
-#define ROCKSDB_PATCH 0
+#define ROCKSDB_MINOR 22
+#define ROCKSDB_PATCH 3
 // Do not use these. We made the mistake of declaring macros starting with
 // double underscore. Now we have to live with our choice. We'll deprecate these

View File

@@ -58,7 +58,8 @@ public class MemoryUtilTest {
         db.getAggregatedLongProperty(UNFLUSHED_MEMTABLE_SIZE));
     assertThat(usage.get(MemoryUsageType.kTableReadersTotal)).isEqualTo(
         db.getAggregatedLongProperty(TABLE_READERS));
-    assertThat(usage.get(MemoryUsageType.kCacheTotal)).isEqualTo(0);
+    // TODO(peterd): disable block cache entry stats and check for 0
+    assertThat(usage.get(MemoryUsageType.kCacheTotal)).isLessThan(1024);
     db.put(key, value);
     db.flush(flushOptions);

View File

@@ -51,8 +51,7 @@ class InstrumentedMutex {
   int stats_code_;
 };
-// A wrapper class for port::Mutex that provides additional layer
-// for collecting stats and instrumentation.
+// RAII wrapper for InstrumentedMutex
 class InstrumentedMutexLock {
  public:
   explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) {
@@ -69,6 +68,22 @@ class InstrumentedMutexLock {
   void operator=(const InstrumentedMutexLock&) = delete;
 };
+// RAII wrapper for temporarily releasing InstrumentedMutex inside
+// InstrumentedMutexLock
+class InstrumentedMutexUnlock {
+ public:
+  explicit InstrumentedMutexUnlock(InstrumentedMutex* mutex) : mutex_(mutex) {
+    mutex_->Unlock();
+  }
+
+  ~InstrumentedMutexUnlock() { mutex_->Lock(); }
+
+ private:
+  InstrumentedMutex* const mutex_;
+
+  InstrumentedMutexUnlock(const InstrumentedMutexUnlock&) = delete;
+  void operator=(const InstrumentedMutexUnlock&) = delete;
+};
 class InstrumentedCondVar {
  public:
   explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
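
InstrumentedMutexUnlock is the inverse of InstrumentedMutexLock: its constructor releases the mutex and its destructor re-acquires it. A usage sketch; the function and the work inside it are hypothetical, but this is the shape in which such a wrapper is typically used to run slow work without the lock:

// Hypothetical caller: do slow work without the mutex, inside a scope that
// otherwise holds it, and re-acquire automatically on scope exit.
void DumpStatsSketch(InstrumentedMutex* db_mutex) {
  InstrumentedMutexLock lock(db_mutex);  // mutex held
  // ... read state that requires the mutex ...
  {
    InstrumentedMutexUnlock unlock(db_mutex);  // mutex released
    // ... slow work, e.g. a block cache entry scan, done without the mutex ...
  }  // ~InstrumentedMutexUnlock() re-locks the mutex
  // ... continue with the mutex held again ...
}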

View File

@@ -535,30 +535,22 @@ static std::unordered_map<std::string, OptionTypeInfo>
          OptionVerificationType::kNormal, OptionTypeFlags::kNone,
          {0, OptionType::kCompressionType})},
     {"comparator",
-     OptionTypeInfo::AsCustomRawPtr<const Comparator>(
-         offset_of(&ImmutableCFOptions::user_comparator),
-         OptionVerificationType::kByName, OptionTypeFlags::kCompareLoose,
-         // Serializes a Comparator
-         [](const ConfigOptions& /*opts*/, const std::string&,
-            const void* addr, std::string* value) {
-           // it's a const pointer of const Comparator*
-           const auto* ptr = static_cast<const Comparator* const*>(addr);
-           // Since the user-specified comparator will be wrapped by
-           // InternalKeyComparator, we should persist the user-specified
-           // one instead of InternalKeyComparator.
-           if (*ptr == nullptr) {
-             *value = kNullptrString;
-           } else {
-             const Comparator* root_comp = (*ptr)->GetRootComparator();
-             if (root_comp == nullptr) {
-               root_comp = (*ptr);
-             }
-             *value = root_comp->Name();
-           }
-           return Status::OK();
-         },
-         /* Use the default match function*/ nullptr)},
+     {offset_of(&ImmutableCFOptions::user_comparator),
+      OptionType::kComparator, OptionVerificationType::kByName,
+      OptionTypeFlags::kCompareLoose,
+      // Parses the string and sets the corresponding comparator
+      [](const ConfigOptions& opts, const std::string& /*name*/,
+         const std::string& value, void* addr) {
+        auto old_comparator = static_cast<const Comparator**>(addr);
+        const Comparator* new_comparator = *old_comparator;
+        Status status =
+            opts.registry->NewStaticObject(value, &new_comparator);
+        if (status.ok()) {
+          *old_comparator = new_comparator;
+          return status;
+        }
+        return Status::OK();
+      }}},
     {"memtable_insert_with_hint_prefix_extractor",
      {offset_of(
           &ImmutableCFOptions::memtable_insert_with_hint_prefix_extractor),

View File

@@ -771,15 +771,6 @@ static int RegisterTestObjects(ObjectLibrary& library,
         guard->reset(new mock::MockTableFactory());
         return guard->get();
       });
-  library.Register<const Comparator>(
-      test::SimpleSuffixReverseComparator::kClassName(),
-      [](const std::string& /*uri*/,
-         std::unique_ptr<const Comparator>* /*guard*/,
-         std::string* /* errmsg */) {
-        static test::SimpleSuffixReverseComparator ssrc;
-        return &ssrc;
-      });
   return static_cast<int>(library.GetFactoryCount(&num_types));
 }
@@ -789,7 +780,6 @@ static int RegisterLocalObjects(ObjectLibrary& library,
   // Load any locally defined objects here
   return static_cast<int>(library.GetFactoryCount(&num_types));
 }
-#endif  // !ROCKSDB_LITE
 class LoadCustomizableTest : public testing::Test {
  public:
@@ -829,31 +819,7 @@ TEST_F(LoadCustomizableTest, LoadTableFactoryTest) {
     ASSERT_STREQ(factory->Name(), "MockTable");
   }
 }
-#endif  // !ROCKSDB_LITE
-
-TEST_F(LoadCustomizableTest, LoadComparatorTest) {
-  const Comparator* bytewise = BytewiseComparator();
-  const Comparator* reverse = ReverseBytewiseComparator();
-  const Comparator* result = nullptr;
-  ASSERT_NOK(Comparator::CreateFromString(
-      config_options_, test::SimpleSuffixReverseComparator::kClassName(),
-      &result));
-  ASSERT_OK(
-      Comparator::CreateFromString(config_options_, bytewise->Name(), &result));
-  ASSERT_EQ(result, bytewise);
-  ASSERT_OK(
-      Comparator::CreateFromString(config_options_, reverse->Name(), &result));
-  ASSERT_EQ(result, reverse);
-  if (RegisterTests("Test")) {
-    ASSERT_OK(Comparator::CreateFromString(
-        config_options_, test::SimpleSuffixReverseComparator::kClassName(),
-        &result));
-    ASSERT_NE(result, nullptr);
-    ASSERT_STREQ(result->Name(),
-                 test::SimpleSuffixReverseComparator::kClassName());
-  }
-}
 }  // namespace ROCKSDB_NAMESPACE
 int main(int argc, char** argv) {

View File

@@ -562,6 +562,23 @@ bool SerializeSingleOptionHelper(const void* opt_address,
                    : kNullptrString;
       break;
     }
+    case OptionType::kComparator: {
+      // it's a const pointer of const Comparator*
+      const auto* ptr = static_cast<const Comparator* const*>(opt_address);
+      // Since the user-specified comparator will be wrapped by
+      // InternalKeyComparator, we should persist the user-specified one
+      // instead of InternalKeyComparator.
+      if (*ptr == nullptr) {
+        *value = kNullptrString;
+      } else {
+        const Comparator* root_comp = (*ptr)->GetRootComparator();
+        if (root_comp == nullptr) {
+          root_comp = (*ptr);
+        }
+        *value = root_comp->Name();
+      }
+      break;
+    }
     case OptionType::kCompactionFilter: {
       // it's a const pointer of const CompactionFilter*
       const auto* ptr =
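
The kComparator case above persists the name of the root comparator so that a user-supplied comparator wrapped by InternalKeyComparator round-trips under its own name. A short sketch of the same lookup in isolation; the helper name and the nullptr string are illustrative:

#include <string>

#include "rocksdb/comparator.h"

// Return the name that should be persisted for a possibly-wrapped comparator,
// falling back to the comparator itself if there is no distinct root.
std::string PersistedComparatorName(const rocksdb::Comparator* c) {
  if (c == nullptr) {
    return "nullptr";
  }
  const rocksdb::Comparator* root = c->GetRootComparator();
  return (root != nullptr ? root : c)->Name();
}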

View File

@@ -15,6 +15,8 @@
 #endif
 #endif
+// ASAN (Address sanitizer)
 #if defined(__clang__)
 #if defined(__has_feature)
 #if __has_feature(address_sanitizer)
@@ -39,3 +41,18 @@
 #else
 #define STATIC_AVOID_DESTRUCTION(Type, name) static Type& name = *new Type
 #endif
+
+// TSAN (Thread sanitizer)
+// For simplicity, standardize on the GCC define
+#if defined(__clang__)
+#if defined(__has_feature) && __has_feature(thread_sanitizer)
+#define __SANITIZE_THREAD__ 1
+#endif  // __has_feature(thread_sanitizer)
+#endif  // __clang__
+
+#ifdef __SANITIZE_THREAD__
+#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
+#else
+#define TSAN_SUPPRESSION
+#endif  // TSAN_SUPPRESSION
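
With the block above, port/lang.h standardizes on the GCC-style __SANITIZE_THREAD__ define and provides a single TSAN_SUPPRESSION macro. A small illustrative use; the function here is hypothetical, while the real use appears later in this compare on the StartIOTrace declaration in io_tracer.h:

#include "port/lang.h"

// Illustrative only: suppress TSAN reports for a function whose unsynchronized
// read is considered benign. Expands to nothing when thread sanitizer is off.
TSAN_SUPPRESSION bool ReadFlagUnsynchronized(const bool* flag) { return *flag; }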

View File

@@ -36,6 +36,8 @@ void* SaveStack(int* /*num_frames*/, int /*first_frames_to_skip*/) {
 #include <sys/sysctl.h>
 #endif
+#include "port/lang.h"
 namespace ROCKSDB_NAMESPACE {
 namespace port {
@@ -163,8 +165,7 @@ static void StackTraceHandler(int sig) {
   // Efforts to fix or suppress TSAN warnings "signal-unsafe call inside of
   // a signal" have failed, so just warn the user about them.
-#if defined(__clang__) && defined(__has_feature)
-#if __has_feature(thread_sanitizer)
+#ifdef __SANITIZE_THREAD__
   fprintf(stderr,
           "==> NOTE: any above warnings about \"signal-unsafe call\" are\n"
           "==> ignorable, as they are expected when generating a stack\n"
@@ -173,7 +174,6 @@ static void StackTraceHandler(int sig) {
           "==> in the TSAN warning can be useful for that. (The stack\n"
           "==> trace printed by the signal handler is likely obscured\n"
           "==> by TSAN output.)\n");
-#endif
 #endif
   // re-signal to default handler (so we still get core dump if needed...)

View File

@@ -2564,24 +2564,21 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
           s = Status::OK();
         }
         if (s.ok() && !results.back().IsEmpty()) {
-          if (results.back().IsReady()) {
+          // Since we have a valid handle, check the value. If its nullptr,
+          // it means the cache is waiting for the final result and we're
+          // supposed to call WaitAll() to wait for the result.
+          if (results.back().GetValue() != nullptr) {
             // Found it in the cache. Add NULL handle to indicate there is
             // nothing to read from disk.
             if (results.back().GetCacheHandle()) {
               results.back().UpdateCachedValue();
-              // Its possible the cache lookup returned a non-null handle,
-              // but the lookup actually failed to produce a valid value
-              if (results.back().GetValue() == nullptr) {
-                block_handles.emplace_back(handle);
-                total_len += block_size(handle);
-              }
             }
-            if (results.back().GetValue() != nullptr) {
-              block_handles.emplace_back(BlockHandle::NullBlockHandle());
-            }
+            block_handles.emplace_back(BlockHandle::NullBlockHandle());
           } else {
-            // We have to wait for the asynchronous cache lookup to finish,
-            // and then we may have to read the block from disk anyway
+            // We have to wait for the cache lookup to finish in the
+            // background, and then we may have to read the block from disk
+            // anyway
+            assert(results.back().GetCacheHandle());
             wait_for_cache_results = true;
             block_handles.emplace_back(handle);
             cache_handles.emplace_back(results.back().GetCacheHandle());

View File

@@ -98,8 +98,10 @@ class PlainInternalKeyComparator : public InternalKeyComparator {
 class SimpleSuffixReverseComparator : public Comparator {
  public:
   SimpleSuffixReverseComparator() {}
-  static const char* kClassName() { return "SimpleSuffixReverseComparator"; }
-  virtual const char* Name() const override { return kClassName(); }
+  virtual const char* Name() const override {
+    return "SimpleSuffixReverseComparator";
+  }
   virtual int Compare(const Slice& a, const Slice& b) const override {
     Slice prefix_a = Slice(a.data(), 8);

View File

@@ -9,6 +9,7 @@
 #include <fstream>
 #include "monitoring/instrumented_mutex.h"
+#include "port/lang.h"
 #include "rocksdb/file_system.h"
 #include "rocksdb/options.h"
 #include "trace_replay/trace_replay.h"
@@ -157,20 +158,6 @@ class IOTracer {
   // mutex and ignore the operation if writer_is null. So its ok if
   // tracing_enabled shows non updated value.
-#if defined(__clang__)
-#if defined(__has_feature) && __has_feature(thread_sanitizer)
-#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
-#endif  // __has_feature(thread_sanitizer)
-#else  // __clang__
-#ifdef __SANITIZE_THREAD__
-#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread")))
-#endif  // __SANITIZE_THREAD__
-#endif  // __clang__
-#ifndef TSAN_SUPPRESSION
-#define TSAN_SUPPRESSION
-#endif  // TSAN_SUPPRESSION
   // Start writing IO operations to the trace_writer.
   TSAN_SUPPRESSION Status
   StartIOTrace(SystemClock* clock, const TraceOptions& trace_options,

View File

@@ -8,17 +8,11 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #include "rocksdb/comparator.h"
 #include <stdint.h>
 #include <algorithm>
 #include <memory>
-#include <mutex>
-#include "options/configurable_helper.h"
 #include "port/port.h"
 #include "rocksdb/slice.h"
-#include "rocksdb/utilities/object_registry.h"
 namespace ROCKSDB_NAMESPACE {
@@ -26,8 +20,8 @@ namespace {
 class BytewiseComparatorImpl : public Comparator {
  public:
   BytewiseComparatorImpl() { }
-  static const char* kClassName() { return "leveldb.BytewiseComparator"; }
-  const char* Name() const override { return kClassName(); }
+  const char* Name() const override { return "leveldb.BytewiseComparator"; }
   int Compare(const Slice& a, const Slice& b) const override {
     return a.compare(b);
@@ -145,10 +139,9 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
  public:
   ReverseBytewiseComparatorImpl() { }
-  static const char* kClassName() {
+  const char* Name() const override {
     return "rocksdb.ReverseBytewiseComparator";
   }
-  const char* Name() const override { return kClassName(); }
   int Compare(const Slice& a, const Slice& b) const override {
     return -a.compare(b);
@@ -227,77 +220,4 @@ const Comparator* ReverseBytewiseComparator() {
   return &rbytewise;
 }
-#ifndef ROCKSDB_LITE
-static int RegisterBuiltinComparators(ObjectLibrary& library,
-                                      const std::string& /*arg*/) {
-  library.Register<const Comparator>(
-      BytewiseComparatorImpl::kClassName(),
-      [](const std::string& /*uri*/,
-         std::unique_ptr<const Comparator>* /*guard */,
-         std::string* /* errmsg */) { return BytewiseComparator(); });
-  library.Register<const Comparator>(
-      ReverseBytewiseComparatorImpl::kClassName(),
-      [](const std::string& /*uri*/,
-         std::unique_ptr<const Comparator>* /*guard */,
-         std::string* /* errmsg */) { return ReverseBytewiseComparator(); });
-  return 2;
-}
-#endif  // ROCKSDB_LITE
-Status Comparator::CreateFromString(const ConfigOptions& config_options,
-                                    const std::string& value,
-                                    const Comparator** result) {
-#ifndef ROCKSDB_LITE
-  static std::once_flag once;
-  std::call_once(once, [&]() {
-    RegisterBuiltinComparators(*(ObjectLibrary::Default().get()), "");
-  });
-#endif  // ROCKSDB_LITE
-  std::string id;
-  std::unordered_map<std::string, std::string> opt_map;
-  Status status =
-      ConfigurableHelper::GetOptionsMap(value, *result, &id, &opt_map);
-  if (!status.ok()) {  // GetOptionsMap failed
-    return status;
-  }
-  std::string curr_opts;
-#ifndef ROCKSDB_LITE
-  if (*result != nullptr && (*result)->GetId() == id) {
-    // Try to get the existing options, ignoring any errors
-    ConfigOptions embedded = config_options;
-    embedded.delimiter = ";";
-    (*result)->GetOptionString(embedded, &curr_opts).PermitUncheckedError();
-  }
-#endif
-  if (id == BytewiseComparatorImpl::kClassName()) {
-    *result = BytewiseComparator();
-  } else if (id == ReverseBytewiseComparatorImpl::kClassName()) {
-    *result = ReverseBytewiseComparator();
-  } else if (value.empty()) {
-    // No Id and no options. Clear the object
-    *result = nullptr;
-    return Status::OK();
-  } else if (id.empty()) {  // We have no Id but have options. Not good
-    return Status::NotSupported("Cannot reset object ", id);
-  } else {
-#ifndef ROCKSDB_LITE
-    status = config_options.registry->NewStaticObject(id, result);
-#else
-    status = Status::NotSupported("Cannot load object in LITE mode ", id);
-#endif  // ROCKSDB_LITE
-    if (!status.ok()) {
-      if (config_options.ignore_unsupported_options &&
-          status.IsNotSupported()) {
-        return Status::OK();
-      } else {
-        return status;
-      }
-    } else if (!curr_opts.empty() || !opt_map.empty()) {
-      Comparator* comparator = const_cast<Comparator*>(*result);
-      status = ConfigurableHelper::ConfigureNewObject(
-          config_options, comparator, id, curr_opts, opt_map);
-    }
-  }
-  return status;
-}
 }  // namespace ROCKSDB_NAMESPACE

View File

@@ -4,7 +4,9 @@
 // (found in the LICENSE.Apache file in the root directory).
 #include "rocksdb/utilities/sim_cache.h"
+
+#include <cstdlib>
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
@@ -87,6 +89,8 @@ TEST_F(SimCacheTest, SimCache) {
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
   Reopen(options);
   RecordCacheCounters(options);
+  // due to cache entry stats collector
+  uint64_t base_misses = simCache->get_miss_counter();
   std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks);
   Iterator* iter = nullptr;
@@ -99,8 +103,8 @@ TEST_F(SimCacheTest, SimCache) {
     CheckCacheCounters(options, 1, 0, 1, 0);
     iterators[i].reset(iter);
   }
-  ASSERT_EQ(kNumBlocks,
-            simCache->get_hit_counter() + simCache->get_miss_counter());
+  ASSERT_EQ(kNumBlocks, simCache->get_hit_counter() +
+                            simCache->get_miss_counter() - base_misses);
   ASSERT_EQ(0, simCache->get_hit_counter());
   size_t usage = simCache->GetUsage();
   ASSERT_LT(0, usage);
@@ -137,8 +141,8 @@ TEST_F(SimCacheTest, SimCache) {
     CheckCacheCounters(options, 1, 0, 1, 0);
   }
   ASSERT_EQ(0, simCache->GetPinnedUsage());
-  ASSERT_EQ(3 * kNumBlocks + 1,
-            simCache->get_hit_counter() + simCache->get_miss_counter());
+  ASSERT_EQ(3 * kNumBlocks + 1, simCache->get_hit_counter() +
+                                    simCache->get_miss_counter() - base_misses);
   ASSERT_EQ(6, simCache->get_hit_counter());
 }