Compare commits

...

13 Commits

Author SHA1 Message Date
Andrew Kryczka
58ec8cf304 Backport https://github.com/facebook/rocksdb/pull/7970 2021-02-17 10:42:56 -08:00
Andrew Kryczka
553aafaf8e bump version and update HISTORY.md for 6.17.2 2021-02-05 17:14:21 -08:00
Andrew Kryczka
1315375542 Allow range deletions in *TransactionDB only when safe (#7929)
Summary:
Explicitly reject all range deletions on `TransactionDB` or `OptimisticTransactionDB`, except when the user provides sufficient promises that allow us to proceed safely. The necessary promises are described in the API doc for `TransactionDB::DeleteRange()`. There is currently no way to provide enough promises to make it safe in `OptimisticTransactionDB`.

Fixes https://github.com/facebook/rocksdb/issues/7913.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7929

Test Plan: unit tests covering the cases it's permitted/rejected

Reviewed By: ltamasi

Differential Revision: D26240254

Pulled By: ajkr

fbshipit-source-id: 2834a0ce64cc3e4c3799e35b885a5e79c2f4f6d9
2021-02-05 17:13:26 -08:00
Zhichao Cao
a471d31e04 update HISTORY.md and bump version for 6.17.1 2021-01-28 12:27:20 -08:00
Zhichao Cao
8797aea803 Do not set bg error for compaction in retryable IO Error case (#7899)
Summary:
When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7899

Test Plan: tested with error_handler_fs_test

Reviewed By: anand1976

Differential Revision: D26094097

Pulled By: zhichao-cao

fbshipit-source-id: c53424f11d237405592cd762f43cbbdf8da8234f
2021-01-28 12:20:02 -08:00
Levi Tamasi
9092ebed39 Remove superfluous 'Unreleased' heading 2021-01-21 14:29:52 -08:00
Levi Tamasi
675f351cc8 Update HISTORY.md for PR 7888 (#7890)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/7890

Reviewed By: ajkr

Differential Revision: D26005509

Pulled By: ltamasi

fbshipit-source-id: e7eb732180d447900788d0e3a17dfd1c3f1e708a
2021-01-21 14:27:06 -08:00
Andrew Kryczka
11b42f92b1 workaround race conditions during PeriodicWorkScheduler registration (#7888)
Summary:
This provides a workaround for two race conditions that will be fixed in
a more sophisticated way later. This PR:

(1) Makes the client serialize calls to `Timer::Start()` and `Timer::Shutdown()` (see https://github.com/facebook/rocksdb/issues/7711). The long-term fix will be to make those functions thread-safe.
(2) Makes `PeriodicWorkScheduler` atomically add/cancel work together with starting/shutting down its `Timer`. The long-term fix will be for `Timer` API to offer more specialized APIs so the client will not need to synchronize.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7888

Test Plan: ran the repro provided in https://github.com/facebook/rocksdb/issues/7881

Reviewed By: jay-zhuang

Differential Revision: D25990891

Pulled By: ajkr

fbshipit-source-id: a97fdaebbda6d7db7ddb1b146738b68c16c5be38
2021-01-21 12:31:28 -08:00
Adam Retter
14d173ec81 Fix compilation against musl lib C (#7875)
Summary:
See https://github.com/percona/PerconaFT/pull/450

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7875

Reviewed By: ajkr

Differential Revision: D25938020

Pulled By: jay-zhuang

fbshipit-source-id: 9014dbc7b23bf92c5e63bfbdda4565bb0d2f2b58
2021-01-21 12:13:42 -08:00
Levi Tamasi
8b30b8d2a0 Make blob related VersionEdit tags unignorable (#7886)
Summary:
BlobFileAddition and BlobFileGarbage should not be in the ignorable tag
range, since if they are present in the MANIFEST, users cannot downgrade
to a RocksDB version that does not understand them without losing access
to the data in the blob files. The patch moves these two tags to the
unignorable range; this should still be safe at this point, since the
integrated BlobDB project is still work in progress and thus there
shouldn't be any ignorable BlobFileAddition/BlobFileGarbage tags out
there.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7886

Test Plan: `make check`

Reviewed By: cheng-chang

Differential Revision: D25980956

Pulled By: ltamasi

fbshipit-source-id: 13cf5bd61d77f049b513ecd5ad0be8c637e40a9d
2021-01-21 10:01:00 -08:00
Cheng Chang
b6471f8a5c Update HISTORY.md (#7887)
Summary:
Mention the forward compatibility fix for WAL related version edits.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7887

Reviewed By: ltamasi

Differential Revision: D25982494

Pulled By: cheng-chang

fbshipit-source-id: 4be292aa4bf7fbc8a27c0bef1e7a98ad3ea8e1fa
2021-01-20 15:07:42 -08:00
Cheng Chang
3dd5bc2a25 Make it able to ignore WAL related VersionEdits in older versions (#7873)
Summary:
Although the tags for `WalAddition`, `WalDeletion` are after `kTagSafeIgnoreMask`, to actually be able to skip these entries in older versions of RocksDB, we require that they are encoded with their encoded size as the prefix. This requirement is not met in the current codebase, so a downgraded DB may fail to open if these entries exist in the MANIFEST.

If a DB wants to downgrade, and its MANIFEST contains `WalAddition` or `WalDeletion`, it can set `track_and_verify_wals_in_manifest` to `false`, then restart twice, then downgrade. On the first restart, a new MANIFEST will be created with a `WalDeletion` indicating that all previously tracked WALs are removed from MANIFEST. On the second restart, since there is  no tracked WALs in MANIFEST now, a new MANIFEST will be created with neither `WalAddition` nor `WalDeletion`. Then the DB can downgrade.

Tags for `BlobFileAddition`, `BlobFileGarbage` also have the same problem, but this PR focuses on solving the problem for WAL edits.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7873

Test Plan: Added a `VersionEditTest::IgnorableTags` unit test to verify all entries with tags larger than `kTagSafeIgnoreMask` can actually be skipped and won't affect parsing of other entries.

Reviewed By: ajkr

Differential Revision: D25935930

Pulled By: cheng-chang

fbshipit-source-id: 7a02fdba4311d6084328c14aed110a26d08c3efb
2021-01-20 08:35:30 -08:00
Cheng Chang
48edcfc17d Update HISTORY.md (#7874)
Summary:
I find that the `track_and_verify_wals_in_manifest` option was only removed from 6.15 branch's HISTORY, but still appears under 6.15 in master branch's HISTORY. It should be moved to 6.16 since that's when the feature should be available.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7874

Reviewed By: jay-zhuang

Differential Revision: D25935971

Pulled By: cheng-chang

fbshipit-source-id: fe8bf1ec111597f9207e109aa3be65f8f919f1fd
2021-01-20 08:35:22 -08:00
40 changed files with 595 additions and 116 deletions

View File

@ -1,9 +1,26 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Public API Change
* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
## 6.17.2 (02/05/2021)
### Bug Fixes
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.
* `OptimisticTransactionDB` now returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees.
## 6.17.1 (01/28/2021)
### Behavior Changes
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
## 6.17.0 (01/15/2021) ## 6.17.0 (01/15/2021)
### Behavior Changes ### Behavior Changes
* When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created. * When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created.
* Since RocksDB does not continue write the same file if a file write fails for any reason, the file scope write IO error is treated the same as retryable IO error. More information about error handling of file scope IO error is included in `ErrorHandler::SetBGError`. * Since RocksDB does not continue write the same file if a file write fails for any reason, the file scope write IO error is treated the same as retryable IO error. More information about error handling of file scope IO error is included in `ErrorHandler::SetBGError`.
### Bug Fixes
* Version older than 6.15 cannot decode VersionEdits `WalAddition` and `WalDeletion`, fixed this by changing the encoded format of them to be ignorable by older versions.
* Fix a race condition between DB startups and shutdowns in managing the periodic background worker threads. One effect of this race condition could be the process being terminated.
### Public API Change ### Public API Change
* Add a public API WriteBufferManager::dummy_entries_in_cache_usage() which reports the size of dummy entries stored in cache (passed to WriteBufferManager). Dummy entries are used to account for DataBlocks. * Add a public API WriteBufferManager::dummy_entries_in_cache_usage() which reports the size of dummy entries stored in cache (passed to WriteBufferManager). Dummy entries are used to account for DataBlocks.
@ -11,10 +28,6 @@
### Behavior Changes ### Behavior Changes
* Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation. * Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation.
### API Changes
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
### Bug Fixes ### Bug Fixes
* Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`. * Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
* Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true. * Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true.
@ -33,6 +46,9 @@
### Public API Change ### Public API Change
* Deprecated public but rarely-used FilterBitsBuilder::CalculateNumEntry, which is replaced with ApproximateNumEntries taking a size_t parameter and returning size_t. * Deprecated public but rarely-used FilterBitsBuilder::CalculateNumEntry, which is replaced with ApproximateNumEntries taking a size_t parameter and returning size_t.
* To improve portability the functions `Env::GetChildren` and `Env::GetChildrenFileAttributes` will no longer return entries for the special directories `.` or `..`. * To improve portability the functions `Env::GetChildren` and `Env::GetChildrenFileAttributes` will no longer return entries for the special directories `.` or `..`.
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
## 6.15.0 (11/13/2020) ## 6.15.0 (11/13/2020)
### Bug Fixes ### Bug Fixes
@ -54,7 +70,6 @@
### Public API Change ### Public API Change
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options. * Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
* Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums. * Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums.
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
### Behavior Changes ### Behavior Changes
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag. * The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.

13
db/c.cc
View File

@ -2774,6 +2774,14 @@ void rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
opt->rep.bottommost_compression_opts.enabled = enabled; opt->rep.bottommost_compression_opts.enabled = enabled;
} }
void rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes,
unsigned char enabled) {
opt->rep.bottommost_compression_opts.max_dict_buffer_bytes =
max_dict_buffer_bytes;
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits, void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
int level, int strategy, int level, int strategy,
int max_dict_bytes) { int max_dict_bytes) {
@ -2788,6 +2796,11 @@ void rocksdb_options_set_compression_options_zstd_max_train_bytes(
opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes; opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes;
} }
void rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes) {
opt->rep.compression_opts.max_dict_buffer_bytes = max_dict_buffer_bytes;
}
void rocksdb_options_set_prefix_extractor( void rocksdb_options_set_prefix_extractor(
rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) { rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
opt->rep.prefix_extractor.reset(prefix_extractor); opt->rep.prefix_extractor.reset(prefix_extractor);

View File

@ -954,6 +954,9 @@ class DBImpl : public DB {
// is only for the special test of CancelledCompactions // is only for the special test of CancelledCompactions
Status TEST_WaitForCompact(bool waitUnscheduled = false); Status TEST_WaitForCompact(bool waitUnscheduled = false);
// Get the background error status
Status TEST_GetBGError();
// Return the maximum overlapping data (in bytes) at next level for any // Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1. // file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes( int64_t TEST_MaxNextLevelOverlappingBytes(

View File

@ -177,6 +177,11 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
return error_handler_.GetBGError(); return error_handler_.GetBGError();
} }
Status DBImpl::TEST_GetBGError() {
InstrumentedMutexLock l(&mutex_);
return error_handler_.GetBGError();
}
void DBImpl::TEST_LockMutex() { mutex_.Lock(); } void DBImpl::TEST_LockMutex() { mutex_.Lock(); }
void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); } void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }

View File

@ -1410,67 +1410,88 @@ INSTANTIATE_TEST_CASE_P(
TEST_P(PresetCompressionDictTest, Flush) { TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when // Verifies that dictionary is generated and written during flush only when
// `ColumnFamilyOptions::compression` enables dictionary. // `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
// size of the dictionary is within expectations according to the limit on
// buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256; const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10; const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10; const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions(); Options options = CurrentOptions();
if (bottommost_) { if (bottommost_) {
options.bottommost_compression = compression_type_; options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true; options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen; options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else { } else {
options.compression = compression_type_; options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen; options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
} }
options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile)); options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto; BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true; bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options); Reopen(options);
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
Random rnd(301); Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) { for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen))); ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
} }
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// If there's a compression dictionary, it should have been loaded when the // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// flush finished, incurring a cache miss. // compression dictionary exists since dictionaries would be preloaded when
uint64_t expected_compression_dict_misses; // the flush finishes.
if (bottommost_) { if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses; // Flush is never considered bottommost. This should change in the future
// since flushed files may have nothing underneath them, like the one in
// this test case.
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
} else { } else {
expected_compression_dict_misses = prev_compression_dict_misses + 1; ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
2 * kBlockLen);
} }
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
} }
TEST_P(PresetCompressionDictTest, CompactNonBottommost) { TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to // Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables // non-bottommost level only when `ColumnFamilyOptions::compression` enables
// dictionary. // dictionary. Also verifies the size of the dictionary is within expectations
// according to the limit on buffering set by
// `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256; const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10; const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10; const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions(); Options options = CurrentOptions();
if (bottommost_) { if (bottommost_) {
options.bottommost_compression = compression_type_; options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true; options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen; options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else { } else {
options.compression = compression_type_; options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen; options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
} }
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto; BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true; bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options); Reopen(options);
@ -1492,8 +1513,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
ASSERT_EQ("2,0,1", FilesPerLevel(0)); ASSERT_EQ("2,0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses = uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1 // This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key- // file is not bottommost due to the existing L2 file covering the same key-
// range. // range.
@ -1501,38 +1522,52 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
ASSERT_EQ("0,1,1", FilesPerLevel(0)); ASSERT_EQ("0,1,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compaction finished, incurring a cache miss. // compression dictionary exists since dictionaries would be preloaded when
uint64_t expected_compression_dict_misses; // the compaction finishes.
if (bottommost_) { if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses; ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
} else { } else {
expected_compression_dict_misses = prev_compression_dict_misses + 1; ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
} }
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
} }
TEST_P(PresetCompressionDictTest, CompactBottommost) { TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to // Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or // non-bottommost level only when either `ColumnFamilyOptions::compression` or
// `ColumnFamilyOptions::bottommost_compression` enables dictionary. // `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
// verifies the size of the dictionary is within expectations according to the
// limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256; const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10; const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10; const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions(); Options options = CurrentOptions();
if (bottommost_) { if (bottommost_) {
options.bottommost_compression = compression_type_; options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true; options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen; options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else { } else {
options.compression = compression_type_; options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen; options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
} }
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto; BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true; bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options); Reopen(options);
@ -1548,17 +1583,22 @@ TEST_P(PresetCompressionDictTest, CompactBottommost) {
ASSERT_EQ("2", FilesPerLevel(0)); ASSERT_EQ("2", FilesPerLevel(0));
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses = uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
CompactRangeOptions cro; CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel(0)); ASSERT_EQ("0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the ASSERT_GT(
// compaction finished, incurring a cache miss. TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
ASSERT_EQ(prev_compression_dict_misses + 1, prev_compression_dict_bytes_inserted);
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); // Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
} }
class CompactionCompressionListener : public EventListener { class CompactionCompressionListener : public EventListener {

View File

@ -417,11 +417,11 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
&new_bg_io_err, db_mutex_, &new_bg_io_err, db_mutex_,
&auto_recovery); &auto_recovery);
if (BackgroundErrorReason::kCompaction == reason) { if (BackgroundErrorReason::kCompaction == reason) {
Status bg_err(new_bg_io_err, Status::Severity::kSoftError); // We map the retryable IO error during compaction to soft error. Since
if (bg_err.severity() > bg_error_.severity()) { // compaction can reschedule by itself. We will not set the BG error in
bg_error_ = bg_err; // this case
} // TODO: a better way to set or clean the retryable IO error which
recover_context_ = context; // happens during compaction SST file write.
return bg_error_; return bg_error_;
} else if (BackgroundErrorReason::kFlushNoWAL == reason || } else if (BackgroundErrorReason::kFlushNoWAL == reason ||
BackgroundErrorReason::kManifestWriteNoWAL == reason) { BackgroundErrorReason::kManifestWriteNoWAL == reason) {

View File

@ -892,15 +892,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::OpenCompactionOutputFile", "CompactionJob::OpenCompactionOutputFile",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); }); [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Finish",
[&](void*) { CancelAllBackgroundWork(dbfull()); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(Key(1), "val")); ASSERT_OK(Put(Key(1), "val"));
s = Flush(); s = Flush();
ASSERT_OK(s); ASSERT_OK(s);
s = dbfull()->TEST_WaitForCompact(); s = dbfull()->TEST_GetBGError();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); ASSERT_OK(s);
fault_fs_->SetFilesystemActive(true); fault_fs_->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
@ -940,14 +942,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteFileScopeError) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::OpenCompactionOutputFile", "CompactionJob::OpenCompactionOutputFile",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); }); [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Finish",
[&](void*) { CancelAllBackgroundWork(dbfull()); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(Key(1), "val")); ASSERT_OK(Put(Key(1), "val"));
s = Flush(); s = Flush();
ASSERT_OK(s); ASSERT_OK(s);
s = dbfull()->TEST_WaitForCompact(); s = dbfull()->TEST_GetBGError();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); ASSERT_OK(s);
fault_fs_->SetFilesystemActive(true); fault_fs_->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
@ -2190,8 +2195,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
ASSERT_OK(s); ASSERT_OK(s);
s = dbfull()->TEST_WaitForCompact(); s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); ASSERT_OK(s);
TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0"); TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();

View File

@ -10,13 +10,14 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) { PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) : timer_mu_(env) {
timer = std::unique_ptr<Timer>(new Timer(env)); timer = std::unique_ptr<Timer>(new Timer(env));
} }
void PeriodicWorkScheduler::Register(DBImpl* dbi, void PeriodicWorkScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) { unsigned int stats_persist_period_sec) {
MutexLock l(&timer_mu_);
static std::atomic<uint64_t> initial_delay(0); static std::atomic<uint64_t> initial_delay(0);
timer->Start(); timer->Start();
if (stats_dump_period_sec > 0) { if (stats_dump_period_sec > 0) {
@ -41,6 +42,7 @@ void PeriodicWorkScheduler::Register(DBImpl* dbi,
} }
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
MutexLock l(&timer_mu_);
timer->Cancel(GetTaskName(dbi, "dump_st")); timer->Cancel(GetTaskName(dbi, "dump_st"));
timer->Cancel(GetTaskName(dbi, "pst_st")); timer->Cancel(GetTaskName(dbi, "pst_st"));
timer->Cancel(GetTaskName(dbi, "flush_info_log")); timer->Cancel(GetTaskName(dbi, "flush_info_log"));
@ -78,7 +80,10 @@ PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(Env* env) {
MutexLock l(&mutex); MutexLock l(&mutex);
if (scheduler.timer.get() != nullptr && if (scheduler.timer.get() != nullptr &&
scheduler.timer->TEST_GetPendingTaskNum() == 0) { scheduler.timer->TEST_GetPendingTaskNum() == 0) {
{
MutexLock timer_mu_guard(&scheduler.timer_mu_);
scheduler.timer->Shutdown(); scheduler.timer->Shutdown();
}
scheduler.timer.reset(new Timer(env)); scheduler.timer.reset(new Timer(env));
} }
} }

View File

@ -42,6 +42,12 @@ class PeriodicWorkScheduler {
protected: protected:
std::unique_ptr<Timer> timer; std::unique_ptr<Timer> timer;
// `timer_mu_` serves two purposes currently:
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
// they are currently not implemented in a thread-safe way; and
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
port::Mutex timer_mu_;
explicit PeriodicWorkScheduler(Env* env); explicit PeriodicWorkScheduler(Env* env);

View File

@ -226,13 +226,17 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
} }
for (const auto& wal_addition : wal_additions_) { for (const auto& wal_addition : wal_additions_) {
PutVarint32(dst, kWalAddition); PutVarint32(dst, kWalAddition2);
wal_addition.EncodeTo(dst); std::string encoded;
wal_addition.EncodeTo(&encoded);
PutLengthPrefixedSlice(dst, encoded);
} }
if (!wal_deletion_.IsEmpty()) { if (!wal_deletion_.IsEmpty()) {
PutVarint32(dst, kWalDeletion); PutVarint32(dst, kWalDeletion2);
wal_deletion_.EncodeTo(dst); std::string encoded;
wal_deletion_.EncodeTo(&encoded);
PutLengthPrefixedSlice(dst, encoded);
} }
// 0 is default and does not need to be explicitly written // 0 is default and does not need to be explicitly written
@ -375,6 +379,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
Status VersionEdit::DecodeFrom(const Slice& src) { Status VersionEdit::DecodeFrom(const Slice& src) {
Clear(); Clear();
#ifndef NDEBUG
bool ignore_ignorable_tags = false;
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags",
&ignore_ignorable_tags);
#endif
Slice input = src; Slice input = src;
const char* msg = nullptr; const char* msg = nullptr;
uint32_t tag = 0; uint32_t tag = 0;
@ -385,6 +394,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
Slice str; Slice str;
InternalKey key; InternalKey key;
while (msg == nullptr && GetVarint32(&input, &tag)) { while (msg == nullptr && GetVarint32(&input, &tag)) {
#ifndef NDEBUG
if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) {
tag = kTagSafeIgnoreMask;
}
#endif
switch (tag) { switch (tag) {
case kDbId: case kDbId:
if (GetLengthPrefixedSlice(&input, &str)) { if (GetLengthPrefixedSlice(&input, &str)) {
@ -542,7 +556,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break; break;
} }
case kBlobFileAddition: { case kBlobFileAddition:
case kBlobFileAddition_DEPRECATED: {
BlobFileAddition blob_file_addition; BlobFileAddition blob_file_addition;
const Status s = blob_file_addition.DecodeFrom(&input); const Status s = blob_file_addition.DecodeFrom(&input);
if (!s.ok()) { if (!s.ok()) {
@ -553,7 +568,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break; break;
} }
case kBlobFileGarbage: { case kBlobFileGarbage:
case kBlobFileGarbage_DEPRECATED: {
BlobFileGarbage blob_file_garbage; BlobFileGarbage blob_file_garbage;
const Status s = blob_file_garbage.DecodeFrom(&input); const Status s = blob_file_garbage.DecodeFrom(&input);
if (!s.ok()) { if (!s.ok()) {
@ -575,6 +591,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break; break;
} }
case kWalAddition2: {
Slice encoded;
if (!GetLengthPrefixedSlice(&input, &encoded)) {
msg = "WalAddition not prefixed by length";
break;
}
WalAddition wal_addition;
const Status s = wal_addition.DecodeFrom(&encoded);
if (!s.ok()) {
return s;
}
wal_additions_.emplace_back(std::move(wal_addition));
break;
}
case kWalDeletion: { case kWalDeletion: {
WalDeletion wal_deletion; WalDeletion wal_deletion;
const Status s = wal_deletion.DecodeFrom(&input); const Status s = wal_deletion.DecodeFrom(&input);
@ -586,6 +619,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break; break;
} }
case kWalDeletion2: {
Slice encoded;
if (!GetLengthPrefixedSlice(&input, &encoded)) {
msg = "WalDeletion not prefixed by length";
break;
}
WalDeletion wal_deletion;
const Status s = wal_deletion.DecodeFrom(&encoded);
if (!s.ok()) {
return s;
}
wal_deletion_ = std::move(wal_deletion);
break;
}
case kColumnFamily: case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) { if (!GetVarint32(&input, &column_family_)) {
if (!msg) { if (!msg) {

View File

@ -52,16 +52,21 @@ enum Tag : uint32_t {
kInAtomicGroup = 300, kInAtomicGroup = 300,
kBlobFileAddition = 400,
kBlobFileGarbage,
// Mask for an unidentified tag from the future which can be safely ignored. // Mask for an unidentified tag from the future which can be safely ignored.
kTagSafeIgnoreMask = 1 << 13, kTagSafeIgnoreMask = 1 << 13,
// Forward compatible (aka ignorable) records // Forward compatible (aka ignorable) records
kDbId, kDbId,
kBlobFileAddition, kBlobFileAddition_DEPRECATED,
kBlobFileGarbage, kBlobFileGarbage_DEPRECATED,
kWalAddition, kWalAddition,
kWalDeletion, kWalDeletion,
kFullHistoryTsLow, kFullHistoryTsLow,
kWalAddition2,
kWalDeletion2,
}; };
enum NewFileCustomTag : uint32_t { enum NewFileCustomTag : uint32_t {

View File

@ -324,14 +324,22 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
} }
static std::string PrefixEncodedWalAdditionWithLength(
const std::string& encoded) {
std::string ret;
PutVarint32(&ret, Tag::kWalAddition2);
PutLengthPrefixedSlice(&ret, encoded);
return ret;
}
TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) { TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
std::string encoded; std::string encoded;
PutVarint32(&encoded, Tag::kWalAddition);
{ {
// No log number. // No log number.
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
std::string::npos) std::string::npos)
@ -345,8 +353,10 @@ TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
unsigned char* ptr = reinterpret_cast<unsigned char*>(&c); unsigned char* ptr = reinterpret_cast<unsigned char*>(&c);
*ptr = 128; *ptr = 128;
encoded.append(1, c); encoded.append(1, c);
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
std::string::npos) std::string::npos)
@ -358,14 +368,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
constexpr WalNumber kLogNumber = 100; constexpr WalNumber kLogNumber = 100;
constexpr uint64_t kSizeInBytes = 100; constexpr uint64_t kSizeInBytes = 100;
std::string encoded_without_tag; std::string encoded;
PutVarint32(&encoded_without_tag, Tag::kWalAddition); PutVarint64(&encoded, kLogNumber);
PutVarint64(&encoded_without_tag, kLogNumber);
{ {
// No tag. // No tag.
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded_without_tag); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
<< s.ToString(); << s.ToString();
@ -373,12 +383,15 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
{ {
// Only has size tag, no terminate tag. // Only has size tag, no terminate tag.
std::string encoded_with_size = encoded_without_tag; std::string encoded_with_size = encoded;
PutVarint32(&encoded_with_size, PutVarint32(&encoded_with_size,
static_cast<uint32_t>(WalAdditionTag::kSyncedSize)); static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
PutVarint64(&encoded_with_size, kSizeInBytes); PutVarint64(&encoded_with_size, kSizeInBytes);
std::string encoded_edit =
PrefixEncodedWalAdditionWithLength(encoded_with_size);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded_with_size); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
<< s.ToString(); << s.ToString();
@ -386,11 +399,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
{ {
// Only has terminate tag. // Only has terminate tag.
std::string encoded_with_terminate = encoded_without_tag; std::string encoded_with_terminate = encoded;
PutVarint32(&encoded_with_terminate, PutVarint32(&encoded_with_terminate,
static_cast<uint32_t>(WalAdditionTag::kTerminate)); static_cast<uint32_t>(WalAdditionTag::kTerminate));
std::string encoded_edit =
PrefixEncodedWalAdditionWithLength(encoded_with_terminate);
VersionEdit edit; VersionEdit edit;
ASSERT_OK(edit.DecodeFrom(encoded_with_terminate)); ASSERT_OK(edit.DecodeFrom(encoded_edit));
auto& wal_addition = edit.GetWalAdditions()[0]; auto& wal_addition = edit.GetWalAdditions()[0];
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber); ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize()); ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize());
@ -401,15 +417,15 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
constexpr WalNumber kLogNumber = 100; constexpr WalNumber kLogNumber = 100;
std::string encoded; std::string encoded;
PutVarint32(&encoded, Tag::kWalAddition);
PutVarint64(&encoded, kLogNumber); PutVarint64(&encoded, kLogNumber);
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize)); PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
// No real size after the size tag. // No real size after the size tag.
{ {
// Without terminate tag. // Without terminate tag.
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") != ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") !=
std::string::npos) std::string::npos)
@ -419,8 +435,10 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
{ {
// With terminate tag. // With terminate tag.
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate)); PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate));
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded); Status s = edit.DecodeFrom(encoded_edit);
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
// The terminate tag is misunderstood as the size. // The terminate tag is misunderstood as the size.
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
@ -515,6 +533,62 @@ TEST_F(VersionEditTest, FullHistoryTsLow) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
} }
// Tests that if RocksDB is downgraded, the new types of VersionEdits
// that have a tag larger than kTagSafeIgnoreMask can be safely ignored.
TEST_F(VersionEditTest, IgnorableTags) {
SyncPoint::GetInstance()->SetCallBack(
"VersionEdit::EncodeTo:IgnoreIgnorableTags", [&](void* arg) {
bool* ignore = static_cast<bool*>(arg);
*ignore = true;
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr uint64_t kPrevLogNumber = 100;
constexpr uint64_t kLogNumber = 200;
constexpr uint64_t kNextFileNumber = 300;
constexpr uint64_t kColumnFamilyId = 400;
VersionEdit edit;
// Add some ignorable entries.
for (int i = 0; i < 2; i++) {
edit.AddWal(i + 1, WalMetadata(i + 2));
}
edit.SetDBId("db_id");
// Add unignorable entries.
edit.SetPrevLogNumber(kPrevLogNumber);
edit.SetLogNumber(kLogNumber);
// Add more ignorable entries.
edit.DeleteWalsBefore(100);
// Add unignorable entry.
edit.SetNextFile(kNextFileNumber);
// Add more ignorable entries.
edit.SetFullHistoryTsLow("ts");
// Add unignorable entry.
edit.SetColumnFamily(kColumnFamilyId);
std::string encoded;
ASSERT_TRUE(edit.EncodeTo(&encoded));
VersionEdit decoded;
ASSERT_OK(decoded.DecodeFrom(encoded));
// Check that all ignorable entries are ignored.
ASSERT_FALSE(decoded.HasDbId());
ASSERT_FALSE(decoded.HasFullHistoryTsLow());
ASSERT_FALSE(decoded.IsWalAddition());
ASSERT_FALSE(decoded.IsWalDeletion());
ASSERT_TRUE(decoded.GetWalAdditions().empty());
ASSERT_TRUE(decoded.GetWalDeletion().IsEmpty());
// Check that unignorable entries are still present.
ASSERT_EQ(edit.GetPrevLogNumber(), kPrevLogNumber);
ASSERT_EQ(edit.GetLogNumber(), kLogNumber);
ASSERT_EQ(edit.GetNextFile(), kNextFileNumber);
ASSERT_EQ(edit.GetColumnFamily(), kColumnFamilyId);
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -206,6 +206,7 @@ DECLARE_string(bottommost_compression_type);
DECLARE_int32(compression_max_dict_bytes); DECLARE_int32(compression_max_dict_bytes);
DECLARE_int32(compression_zstd_max_train_bytes); DECLARE_int32(compression_zstd_max_train_bytes);
DECLARE_int32(compression_parallel_threads); DECLARE_int32(compression_parallel_threads);
DECLARE_uint64(compression_max_dict_buffer_bytes);
DECLARE_string(checksum_type); DECLARE_string(checksum_type);
DECLARE_string(hdfs); DECLARE_string(hdfs);
DECLARE_string(env_uri); DECLARE_string(env_uri);

View File

@ -626,6 +626,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
DEFINE_int32(compression_parallel_threads, 1, DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression."); "Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes, 0,
"Buffering limit for SST file data to sample for dictionary "
"compression.");
DEFINE_string(bottommost_compression_type, "disable", DEFINE_string(bottommost_compression_type, "disable",
"Algorithm to use to compress bottommost level of the database. " "Algorithm to use to compress bottommost level of the database. "
"\"disable\" means disabling the feature"); "\"disable\" means disabling the feature");

View File

@ -2052,6 +2052,8 @@ void StressTest::Open() {
FLAGS_compression_zstd_max_train_bytes; FLAGS_compression_zstd_max_train_bytes;
options_.compression_opts.parallel_threads = options_.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads; FLAGS_compression_parallel_threads;
options_.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
options_.create_if_missing = true; options_.create_if_missing = true;
options_.max_manifest_file_size = FLAGS_max_manifest_file_size; options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
options_.inplace_update_support = FLAGS_in_place_update; options_.inplace_update_support = FLAGS_in_place_update;

View File

@ -143,6 +143,28 @@ struct CompressionOptions {
// Default: false. // Default: false.
bool enabled; bool enabled;
// Limit on data buffering, which is used to gather samples to build a
// dictionary. Zero means no limit. When dictionary is disabled
// (`max_dict_bytes == 0`), enabling this limit (`max_dict_buffer_bytes != 0`)
// has no effect.
//
// In compaction, the buffering is limited to the target file size (see
// `target_file_size_base` and `target_file_size_multiplier`) even if this
// setting permits more buffering. Since we cannot determine where the file
// should be cut until data blocks are compressed with dictionary, buffering
// more than the target file size could lead to selecting samples that belong
// to a later output SST.
//
// Limiting too strictly may harm dictionary effectiveness since it forces
// RocksDB to pick samples from the initial portion of the output SST, which
// may not be representative of the whole file. Configuring this limit below
// `zstd_max_train_bytes` (when enabled) can restrict how many samples we can
// pass to the dictionary trainer. Configuring it below `max_dict_bytes` can
// restrict the size of the final dictionary.
//
// Default: 0 (unlimited)
uint64_t max_dict_buffer_bytes;
CompressionOptions() CompressionOptions()
: window_bits(-14), : window_bits(-14),
level(kDefaultCompressionLevel), level(kDefaultCompressionLevel),
@ -150,17 +172,19 @@ struct CompressionOptions {
max_dict_bytes(0), max_dict_bytes(0),
zstd_max_train_bytes(0), zstd_max_train_bytes(0),
parallel_threads(1), parallel_threads(1),
enabled(false) {} enabled(false),
max_dict_buffer_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes, CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes, int _parallel_threads, int _zstd_max_train_bytes, int _parallel_threads,
bool _enabled) bool _enabled, uint64_t _max_dict_buffer_bytes)
: window_bits(wbits), : window_bits(wbits),
level(_lev), level(_lev),
strategy(_strategy), strategy(_strategy),
max_dict_bytes(_max_dict_bytes), max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes), zstd_max_train_bytes(_zstd_max_train_bytes),
parallel_threads(_parallel_threads), parallel_threads(_parallel_threads),
enabled(_enabled) {} enabled(_enabled),
max_dict_buffer_bytes(_max_dict_buffer_bytes) {}
}; };
enum UpdateStatus { // Return status For inplace update callback enum UpdateStatus { // Return status For inplace update callback

View File

@ -998,11 +998,17 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*, rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*,
int); int);
extern ROCKSDB_LIBRARY_API void extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int, int, rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int, int,
int, int, unsigned char); int, int, unsigned char);
extern ROCKSDB_LIBRARY_API void extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes( rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
rocksdb_options_t*, int, unsigned char); rocksdb_options_t*, int, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
rocksdb_options_t*, rocksdb_slicetransform_t*); rocksdb_options_t*, rocksdb_slicetransform_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(

View File

@ -51,6 +51,8 @@ struct OptimisticTransactionDBOptions {
uint32_t occ_lock_buckets = (1 << 20); uint32_t occ_lock_buckets = (1 << 20);
}; };
// Range deletions (including those in `WriteBatch`es passed to `Write()`) are
// incompatible with `OptimisticTransactionDB` and will return a non-OK `Status`
class OptimisticTransactionDB : public StackableDB { class OptimisticTransactionDB : public StackableDB {
public: public:
// Open an OptimisticTransactionDB similar to DB::Open(). // Open an OptimisticTransactionDB similar to DB::Open().

View File

@ -344,6 +344,17 @@ class TransactionDB : public StackableDB {
// falls back to the un-optimized version of ::Write // falls back to the un-optimized version of ::Write
return Write(opts, updates); return Write(opts, updates);
} }
// Transactional `DeleteRange()` is not yet supported.
// However, users who know their deleted range does not conflict with
// anything can still use it via the `Write()` API. In all cases, the
// `Write()` overload specifying `TransactionDBWriteOptimizations` must be
// used and `skip_concurrency_control` must be set. When using either
// WRITE_PREPARED or WRITE_UNPREPARED , `skip_duplicate_key_check` must
// additionally be set.
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
const Slice&, const Slice&) override {
return Status::NotSupported();
}
// Open a TransactionDB similar to DB::Open(). // Open a TransactionDB similar to DB::Open().
// Internally call PrepareWrap() and WrapDB() // Internally call PrepareWrap() and WrapDB()
// If the return status is not ok, then dbptr is set to nullptr. // If the return status is not ok, then dbptr is set to nullptr.

View File

@ -6,7 +6,7 @@
#define ROCKSDB_MAJOR 6 #define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 17 #define ROCKSDB_MINOR 17
#define ROCKSDB_PATCH 0 #define ROCKSDB_PATCH 2
// Do not use these. We made the mistake of declaring macros starting with // Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these // double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -132,6 +132,27 @@ jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
return static_cast<jint>(opt->zstd_max_train_bytes); return static_cast<jint>(opt->zstd_max_train_bytes);
} }
/*
* Class: org_rocksdb_CompressionOptions
* Method: setMaxDictBufferBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: maxDictBufferBytes
* Signature: (J)J
*/
jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jlong>(opt->max_dict_buffer_bytes);
}
/* /*
* Class: org_rocksdb_CompressionOptions * Class: org_rocksdb_CompressionOptions
* Method: setEnabled * Method: setEnabled

View File

@ -105,7 +105,7 @@ static Status ParseCompressionOptions(const std::string& value,
} }
// Since parallel_threads comes before enabled but was added optionally // Since parallel_threads comes before enabled but was added optionally
// later, we need to check if this is the final token (meaning it is the // later, we need to check if this is the final token (meaning it is the
// enabled bit), or if there is another token (meaning this one is // enabled bit), or if there are more tokens (meaning this one is
// parallel_threads) // parallel_threads)
end = value.find(':', start); end = value.find(':', start);
if (end != std::string::npos) { if (end != std::string::npos) {
@ -113,7 +113,6 @@ static Status ParseCompressionOptions(const std::string& value,
ParseInt(value.substr(start, value.size() - start)); ParseInt(value.substr(start, value.size() - start));
} else { } else {
// parallel_threads is not serialized with this format, but enabled is // parallel_threads is not serialized with this format, but enabled is
compression_opts.parallel_threads = CompressionOptions().parallel_threads;
compression_opts.enabled = compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start)); ParseBoolean("", value.substr(start, value.size() - start));
} }
@ -128,6 +127,18 @@ static Status ParseCompressionOptions(const std::string& value,
} }
compression_opts.enabled = compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start)); ParseBoolean("", value.substr(start, value.size() - start));
end = value.find(':', start);
}
// max_dict_buffer_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_buffer_bytes =
ParseUint64(value.substr(start, value.size() - start));
} }
return Status::OK(); return Status::OK();
} }
@ -161,6 +172,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"enabled", {"enabled",
{offsetof(struct CompressionOptions, enabled), OptionType::kBoolean, {offsetof(struct CompressionOptions, enabled), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"max_dict_buffer_bytes",
{offsetof(struct CompressionOptions, max_dict_buffer_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
}; };
static std::unordered_map<std::string, OptionTypeInfo> static std::unordered_map<std::string, OptionTypeInfo>

View File

@ -201,6 +201,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.enabled: %s", log, " Options.bottommost_compression_opts.enabled: %s",
bottommost_compression_opts.enabled ? "true" : "false"); bottommost_compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
bottommost_compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d", ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
compression_opts.window_bits); compression_opts.window_bits);
ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d", ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
@ -222,6 +227,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, ROCKS_LOG_HEADER(log,
" Options.compression_opts.enabled: %s", " Options.compression_opts.enabled: %s",
compression_opts.enabled ? "true" : "false"); compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(log,
" Options.compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d", ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger); level0_file_num_compaction_trigger);
ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d", ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",

View File

@ -409,10 +409,11 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions), FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsExcluded); kColumnFamilyOptionsExcluded);
// It based on the behavior of compiler that padding bytes are not changed // Invoke a user-defined constructor in the hope that it does not overwrite
// when copying the struct. It's prone to failure when compiler behavior // padding bytes. Note that previously we relied on the implicitly-defined
// changes. We verify there is unset bytes to detect the case. // copy-assignment operator (i.e., `*options = ColumnFamilyOptions();`) here,
*options = ColumnFamilyOptions(); // which did in fact modify padding bytes.
options = new (options_ptr) ColumnFamilyOptions();
// Deprecatd option which is not initialized. Need to set it to avoid // Deprecatd option which is not initialized. Need to set it to avoid
// Valgrind error // Valgrind error
@ -470,8 +471,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;" "max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;" "memtable_factory=SkipListFactory;"
"compression=kNoCompression;" "compression=kNoCompression;"
"compression_opts=5:6:7:8:9:true;" "compression_opts=5:6:7:8:9:10:true:11;"
"bottommost_compression_opts=4:5:6:7:8:true;" "bottommost_compression_opts=4:5:6:7:8:9:true:10;"
"bottommost_compression=kDisableCompressionOption;" "bottommost_compression=kDisableCompressionOption;"
"level0_stop_writes_trigger=33;" "level0_stop_writes_trigger=33;"
"num_levels=99;" "num_levels=99;"

View File

@ -725,7 +725,7 @@ TEST_F(OptionsTest, CompressionOptionsFromString) {
ASSERT_OK(GetColumnFamilyOptionsFromString( ASSERT_OK(GetColumnFamilyOptionsFromString(
ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false", ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false",
&base_cf_opt)); &base_cf_opt));
ASSERT_NOK(GetColumnFamilyOptionsFromString( ASSERT_OK(GetColumnFamilyOptionsFromString(
config_options, ColumnFamilyOptions(), config_options, ColumnFamilyOptions(),
"compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt)); "compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt));
ASSERT_OK(GetColumnFamilyOptionsFromString( ASSERT_OK(GetColumnFamilyOptionsFromString(

View File

@ -11,24 +11,25 @@
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <atomic> #include <atomic>
#include <list> #include <list>
#include <map> #include <map>
#include <memory> #include <memory>
#include <numeric>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "index_builder.h" #include "index_builder.h"
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based/block.h" #include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h" #include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_based_table_factory.h"
@ -40,8 +41,6 @@
#include "table/block_based/partitioned_filter_block.h" #include "table/block_based/partitioned_filter_block.h"
#include "table/format.h" #include "table/format.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "memory/memory_allocator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
@ -306,6 +305,10 @@ struct BlockBasedTableBuilder::Rep {
kClosed, kClosed,
}; };
State state; State state;
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_and_keys_buffers`) does not exceed
// `buffer_limit`.
uint64_t buffer_limit;
const bool use_delta_encoding_for_index_values; const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder; std::unique_ptr<FilterBlockBuilder> filter_builder;
@ -321,7 +324,6 @@ struct BlockBasedTableBuilder::Rep {
const std::string& column_family_name; const std::string& column_family_name;
uint64_t creation_time = 0; uint64_t creation_time = 0;
uint64_t oldest_key_time = 0; uint64_t oldest_key_time = 0;
const uint64_t target_file_size;
uint64_t file_creation_time = 0; uint64_t file_creation_time = 0;
// DB IDs // DB IDs
@ -407,7 +409,7 @@ struct BlockBasedTableBuilder::Rep {
const CompressionOptions& _compression_opts, const bool skip_filters, const CompressionOptions& _compression_opts, const bool skip_filters,
const int _level_at_creation, const std::string& _column_family_name, const int _level_at_creation, const std::string& _column_family_name,
const uint64_t _creation_time, const uint64_t _oldest_key_time, const uint64_t _creation_time, const uint64_t _oldest_key_time,
const uint64_t _target_file_size, const uint64_t _file_creation_time, const uint64_t target_file_size, const uint64_t _file_creation_time,
const std::string& _db_id, const std::string& _db_session_id) const std::string& _db_id, const std::string& _db_session_id)
: ioptions(_ioptions), : ioptions(_ioptions),
moptions(_moptions), moptions(_moptions),
@ -448,13 +450,20 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name), column_family_name(_column_family_name),
creation_time(_creation_time), creation_time(_creation_time),
oldest_key_time(_oldest_key_time), oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size),
file_creation_time(_file_creation_time), file_creation_time(_file_creation_time),
db_id(_db_id), db_id(_db_id),
db_session_id(_db_session_id), db_session_id(_db_session_id),
db_host_id(ioptions.db_host_id), db_host_id(ioptions.db_host_id),
status_ok(true), status_ok(true),
io_status_ok(true) { io_status_ok(true) {
if (target_file_size == 0) {
buffer_limit = compression_opts.max_dict_buffer_bytes;
} else if (compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = target_file_size;
} else {
buffer_limit =
std::min(target_file_size, compression_opts.max_dict_buffer_bytes);
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type)); compression_ctxs[i].reset(new CompressionContext(compression_type));
} }
@ -896,8 +905,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r->first_key_in_next_block = &key; r->first_key_in_next_block = &key;
Flush(); Flush();
if (r->state == Rep::State::kBuffered && r->target_file_size != 0 && if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
r->data_begin_offset > r->target_file_size) { r->data_begin_offset > r->buffer_limit) {
EnterUnbuffered(); EnterUnbuffered();
} }
@ -997,23 +1006,28 @@ void BlockBasedTableBuilder::Flush() {
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle, BlockHandle* handle,
bool is_data_block) { bool is_data_block) {
WriteBlock(block->Finish(), handle, is_data_block); block->Finish();
block->Reset(); std::string raw_block_contents;
block->SwapAndReset(raw_block_contents);
if (rep_->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!rep_->data_block_and_keys_buffers.empty());
rep_->data_block_and_keys_buffers.back().first =
std::move(raw_block_contents);
rep_->data_begin_offset +=
rep_->data_block_and_keys_buffers.back().first.size();
return;
}
WriteBlock(raw_block_contents, handle, is_data_block);
} }
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle, BlockHandle* handle,
bool is_data_block) { bool is_data_block) {
Rep* r = rep_; Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
Slice block_contents; Slice block_contents;
CompressionType type; CompressionType type;
if (r->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!r->data_block_and_keys_buffers.empty());
r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
return;
}
Status compress_status; Status compress_status;
CompressAndVerifyBlock(raw_block_contents, is_data_block, CompressAndVerifyBlock(raw_block_contents, is_data_block,
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(), *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
@ -1629,14 +1643,37 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0 const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
? r->compression_opts.zstd_max_train_bytes ? r->compression_opts.zstd_max_train_bytes
: r->compression_opts.max_dict_bytes; : r->compression_opts.max_dict_bytes;
// If buffer size is reasonable, we pre-generate a permutation to enforce
// uniqueness. This prevents wasting samples on duplicates, which is
// particularly likely when not many blocks were buffered.
std::vector<uint16_t> data_block_order;
size_t data_block_order_idx = 0;
if (r->data_block_and_keys_buffers.size() <= ((1 << 16) - 1)) {
data_block_order.resize(r->data_block_and_keys_buffers.size());
std::iota(data_block_order.begin(), data_block_order.end(),
static_cast<uint16_t>(0));
// We could be smarter and interleave the shuffling and sample appending
// logic. Then we could terminate as soon as `kSampleBytes` is reached,
// saving some shuffling computation.
RandomShuffle(data_block_order.begin(), data_block_order.end());
}
Random64 generator{r->creation_time}; Random64 generator{r->creation_time};
std::string compression_dict_samples; std::string compression_dict_samples;
std::vector<size_t> compression_dict_sample_lens; std::vector<size_t> compression_dict_sample_lens;
if (!r->data_block_and_keys_buffers.empty()) { if (!r->data_block_and_keys_buffers.empty()) {
while (compression_dict_samples.size() < kSampleBytes) { while ((data_block_order.empty() ||
size_t rand_idx = data_block_order_idx < data_block_order.size()) &&
static_cast<size_t>( compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx;
if (data_block_order.empty()) {
rand_idx = static_cast<size_t>(
generator.Uniform(r->data_block_and_keys_buffers.size())); generator.Uniform(r->data_block_and_keys_buffers.size()));
} else {
rand_idx = data_block_order[data_block_order_idx];
++data_block_order_idx;
}
size_t copy_len = size_t copy_len =
std::min(kSampleBytes - compression_dict_samples.size(), std::min(kSampleBytes - compression_dict_samples.size(),
r->data_block_and_keys_buffers[rand_idx].first.size()); r->data_block_and_keys_buffers[rand_idx].first.size());

View File

@ -117,8 +117,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: `rep_->state == kBuffered` // REQUIRES: `rep_->state == kBuffered`
void EnterUnbuffered(); void EnterUnbuffered();
// Call block's Finish() method // Call block's Finish() method and then
// and then write the compressed block contents to file. // - in buffered mode, buffer the uncompressed block contents.
// - in unbuffered mode, write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block); void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
// Compress and write block content to the file. // Compress and write block content to the file.

View File

@ -235,7 +235,8 @@ Status SstFileDumper::ShowAllCompressionSizes(
const std::vector<std::pair<CompressionType, const char*>>& const std::vector<std::pair<CompressionType, const char*>>&
compression_types, compression_types,
int32_t compress_level_from, int32_t compress_level_to, int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) { uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes) {
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
for (auto& i : compression_types) { for (auto& i : compression_types) {
if (CompressionTypeSupported(i.first)) { if (CompressionTypeSupported(i.first)) {
@ -243,6 +244,7 @@ Status SstFileDumper::ShowAllCompressionSizes(
CompressionOptions compress_opt; CompressionOptions compress_opt;
compress_opt.max_dict_bytes = max_dict_bytes; compress_opt.max_dict_bytes = max_dict_bytes;
compress_opt.zstd_max_train_bytes = zstd_max_train_bytes; compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes;
for (int32_t j = compress_level_from; j <= compress_level_to; j++) { for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
fprintf(stdout, "Compression level: %d", j); fprintf(stdout, "Compression level: %d", j);
compress_opt.level = j; compress_opt.level = j;

View File

@ -40,7 +40,8 @@ class SstFileDumper {
const std::vector<std::pair<CompressionType, const char*>>& const std::vector<std::pair<CompressionType, const char*>>&
compression_types, compression_types,
int32_t compress_level_from, int32_t compress_level_to, int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes); uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes);
Status ShowCompressionSize(size_t block_size, CompressionType compress_type, Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
const CompressionOptions& compress_opt); const CompressionOptions& compress_opt);

View File

@ -948,6 +948,10 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
DEFINE_int32(compression_parallel_threads, 1, DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression."); "Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes,
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes,
"Maximum bytes to buffer to collect samples for dictionary.");
static bool ValidateTableCacheNumshardbits(const char* flagname, static bool ValidateTableCacheNumshardbits(const char* flagname,
int32_t value) { int32_t value) {
if (0 >= value || value > 20) { if (0 >= value || value > 20) {
@ -4053,6 +4057,8 @@ class Benchmark {
FLAGS_compression_zstd_max_train_bytes; FLAGS_compression_zstd_max_train_bytes;
options.compression_opts.parallel_threads = options.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads; FLAGS_compression_parallel_threads;
options.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
// If this is a block based table, set some related options // If this is a block based table, set some related options
auto table_options = auto table_options =
options.table_factory->GetOptions<BlockBasedTableOptions>(); options.table_factory->GetOptions<BlockBasedTableOptions>();

View File

@ -50,6 +50,7 @@ default_params = {
# Disabled compression_parallel_threads as the feature is not stable # Disabled compression_parallel_threads as the feature is not stable
# lambda: random.choice([1] * 9 + [4]) # lambda: random.choice([1] * 9 + [4])
"compression_parallel_threads": 1, "compression_parallel_threads": 1,
"compression_max_dict_buffer_bytes": lambda: 4096 * random.randint(0, 32),
"clear_column_family_one_in": 0, "clear_column_family_one_in": 0,
"compact_files_one_in": 1000000, "compact_files_one_in": 1000000,
"compact_range_one_in": 1000000, "compact_range_one_in": 1000000,
@ -267,8 +268,10 @@ best_efforts_recovery_params = {
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v) dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()]) for (k, v) in src_params.items()])
if dest_params.get("compression_type") != "zstd" or \ if dest_params.get("compression_max_dict_bytes") == 0:
dest_params.get("compression_max_dict_bytes") == 0: dest_params["compression_zstd_max_train_bytes"] = 0
dest_params["compression_max_dict_buffer_bytes"] = 0
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0 dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1: if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list" dest_params["memtablerep"] = "skip_list"

View File

@ -103,6 +103,9 @@ void print_help(bool to_stderr) {
--compression_zstd_max_train_bytes=<uint32_t> --compression_zstd_max_train_bytes=<uint32_t>
Maximum size of training data passed to zstd's dictionary trainer Maximum size of training data passed to zstd's dictionary trainer
--compression_max_dict_buffer_bytes=<int64_t>
Limit on buffer size from which we collect samples for dictionary generation.
)"); )");
} }
@ -166,6 +169,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes; ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes;
uint32_t compression_zstd_max_train_bytes = uint32_t compression_zstd_max_train_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes; ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes;
uint64_t compression_max_dict_buffer_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes;
int64_t tmp_val; int64_t tmp_val;
@ -276,6 +281,17 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
return 1; return 1;
} }
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val); compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
} else if (ParseIntArg(argv[i], "--compression_max_dict_buffer_bytes=",
"compression_max_dict_buffer_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0) {
fprintf(stderr,
"compression_max_dict_buffer_bytes must be positive: '%s'\n",
argv[i]);
print_help(/*to_stderr*/ true);
return 1;
}
compression_max_dict_buffer_bytes = static_cast<uint64_t>(tmp_val);
} else if (strcmp(argv[i], "--help") == 0) { } else if (strcmp(argv[i], "--help") == 0) {
print_help(/*to_stderr*/ false); print_help(/*to_stderr*/ false);
return 0; return 0;
@ -404,7 +420,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
set_block_size ? block_size : 16384, set_block_size ? block_size : 16384,
compression_types.empty() ? kCompressions : compression_types, compression_types.empty() ? kCompressions : compression_types,
compress_level_from, compress_level_to, compression_max_dict_bytes, compress_level_from, compress_level_to, compression_max_dict_bytes,
compression_zstd_max_train_bytes); compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes);
if (!st.ok()) { if (!st.ok()) {
fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str()); fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str());
exit(1); exit(1);

View File

@ -627,6 +627,9 @@ inline std::string CompressionOptionsToString(
result.append("enabled=") result.append("enabled=")
.append(ToString(compression_options.enabled)) .append(ToString(compression_options.enabled))
.append("; "); .append("; ");
result.append("max_dict_buffer_bytes=")
.append(ToString(compression_options.max_dict_buffer_bytes))
.append("; ");
return result; return result;
} }

View File

@ -279,9 +279,16 @@ bool StartsWith(const std::string& string, const std::string& pattern) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
bool ParseBoolean(const std::string& type, const std::string& value) { bool ParseBoolean(const std::string& type, const std::string& value) {
if (value == "true" || value == "1") { const static std::string kTrue = "true", kFalse = "false";
if (value.compare(0 /* pos */, kTrue.size(), kTrue) == 0) {
return true; return true;
} else if (value == "false" || value == "0") { } else if (value.compare(0 /* pos */, kFalse.size(), kFalse) == 0) {
return false;
}
int num = ParseInt(value);
if (num == 1) {
return true;
} else if (num == 0) {
return false; return false;
} }
throw std::invalid_argument(type); throw std::invalid_argument(type);

View File

@ -22,6 +22,9 @@ namespace ROCKSDB_NAMESPACE {
// A Timer class to handle repeated work. // A Timer class to handle repeated work.
// //
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
// serialize calls to these two member functions.
//
// A single timer instance can handle multiple functions via a single thread. // A single timer instance can handle multiple functions via a single thread.
// It is better to leave long running work to a dedicated thread pool. // It is better to leave long running work to a dedicated thread pool.
// //

View File

@ -153,7 +153,12 @@ typedef struct toku_mutex_aligned {
{ .pmutex = PTHREAD_MUTEX_INITIALIZER, .psi_mutex = nullptr } { .pmutex = PTHREAD_MUTEX_INITIALIZER, .psi_mutex = nullptr }
#endif // defined(TOKU_PTHREAD_DEBUG) #endif // defined(TOKU_PTHREAD_DEBUG)
#else // __FreeBSD__, __linux__, at least #else // __FreeBSD__, __linux__, at least
#if defined(__GLIBC__)
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP #define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
#else
// not all libc (e.g. musl) implement NP (Non-POSIX) attributes
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_DEFAULT
#endif
#if defined(TOKU_PTHREAD_DEBUG) #if defined(TOKU_PTHREAD_DEBUG)
#define TOKU_ADAPTIVE_MUTEX_INITIALIZER \ #define TOKU_ADAPTIVE_MUTEX_INITIALIZER \
{ \ { \

View File

@ -46,6 +46,22 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
const OptimisticTransactionOptions& txn_options, const OptimisticTransactionOptions& txn_options,
Transaction* old_txn) override; Transaction* old_txn) override;
// Transactional `DeleteRange()` is not yet supported.
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
const Slice&, const Slice&) override {
return Status::NotSupported();
}
// Range deletions also must not be snuck into `WriteBatch`es as they are
// incompatible with `OptimisticTransactionDB`.
virtual Status Write(const WriteOptions& write_opts,
WriteBatch* batch) override {
if (batch->HasDeleteRange()) {
return Status::NotSupported();
}
return OptimisticTransactionDB::Write(write_opts, batch);
}
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); } size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; } OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }

View File

@ -1033,6 +1033,17 @@ TEST_P(OptimisticTransactionTest, IteratorTest) {
delete txn; delete txn;
} }
TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) {
// `OptimisticTransactionDB` does not allow range deletion in any API.
ASSERT_TRUE(
txn_db
->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b")
.IsNotSupported());
WriteBatch wb;
ASSERT_OK(wb.DeleteRange("a", "b"));
ASSERT_NOK(txn_db->Write(WriteOptions(), &wb));
}
TEST_P(OptimisticTransactionTest, SavepointTest) { TEST_P(OptimisticTransactionTest, SavepointTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;

View File

@ -4835,6 +4835,56 @@ TEST_P(TransactionTest, MergeTest) {
ASSERT_EQ("a,3", value); ASSERT_EQ("a,3", value);
} }
TEST_P(TransactionTest, DeleteRangeSupportTest) {
// The `DeleteRange()` API is banned everywhere.
ASSERT_TRUE(
db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
.IsNotSupported());
// But range deletions can be added via the `Write()` API by specifying the
// proper flags to promise there are no conflicts according to the DB type
// (see `TransactionDB::DeleteRange()` API doc for details).
for (bool skip_concurrency_control : {false, true}) {
for (bool skip_duplicate_key_check : {false, true}) {
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
WriteBatch wb;
ASSERT_OK(wb.DeleteRange("a", "b"));
TransactionDBWriteOptimizations flags;
flags.skip_concurrency_control = skip_concurrency_control;
flags.skip_duplicate_key_check = skip_duplicate_key_check;
Status s = db->Write(WriteOptions(), flags, &wb);
std::string value;
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
if (skip_concurrency_control) {
ASSERT_OK(s);
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
} else {
ASSERT_NOK(s);
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
}
break;
case WRITE_PREPARED:
// Intentional fall-through
case WRITE_UNPREPARED:
if (skip_concurrency_control && skip_duplicate_key_check) {
ASSERT_OK(s);
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
} else {
ASSERT_NOK(s);
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
}
break;
}
// Without any promises from the user, range deletion via other `Write()`
// APIs are still banned.
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
ASSERT_NOK(db->Write(WriteOptions(), &wb));
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
}
}
}
TEST_P(TransactionTest, DeferSnapshotTest) { TEST_P(TransactionTest, DeferSnapshotTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options; ReadOptions read_options;

View File

@ -157,7 +157,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
// TODO(myabandeh): add an option to allow user skipping this cost // TODO(myabandeh): add an option to allow user skipping this cost
SubBatchCounter counter(*GetCFComparatorMap()); SubBatchCounter counter(*GetCFComparatorMap());
auto s = batch->Iterate(&counter); auto s = batch->Iterate(&counter);
assert(s.ok()); if (!s.ok()) {
return s;
}
batch_cnt = counter.BatchCount(); batch_cnt = counter.BatchCount();
WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD); WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches", ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",