Compare commits
13 Commits
main
...
6.17.fb.la
Author | SHA1 | Date | |
---|---|---|---|
|
58ec8cf304 | ||
|
553aafaf8e | ||
|
1315375542 | ||
|
a471d31e04 | ||
|
8797aea803 | ||
|
9092ebed39 | ||
|
675f351cc8 | ||
|
11b42f92b1 | ||
|
14d173ec81 | ||
|
8b30b8d2a0 | ||
|
b6471f8a5c | ||
|
3dd5bc2a25 | ||
|
48edcfc17d |
25
HISTORY.md
25
HISTORY.md
@ -1,9 +1,26 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
### Public API Change
|
||||
* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
|
||||
|
||||
## 6.17.2 (02/05/2021)
|
||||
### Bug Fixes
|
||||
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.
|
||||
* `OptimisticTransactionDB` now returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees.
|
||||
|
||||
## 6.17.1 (01/28/2021)
|
||||
### Behavior Changes
|
||||
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
|
||||
|
||||
## 6.17.0 (01/15/2021)
|
||||
### Behavior Changes
|
||||
* When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created.
|
||||
* Since RocksDB does not continue write the same file if a file write fails for any reason, the file scope write IO error is treated the same as retryable IO error. More information about error handling of file scope IO error is included in `ErrorHandler::SetBGError`.
|
||||
|
||||
### Bug Fixes
|
||||
* Version older than 6.15 cannot decode VersionEdits `WalAddition` and `WalDeletion`, fixed this by changing the encoded format of them to be ignorable by older versions.
|
||||
* Fix a race condition between DB startups and shutdowns in managing the periodic background worker threads. One effect of this race condition could be the process being terminated.
|
||||
|
||||
### Public API Change
|
||||
* Add a public API WriteBufferManager::dummy_entries_in_cache_usage() which reports the size of dummy entries stored in cache (passed to WriteBufferManager). Dummy entries are used to account for DataBlocks.
|
||||
|
||||
@ -11,10 +28,6 @@
|
||||
### Behavior Changes
|
||||
* Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation.
|
||||
|
||||
### API Changes
|
||||
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
|
||||
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
|
||||
|
||||
### Bug Fixes
|
||||
* Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
|
||||
* Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true.
|
||||
@ -33,6 +46,9 @@
|
||||
### Public API Change
|
||||
* Deprecated public but rarely-used FilterBitsBuilder::CalculateNumEntry, which is replaced with ApproximateNumEntries taking a size_t parameter and returning size_t.
|
||||
* To improve portability the functions `Env::GetChildren` and `Env::GetChildrenFileAttributes` will no longer return entries for the special directories `.` or `..`.
|
||||
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
|
||||
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
|
||||
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
|
||||
|
||||
## 6.15.0 (11/13/2020)
|
||||
### Bug Fixes
|
||||
@ -54,7 +70,6 @@
|
||||
### Public API Change
|
||||
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
|
||||
* Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums.
|
||||
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
|
||||
|
||||
### Behavior Changes
|
||||
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.
|
||||
|
13
db/c.cc
13
db/c.cc
@ -2774,6 +2774,14 @@ void rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
|
||||
opt->rep.bottommost_compression_opts.enabled = enabled;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
|
||||
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes,
|
||||
unsigned char enabled) {
|
||||
opt->rep.bottommost_compression_opts.max_dict_buffer_bytes =
|
||||
max_dict_buffer_bytes;
|
||||
opt->rep.bottommost_compression_opts.enabled = enabled;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
|
||||
int level, int strategy,
|
||||
int max_dict_bytes) {
|
||||
@ -2788,6 +2796,11 @@ void rocksdb_options_set_compression_options_zstd_max_train_bytes(
|
||||
opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_compression_options_max_dict_buffer_bytes(
|
||||
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes) {
|
||||
opt->rep.compression_opts.max_dict_buffer_bytes = max_dict_buffer_bytes;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_prefix_extractor(
|
||||
rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
|
||||
opt->rep.prefix_extractor.reset(prefix_extractor);
|
||||
|
@ -954,6 +954,9 @@ class DBImpl : public DB {
|
||||
// is only for the special test of CancelledCompactions
|
||||
Status TEST_WaitForCompact(bool waitUnscheduled = false);
|
||||
|
||||
// Get the background error status
|
||||
Status TEST_GetBGError();
|
||||
|
||||
// Return the maximum overlapping data (in bytes) at next level for any
|
||||
// file at a level >= 1.
|
||||
int64_t TEST_MaxNextLevelOverlappingBytes(
|
||||
|
@ -177,6 +177,11 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
|
||||
return error_handler_.GetBGError();
|
||||
}
|
||||
|
||||
Status DBImpl::TEST_GetBGError() {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return error_handler_.GetBGError();
|
||||
}
|
||||
|
||||
void DBImpl::TEST_LockMutex() { mutex_.Lock(); }
|
||||
|
||||
void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }
|
||||
|
100
db/db_test2.cc
100
db/db_test2.cc
@ -1410,67 +1410,88 @@ INSTANTIATE_TEST_CASE_P(
|
||||
|
||||
TEST_P(PresetCompressionDictTest, Flush) {
|
||||
// Verifies that dictionary is generated and written during flush only when
|
||||
// `ColumnFamilyOptions::compression` enables dictionary.
|
||||
// `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
|
||||
// size of the dictionary is within expectations according to the limit on
|
||||
// buffering set by `CompressionOptions::max_dict_buffer_bytes`.
|
||||
const size_t kValueLen = 256;
|
||||
const size_t kKeysPerFile = 1 << 10;
|
||||
const size_t kDictLen = 4 << 10;
|
||||
const size_t kDictLen = 16 << 10;
|
||||
const size_t kBlockLen = 4 << 10;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
if (bottommost_) {
|
||||
options.bottommost_compression = compression_type_;
|
||||
options.bottommost_compression_opts.enabled = true;
|
||||
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
|
||||
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
} else {
|
||||
options.compression = compression_type_;
|
||||
options.compression_opts.max_dict_bytes = kDictLen;
|
||||
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
}
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
|
||||
options.statistics = CreateDBStatistics();
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.block_size = kBlockLen;
|
||||
bbto.cache_index_and_filter_blocks = true;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
Reopen(options);
|
||||
|
||||
uint64_t prev_compression_dict_misses =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
|
||||
Random rnd(301);
|
||||
for (size_t i = 0; i <= kKeysPerFile; ++i) {
|
||||
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
|
||||
}
|
||||
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
|
||||
|
||||
// If there's a compression dictionary, it should have been loaded when the
|
||||
// flush finished, incurring a cache miss.
|
||||
uint64_t expected_compression_dict_misses;
|
||||
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
|
||||
// compression dictionary exists since dictionaries would be preloaded when
|
||||
// the flush finishes.
|
||||
if (bottommost_) {
|
||||
expected_compression_dict_misses = prev_compression_dict_misses;
|
||||
// Flush is never considered bottommost. This should change in the future
|
||||
// since flushed files may have nothing underneath them, like the one in
|
||||
// this test case.
|
||||
ASSERT_EQ(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
0);
|
||||
} else {
|
||||
expected_compression_dict_misses = prev_compression_dict_misses + 1;
|
||||
ASSERT_GT(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
0);
|
||||
// Although we limited buffering to `kBlockLen`, there may be up to two
|
||||
// blocks of data included in the dictionary since we only check limit after
|
||||
// each block is built.
|
||||
ASSERT_LE(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
2 * kBlockLen);
|
||||
}
|
||||
ASSERT_EQ(expected_compression_dict_misses,
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
|
||||
}
|
||||
|
||||
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
|
||||
// Verifies that dictionary is generated and written during compaction to
|
||||
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
|
||||
// dictionary.
|
||||
// dictionary. Also verifies the size of the dictionary is within expectations
|
||||
// according to the limit on buffering set by
|
||||
// `CompressionOptions::max_dict_buffer_bytes`.
|
||||
const size_t kValueLen = 256;
|
||||
const size_t kKeysPerFile = 1 << 10;
|
||||
const size_t kDictLen = 4 << 10;
|
||||
const size_t kDictLen = 16 << 10;
|
||||
const size_t kBlockLen = 4 << 10;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
if (bottommost_) {
|
||||
options.bottommost_compression = compression_type_;
|
||||
options.bottommost_compression_opts.enabled = true;
|
||||
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
|
||||
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
} else {
|
||||
options.compression = compression_type_;
|
||||
options.compression_opts.max_dict_bytes = kDictLen;
|
||||
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
}
|
||||
options.disable_auto_compactions = true;
|
||||
options.statistics = CreateDBStatistics();
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.block_size = kBlockLen;
|
||||
bbto.cache_index_and_filter_blocks = true;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
Reopen(options);
|
||||
@ -1492,8 +1513,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
|
||||
ASSERT_EQ("2,0,1", FilesPerLevel(0));
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
uint64_t prev_compression_dict_misses =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
|
||||
uint64_t prev_compression_dict_bytes_inserted =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
|
||||
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
|
||||
// file is not bottommost due to the existing L2 file covering the same key-
|
||||
// range.
|
||||
@ -1501,38 +1522,52 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
ASSERT_EQ("0,1,1", FilesPerLevel(0));
|
||||
#endif // ROCKSDB_LITE
|
||||
// If there's a compression dictionary, it should have been loaded when the
|
||||
// compaction finished, incurring a cache miss.
|
||||
uint64_t expected_compression_dict_misses;
|
||||
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
|
||||
// compression dictionary exists since dictionaries would be preloaded when
|
||||
// the compaction finishes.
|
||||
if (bottommost_) {
|
||||
expected_compression_dict_misses = prev_compression_dict_misses;
|
||||
ASSERT_EQ(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
prev_compression_dict_bytes_inserted);
|
||||
} else {
|
||||
expected_compression_dict_misses = prev_compression_dict_misses + 1;
|
||||
ASSERT_GT(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
prev_compression_dict_bytes_inserted);
|
||||
// Although we limited buffering to `kBlockLen`, there may be up to two
|
||||
// blocks of data included in the dictionary since we only check limit after
|
||||
// each block is built.
|
||||
ASSERT_LE(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
|
||||
}
|
||||
ASSERT_EQ(expected_compression_dict_misses,
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
|
||||
}
|
||||
|
||||
TEST_P(PresetCompressionDictTest, CompactBottommost) {
|
||||
// Verifies that dictionary is generated and written during compaction to
|
||||
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
|
||||
// `ColumnFamilyOptions::bottommost_compression` enables dictionary.
|
||||
// `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
|
||||
// verifies the size of the dictionary is within expectations according to the
|
||||
// limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
|
||||
const size_t kValueLen = 256;
|
||||
const size_t kKeysPerFile = 1 << 10;
|
||||
const size_t kDictLen = 4 << 10;
|
||||
const size_t kDictLen = 16 << 10;
|
||||
const size_t kBlockLen = 4 << 10;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
if (bottommost_) {
|
||||
options.bottommost_compression = compression_type_;
|
||||
options.bottommost_compression_opts.enabled = true;
|
||||
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
|
||||
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
} else {
|
||||
options.compression = compression_type_;
|
||||
options.compression_opts.max_dict_bytes = kDictLen;
|
||||
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
|
||||
}
|
||||
options.disable_auto_compactions = true;
|
||||
options.statistics = CreateDBStatistics();
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.block_size = kBlockLen;
|
||||
bbto.cache_index_and_filter_blocks = true;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
Reopen(options);
|
||||
@ -1548,17 +1583,22 @@ TEST_P(PresetCompressionDictTest, CompactBottommost) {
|
||||
ASSERT_EQ("2", FilesPerLevel(0));
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
uint64_t prev_compression_dict_misses =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
|
||||
uint64_t prev_compression_dict_bytes_inserted =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
|
||||
CompactRangeOptions cro;
|
||||
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
|
||||
#ifndef ROCKSDB_LITE
|
||||
ASSERT_EQ("0,1", FilesPerLevel(0));
|
||||
#endif // ROCKSDB_LITE
|
||||
// If there's a compression dictionary, it should have been loaded when the
|
||||
// compaction finished, incurring a cache miss.
|
||||
ASSERT_EQ(prev_compression_dict_misses + 1,
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
|
||||
ASSERT_GT(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
prev_compression_dict_bytes_inserted);
|
||||
// Although we limited buffering to `kBlockLen`, there may be up to two
|
||||
// blocks of data included in the dictionary since we only check limit after
|
||||
// each block is built.
|
||||
ASSERT_LE(
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
|
||||
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
|
||||
}
|
||||
|
||||
class CompactionCompressionListener : public EventListener {
|
||||
|
@ -417,11 +417,11 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
&new_bg_io_err, db_mutex_,
|
||||
&auto_recovery);
|
||||
if (BackgroundErrorReason::kCompaction == reason) {
|
||||
Status bg_err(new_bg_io_err, Status::Severity::kSoftError);
|
||||
if (bg_err.severity() > bg_error_.severity()) {
|
||||
bg_error_ = bg_err;
|
||||
}
|
||||
recover_context_ = context;
|
||||
// We map the retryable IO error during compaction to soft error. Since
|
||||
// compaction can reschedule by itself. We will not set the BG error in
|
||||
// this case
|
||||
// TODO: a better way to set or clean the retryable IO error which
|
||||
// happens during compaction SST file write.
|
||||
return bg_error_;
|
||||
} else if (BackgroundErrorReason::kFlushNoWAL == reason ||
|
||||
BackgroundErrorReason::kManifestWriteNoWAL == reason) {
|
||||
|
@ -892,15 +892,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionJob::OpenCompactionOutputFile",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::BackgroundCompaction:Finish",
|
||||
[&](void*) { CancelAllBackgroundWork(dbfull()); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val"));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
|
||||
s = dbfull()->TEST_GetBGError();
|
||||
ASSERT_OK(s);
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
@ -940,14 +942,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteFileScopeError) {
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionJob::OpenCompactionOutputFile",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::BackgroundCompaction:Finish",
|
||||
[&](void*) { CancelAllBackgroundWork(dbfull()); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val"));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
s = dbfull()->TEST_GetBGError();
|
||||
ASSERT_OK(s);
|
||||
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
@ -2190,8 +2195,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
|
||||
ASSERT_OK(s);
|
||||
TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
@ -10,13 +10,14 @@
|
||||
#ifndef ROCKSDB_LITE
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) {
|
||||
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) : timer_mu_(env) {
|
||||
timer = std::unique_ptr<Timer>(new Timer(env));
|
||||
}
|
||||
|
||||
void PeriodicWorkScheduler::Register(DBImpl* dbi,
|
||||
unsigned int stats_dump_period_sec,
|
||||
unsigned int stats_persist_period_sec) {
|
||||
MutexLock l(&timer_mu_);
|
||||
static std::atomic<uint64_t> initial_delay(0);
|
||||
timer->Start();
|
||||
if (stats_dump_period_sec > 0) {
|
||||
@ -41,6 +42,7 @@ void PeriodicWorkScheduler::Register(DBImpl* dbi,
|
||||
}
|
||||
|
||||
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
|
||||
MutexLock l(&timer_mu_);
|
||||
timer->Cancel(GetTaskName(dbi, "dump_st"));
|
||||
timer->Cancel(GetTaskName(dbi, "pst_st"));
|
||||
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
|
||||
@ -78,7 +80,10 @@ PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(Env* env) {
|
||||
MutexLock l(&mutex);
|
||||
if (scheduler.timer.get() != nullptr &&
|
||||
scheduler.timer->TEST_GetPendingTaskNum() == 0) {
|
||||
scheduler.timer->Shutdown();
|
||||
{
|
||||
MutexLock timer_mu_guard(&scheduler.timer_mu_);
|
||||
scheduler.timer->Shutdown();
|
||||
}
|
||||
scheduler.timer.reset(new Timer(env));
|
||||
}
|
||||
}
|
||||
|
@ -42,6 +42,12 @@ class PeriodicWorkScheduler {
|
||||
|
||||
protected:
|
||||
std::unique_ptr<Timer> timer;
|
||||
// `timer_mu_` serves two purposes currently:
|
||||
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
|
||||
// they are currently not implemented in a thread-safe way; and
|
||||
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
|
||||
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
|
||||
port::Mutex timer_mu_;
|
||||
|
||||
explicit PeriodicWorkScheduler(Env* env);
|
||||
|
||||
|
@ -226,13 +226,17 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
}
|
||||
|
||||
for (const auto& wal_addition : wal_additions_) {
|
||||
PutVarint32(dst, kWalAddition);
|
||||
wal_addition.EncodeTo(dst);
|
||||
PutVarint32(dst, kWalAddition2);
|
||||
std::string encoded;
|
||||
wal_addition.EncodeTo(&encoded);
|
||||
PutLengthPrefixedSlice(dst, encoded);
|
||||
}
|
||||
|
||||
if (!wal_deletion_.IsEmpty()) {
|
||||
PutVarint32(dst, kWalDeletion);
|
||||
wal_deletion_.EncodeTo(dst);
|
||||
PutVarint32(dst, kWalDeletion2);
|
||||
std::string encoded;
|
||||
wal_deletion_.EncodeTo(&encoded);
|
||||
PutLengthPrefixedSlice(dst, encoded);
|
||||
}
|
||||
|
||||
// 0 is default and does not need to be explicitly written
|
||||
@ -375,6 +379,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
||||
|
||||
Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
Clear();
|
||||
#ifndef NDEBUG
|
||||
bool ignore_ignorable_tags = false;
|
||||
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags",
|
||||
&ignore_ignorable_tags);
|
||||
#endif
|
||||
Slice input = src;
|
||||
const char* msg = nullptr;
|
||||
uint32_t tag = 0;
|
||||
@ -385,6 +394,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
Slice str;
|
||||
InternalKey key;
|
||||
while (msg == nullptr && GetVarint32(&input, &tag)) {
|
||||
#ifndef NDEBUG
|
||||
if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) {
|
||||
tag = kTagSafeIgnoreMask;
|
||||
}
|
||||
#endif
|
||||
switch (tag) {
|
||||
case kDbId:
|
||||
if (GetLengthPrefixedSlice(&input, &str)) {
|
||||
@ -542,7 +556,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
break;
|
||||
}
|
||||
|
||||
case kBlobFileAddition: {
|
||||
case kBlobFileAddition:
|
||||
case kBlobFileAddition_DEPRECATED: {
|
||||
BlobFileAddition blob_file_addition;
|
||||
const Status s = blob_file_addition.DecodeFrom(&input);
|
||||
if (!s.ok()) {
|
||||
@ -553,7 +568,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
break;
|
||||
}
|
||||
|
||||
case kBlobFileGarbage: {
|
||||
case kBlobFileGarbage:
|
||||
case kBlobFileGarbage_DEPRECATED: {
|
||||
BlobFileGarbage blob_file_garbage;
|
||||
const Status s = blob_file_garbage.DecodeFrom(&input);
|
||||
if (!s.ok()) {
|
||||
@ -575,6 +591,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
break;
|
||||
}
|
||||
|
||||
case kWalAddition2: {
|
||||
Slice encoded;
|
||||
if (!GetLengthPrefixedSlice(&input, &encoded)) {
|
||||
msg = "WalAddition not prefixed by length";
|
||||
break;
|
||||
}
|
||||
|
||||
WalAddition wal_addition;
|
||||
const Status s = wal_addition.DecodeFrom(&encoded);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
wal_additions_.emplace_back(std::move(wal_addition));
|
||||
break;
|
||||
}
|
||||
|
||||
case kWalDeletion: {
|
||||
WalDeletion wal_deletion;
|
||||
const Status s = wal_deletion.DecodeFrom(&input);
|
||||
@ -586,6 +619,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
break;
|
||||
}
|
||||
|
||||
case kWalDeletion2: {
|
||||
Slice encoded;
|
||||
if (!GetLengthPrefixedSlice(&input, &encoded)) {
|
||||
msg = "WalDeletion not prefixed by length";
|
||||
break;
|
||||
}
|
||||
|
||||
WalDeletion wal_deletion;
|
||||
const Status s = wal_deletion.DecodeFrom(&encoded);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
wal_deletion_ = std::move(wal_deletion);
|
||||
break;
|
||||
}
|
||||
|
||||
case kColumnFamily:
|
||||
if (!GetVarint32(&input, &column_family_)) {
|
||||
if (!msg) {
|
||||
|
@ -52,16 +52,21 @@ enum Tag : uint32_t {
|
||||
|
||||
kInAtomicGroup = 300,
|
||||
|
||||
kBlobFileAddition = 400,
|
||||
kBlobFileGarbage,
|
||||
|
||||
// Mask for an unidentified tag from the future which can be safely ignored.
|
||||
kTagSafeIgnoreMask = 1 << 13,
|
||||
|
||||
// Forward compatible (aka ignorable) records
|
||||
kDbId,
|
||||
kBlobFileAddition,
|
||||
kBlobFileGarbage,
|
||||
kBlobFileAddition_DEPRECATED,
|
||||
kBlobFileGarbage_DEPRECATED,
|
||||
kWalAddition,
|
||||
kWalDeletion,
|
||||
kFullHistoryTsLow,
|
||||
kWalAddition2,
|
||||
kWalDeletion2,
|
||||
};
|
||||
|
||||
enum NewFileCustomTag : uint32_t {
|
||||
|
@ -324,14 +324,22 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) {
|
||||
TestEncodeDecode(edit);
|
||||
}
|
||||
|
||||
static std::string PrefixEncodedWalAdditionWithLength(
|
||||
const std::string& encoded) {
|
||||
std::string ret;
|
||||
PutVarint32(&ret, Tag::kWalAddition2);
|
||||
PutLengthPrefixedSlice(&ret, encoded);
|
||||
return ret;
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
|
||||
std::string encoded;
|
||||
PutVarint32(&encoded, Tag::kWalAddition);
|
||||
|
||||
{
|
||||
// No log number.
|
||||
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
||||
std::string::npos)
|
||||
@ -345,8 +353,10 @@ TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
|
||||
unsigned char* ptr = reinterpret_cast<unsigned char*>(&c);
|
||||
*ptr = 128;
|
||||
encoded.append(1, c);
|
||||
|
||||
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
||||
std::string::npos)
|
||||
@ -358,14 +368,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
||||
constexpr WalNumber kLogNumber = 100;
|
||||
constexpr uint64_t kSizeInBytes = 100;
|
||||
|
||||
std::string encoded_without_tag;
|
||||
PutVarint32(&encoded_without_tag, Tag::kWalAddition);
|
||||
PutVarint64(&encoded_without_tag, kLogNumber);
|
||||
std::string encoded;
|
||||
PutVarint64(&encoded, kLogNumber);
|
||||
|
||||
{
|
||||
// No tag.
|
||||
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded_without_tag);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||
<< s.ToString();
|
||||
@ -373,12 +383,15 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
||||
|
||||
{
|
||||
// Only has size tag, no terminate tag.
|
||||
std::string encoded_with_size = encoded_without_tag;
|
||||
std::string encoded_with_size = encoded;
|
||||
PutVarint32(&encoded_with_size,
|
||||
static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
||||
PutVarint64(&encoded_with_size, kSizeInBytes);
|
||||
|
||||
std::string encoded_edit =
|
||||
PrefixEncodedWalAdditionWithLength(encoded_with_size);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded_with_size);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||
<< s.ToString();
|
||||
@ -386,11 +399,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
||||
|
||||
{
|
||||
// Only has terminate tag.
|
||||
std::string encoded_with_terminate = encoded_without_tag;
|
||||
std::string encoded_with_terminate = encoded;
|
||||
PutVarint32(&encoded_with_terminate,
|
||||
static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
||||
|
||||
std::string encoded_edit =
|
||||
PrefixEncodedWalAdditionWithLength(encoded_with_terminate);
|
||||
VersionEdit edit;
|
||||
ASSERT_OK(edit.DecodeFrom(encoded_with_terminate));
|
||||
ASSERT_OK(edit.DecodeFrom(encoded_edit));
|
||||
auto& wal_addition = edit.GetWalAdditions()[0];
|
||||
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
|
||||
ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize());
|
||||
@ -401,15 +417,15 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
|
||||
constexpr WalNumber kLogNumber = 100;
|
||||
|
||||
std::string encoded;
|
||||
PutVarint32(&encoded, Tag::kWalAddition);
|
||||
PutVarint64(&encoded, kLogNumber);
|
||||
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
||||
// No real size after the size tag.
|
||||
|
||||
{
|
||||
// Without terminate tag.
|
||||
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") !=
|
||||
std::string::npos)
|
||||
@ -419,8 +435,10 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
|
||||
{
|
||||
// With terminate tag.
|
||||
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
||||
|
||||
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||
VersionEdit edit;
|
||||
Status s = edit.DecodeFrom(encoded);
|
||||
Status s = edit.DecodeFrom(encoded_edit);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
// The terminate tag is misunderstood as the size.
|
||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||
@ -515,6 +533,62 @@ TEST_F(VersionEditTest, FullHistoryTsLow) {
|
||||
TestEncodeDecode(edit);
|
||||
}
|
||||
|
||||
// Tests that if RocksDB is downgraded, the new types of VersionEdits
|
||||
// that have a tag larger than kTagSafeIgnoreMask can be safely ignored.
|
||||
TEST_F(VersionEditTest, IgnorableTags) {
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionEdit::EncodeTo:IgnoreIgnorableTags", [&](void* arg) {
|
||||
bool* ignore = static_cast<bool*>(arg);
|
||||
*ignore = true;
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
constexpr uint64_t kPrevLogNumber = 100;
|
||||
constexpr uint64_t kLogNumber = 200;
|
||||
constexpr uint64_t kNextFileNumber = 300;
|
||||
constexpr uint64_t kColumnFamilyId = 400;
|
||||
|
||||
VersionEdit edit;
|
||||
// Add some ignorable entries.
|
||||
for (int i = 0; i < 2; i++) {
|
||||
edit.AddWal(i + 1, WalMetadata(i + 2));
|
||||
}
|
||||
edit.SetDBId("db_id");
|
||||
// Add unignorable entries.
|
||||
edit.SetPrevLogNumber(kPrevLogNumber);
|
||||
edit.SetLogNumber(kLogNumber);
|
||||
// Add more ignorable entries.
|
||||
edit.DeleteWalsBefore(100);
|
||||
// Add unignorable entry.
|
||||
edit.SetNextFile(kNextFileNumber);
|
||||
// Add more ignorable entries.
|
||||
edit.SetFullHistoryTsLow("ts");
|
||||
// Add unignorable entry.
|
||||
edit.SetColumnFamily(kColumnFamilyId);
|
||||
|
||||
std::string encoded;
|
||||
ASSERT_TRUE(edit.EncodeTo(&encoded));
|
||||
|
||||
VersionEdit decoded;
|
||||
ASSERT_OK(decoded.DecodeFrom(encoded));
|
||||
|
||||
// Check that all ignorable entries are ignored.
|
||||
ASSERT_FALSE(decoded.HasDbId());
|
||||
ASSERT_FALSE(decoded.HasFullHistoryTsLow());
|
||||
ASSERT_FALSE(decoded.IsWalAddition());
|
||||
ASSERT_FALSE(decoded.IsWalDeletion());
|
||||
ASSERT_TRUE(decoded.GetWalAdditions().empty());
|
||||
ASSERT_TRUE(decoded.GetWalDeletion().IsEmpty());
|
||||
|
||||
// Check that unignorable entries are still present.
|
||||
ASSERT_EQ(edit.GetPrevLogNumber(), kPrevLogNumber);
|
||||
ASSERT_EQ(edit.GetLogNumber(), kLogNumber);
|
||||
ASSERT_EQ(edit.GetNextFile(), kNextFileNumber);
|
||||
ASSERT_EQ(edit.GetColumnFamily(), kColumnFamilyId);
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -206,6 +206,7 @@ DECLARE_string(bottommost_compression_type);
|
||||
DECLARE_int32(compression_max_dict_bytes);
|
||||
DECLARE_int32(compression_zstd_max_train_bytes);
|
||||
DECLARE_int32(compression_parallel_threads);
|
||||
DECLARE_uint64(compression_max_dict_buffer_bytes);
|
||||
DECLARE_string(checksum_type);
|
||||
DECLARE_string(hdfs);
|
||||
DECLARE_string(env_uri);
|
||||
|
@ -626,6 +626,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
|
||||
DEFINE_int32(compression_parallel_threads, 1,
|
||||
"Number of threads for parallel compression.");
|
||||
|
||||
DEFINE_uint64(compression_max_dict_buffer_bytes, 0,
|
||||
"Buffering limit for SST file data to sample for dictionary "
|
||||
"compression.");
|
||||
|
||||
DEFINE_string(bottommost_compression_type, "disable",
|
||||
"Algorithm to use to compress bottommost level of the database. "
|
||||
"\"disable\" means disabling the feature");
|
||||
|
@ -2052,6 +2052,8 @@ void StressTest::Open() {
|
||||
FLAGS_compression_zstd_max_train_bytes;
|
||||
options_.compression_opts.parallel_threads =
|
||||
FLAGS_compression_parallel_threads;
|
||||
options_.compression_opts.max_dict_buffer_bytes =
|
||||
FLAGS_compression_max_dict_buffer_bytes;
|
||||
options_.create_if_missing = true;
|
||||
options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
|
||||
options_.inplace_update_support = FLAGS_in_place_update;
|
||||
|
@ -143,6 +143,28 @@ struct CompressionOptions {
|
||||
// Default: false.
|
||||
bool enabled;
|
||||
|
||||
// Limit on data buffering, which is used to gather samples to build a
|
||||
// dictionary. Zero means no limit. When dictionary is disabled
|
||||
// (`max_dict_bytes == 0`), enabling this limit (`max_dict_buffer_bytes != 0`)
|
||||
// has no effect.
|
||||
//
|
||||
// In compaction, the buffering is limited to the target file size (see
|
||||
// `target_file_size_base` and `target_file_size_multiplier`) even if this
|
||||
// setting permits more buffering. Since we cannot determine where the file
|
||||
// should be cut until data blocks are compressed with dictionary, buffering
|
||||
// more than the target file size could lead to selecting samples that belong
|
||||
// to a later output SST.
|
||||
//
|
||||
// Limiting too strictly may harm dictionary effectiveness since it forces
|
||||
// RocksDB to pick samples from the initial portion of the output SST, which
|
||||
// may not be representative of the whole file. Configuring this limit below
|
||||
// `zstd_max_train_bytes` (when enabled) can restrict how many samples we can
|
||||
// pass to the dictionary trainer. Configuring it below `max_dict_bytes` can
|
||||
// restrict the size of the final dictionary.
|
||||
//
|
||||
// Default: 0 (unlimited)
|
||||
uint64_t max_dict_buffer_bytes;
|
||||
|
||||
CompressionOptions()
|
||||
: window_bits(-14),
|
||||
level(kDefaultCompressionLevel),
|
||||
@ -150,17 +172,19 @@ struct CompressionOptions {
|
||||
max_dict_bytes(0),
|
||||
zstd_max_train_bytes(0),
|
||||
parallel_threads(1),
|
||||
enabled(false) {}
|
||||
enabled(false),
|
||||
max_dict_buffer_bytes(0) {}
|
||||
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
|
||||
int _zstd_max_train_bytes, int _parallel_threads,
|
||||
bool _enabled)
|
||||
bool _enabled, uint64_t _max_dict_buffer_bytes)
|
||||
: window_bits(wbits),
|
||||
level(_lev),
|
||||
strategy(_strategy),
|
||||
max_dict_bytes(_max_dict_bytes),
|
||||
zstd_max_train_bytes(_zstd_max_train_bytes),
|
||||
parallel_threads(_parallel_threads),
|
||||
enabled(_enabled) {}
|
||||
enabled(_enabled),
|
||||
max_dict_buffer_bytes(_max_dict_buffer_bytes) {}
|
||||
};
|
||||
|
||||
enum UpdateStatus { // Return status For inplace update callback
|
||||
|
@ -998,11 +998,17 @@ extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*,
|
||||
int);
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_compression_options_max_dict_buffer_bytes(
|
||||
rocksdb_options_t*, uint64_t);
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int, int,
|
||||
int, int, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
|
||||
rocksdb_options_t*, int, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
|
||||
rocksdb_options_t*, uint64_t, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
|
||||
rocksdb_options_t*, rocksdb_slicetransform_t*);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(
|
||||
|
@ -51,6 +51,8 @@ struct OptimisticTransactionDBOptions {
|
||||
uint32_t occ_lock_buckets = (1 << 20);
|
||||
};
|
||||
|
||||
// Range deletions (including those in `WriteBatch`es passed to `Write()`) are
|
||||
// incompatible with `OptimisticTransactionDB` and will return a non-OK `Status`
|
||||
class OptimisticTransactionDB : public StackableDB {
|
||||
public:
|
||||
// Open an OptimisticTransactionDB similar to DB::Open().
|
||||
|
@ -344,6 +344,17 @@ class TransactionDB : public StackableDB {
|
||||
// falls back to the un-optimized version of ::Write
|
||||
return Write(opts, updates);
|
||||
}
|
||||
// Transactional `DeleteRange()` is not yet supported.
|
||||
// However, users who know their deleted range does not conflict with
|
||||
// anything can still use it via the `Write()` API. In all cases, the
|
||||
// `Write()` overload specifying `TransactionDBWriteOptimizations` must be
|
||||
// used and `skip_concurrency_control` must be set. When using either
|
||||
// WRITE_PREPARED or WRITE_UNPREPARED , `skip_duplicate_key_check` must
|
||||
// additionally be set.
|
||||
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
|
||||
const Slice&, const Slice&) override {
|
||||
return Status::NotSupported();
|
||||
}
|
||||
// Open a TransactionDB similar to DB::Open().
|
||||
// Internally call PrepareWrap() and WrapDB()
|
||||
// If the return status is not ok, then dbptr is set to nullptr.
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 17
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 2
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -132,6 +132,27 @@ jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
|
||||
return static_cast<jint>(opt->zstd_max_train_bytes);
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_CompressionOptions
|
||||
* Method: setMaxDictBufferBytes
|
||||
* Signature: (JJ)V
|
||||
*/
|
||||
void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
|
||||
JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
|
||||
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
|
||||
opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_CompressionOptions
|
||||
* Method: maxDictBufferBytes
|
||||
* Signature: (J)J
|
||||
*/
|
||||
jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
|
||||
jlong jhandle) {
|
||||
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
|
||||
return static_cast<jlong>(opt->max_dict_buffer_bytes);
|
||||
}
|
||||
/*
|
||||
* Class: org_rocksdb_CompressionOptions
|
||||
* Method: setEnabled
|
||||
|
@ -105,7 +105,7 @@ static Status ParseCompressionOptions(const std::string& value,
|
||||
}
|
||||
// Since parallel_threads comes before enabled but was added optionally
|
||||
// later, we need to check if this is the final token (meaning it is the
|
||||
// enabled bit), or if there is another token (meaning this one is
|
||||
// enabled bit), or if there are more tokens (meaning this one is
|
||||
// parallel_threads)
|
||||
end = value.find(':', start);
|
||||
if (end != std::string::npos) {
|
||||
@ -113,7 +113,6 @@ static Status ParseCompressionOptions(const std::string& value,
|
||||
ParseInt(value.substr(start, value.size() - start));
|
||||
} else {
|
||||
// parallel_threads is not serialized with this format, but enabled is
|
||||
compression_opts.parallel_threads = CompressionOptions().parallel_threads;
|
||||
compression_opts.enabled =
|
||||
ParseBoolean("", value.substr(start, value.size() - start));
|
||||
}
|
||||
@ -128,6 +127,18 @@ static Status ParseCompressionOptions(const std::string& value,
|
||||
}
|
||||
compression_opts.enabled =
|
||||
ParseBoolean("", value.substr(start, value.size() - start));
|
||||
end = value.find(':', start);
|
||||
}
|
||||
|
||||
// max_dict_buffer_bytes is optional for backwards compatibility
|
||||
if (end != std::string::npos) {
|
||||
start = end + 1;
|
||||
if (start >= value.size()) {
|
||||
return Status::InvalidArgument(
|
||||
"unable to parse the specified CF option " + name);
|
||||
}
|
||||
compression_opts.max_dict_buffer_bytes =
|
||||
ParseUint64(value.substr(start, value.size() - start));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -161,6 +172,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
|
||||
{"enabled",
|
||||
{offsetof(struct CompressionOptions, enabled), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
|
||||
{"max_dict_buffer_bytes",
|
||||
{offsetof(struct CompressionOptions, max_dict_buffer_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kMutable}},
|
||||
};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo>
|
||||
|
@ -201,6 +201,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||
ROCKS_LOG_HEADER(
|
||||
log, " Options.bottommost_compression_opts.enabled: %s",
|
||||
bottommost_compression_opts.enabled ? "true" : "false");
|
||||
ROCKS_LOG_HEADER(
|
||||
log,
|
||||
" Options.bottommost_compression_opts.max_dict_buffer_bytes: "
|
||||
"%" PRIu64,
|
||||
bottommost_compression_opts.max_dict_buffer_bytes);
|
||||
ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
|
||||
compression_opts.window_bits);
|
||||
ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
|
||||
@ -222,6 +227,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||
ROCKS_LOG_HEADER(log,
|
||||
" Options.compression_opts.enabled: %s",
|
||||
compression_opts.enabled ? "true" : "false");
|
||||
ROCKS_LOG_HEADER(log,
|
||||
" Options.compression_opts.max_dict_buffer_bytes: "
|
||||
"%" PRIu64,
|
||||
compression_opts.max_dict_buffer_bytes);
|
||||
ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
|
||||
level0_file_num_compaction_trigger);
|
||||
ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",
|
||||
|
@ -409,10 +409,11 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
||||
FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
|
||||
kColumnFamilyOptionsExcluded);
|
||||
|
||||
// It based on the behavior of compiler that padding bytes are not changed
|
||||
// when copying the struct. It's prone to failure when compiler behavior
|
||||
// changes. We verify there is unset bytes to detect the case.
|
||||
*options = ColumnFamilyOptions();
|
||||
// Invoke a user-defined constructor in the hope that it does not overwrite
|
||||
// padding bytes. Note that previously we relied on the implicitly-defined
|
||||
// copy-assignment operator (i.e., `*options = ColumnFamilyOptions();`) here,
|
||||
// which did in fact modify padding bytes.
|
||||
options = new (options_ptr) ColumnFamilyOptions();
|
||||
|
||||
// Deprecatd option which is not initialized. Need to set it to avoid
|
||||
// Valgrind error
|
||||
@ -470,8 +471,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
||||
"max_bytes_for_level_multiplier=60;"
|
||||
"memtable_factory=SkipListFactory;"
|
||||
"compression=kNoCompression;"
|
||||
"compression_opts=5:6:7:8:9:true;"
|
||||
"bottommost_compression_opts=4:5:6:7:8:true;"
|
||||
"compression_opts=5:6:7:8:9:10:true:11;"
|
||||
"bottommost_compression_opts=4:5:6:7:8:9:true:10;"
|
||||
"bottommost_compression=kDisableCompressionOption;"
|
||||
"level0_stop_writes_trigger=33;"
|
||||
"num_levels=99;"
|
||||
|
@ -725,7 +725,7 @@ TEST_F(OptionsTest, CompressionOptionsFromString) {
|
||||
ASSERT_OK(GetColumnFamilyOptionsFromString(
|
||||
ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false",
|
||||
&base_cf_opt));
|
||||
ASSERT_NOK(GetColumnFamilyOptionsFromString(
|
||||
ASSERT_OK(GetColumnFamilyOptionsFromString(
|
||||
config_options, ColumnFamilyOptions(),
|
||||
"compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt));
|
||||
ASSERT_OK(GetColumnFamilyOptionsFromString(
|
||||
|
@ -11,24 +11,25 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "index_builder.h"
|
||||
|
||||
#include "memory/memory_allocator.h"
|
||||
#include "rocksdb/cache.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/flush_block_policy.h"
|
||||
#include "rocksdb/merge_operator.h"
|
||||
#include "rocksdb/table.h"
|
||||
|
||||
#include "table/block_based/block.h"
|
||||
#include "table/block_based/block_based_filter_block.h"
|
||||
#include "table/block_based/block_based_table_factory.h"
|
||||
@ -40,8 +41,6 @@
|
||||
#include "table/block_based/partitioned_filter_block.h"
|
||||
#include "table/format.h"
|
||||
#include "table/table_builder.h"
|
||||
|
||||
#include "memory/memory_allocator.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/compression.h"
|
||||
#include "util/crc32c.h"
|
||||
@ -306,6 +305,10 @@ struct BlockBasedTableBuilder::Rep {
|
||||
kClosed,
|
||||
};
|
||||
State state;
|
||||
// `kBuffered` state is allowed only as long as the buffering of uncompressed
|
||||
// data blocks (see `data_block_and_keys_buffers`) does not exceed
|
||||
// `buffer_limit`.
|
||||
uint64_t buffer_limit;
|
||||
|
||||
const bool use_delta_encoding_for_index_values;
|
||||
std::unique_ptr<FilterBlockBuilder> filter_builder;
|
||||
@ -321,7 +324,6 @@ struct BlockBasedTableBuilder::Rep {
|
||||
const std::string& column_family_name;
|
||||
uint64_t creation_time = 0;
|
||||
uint64_t oldest_key_time = 0;
|
||||
const uint64_t target_file_size;
|
||||
uint64_t file_creation_time = 0;
|
||||
|
||||
// DB IDs
|
||||
@ -407,7 +409,7 @@ struct BlockBasedTableBuilder::Rep {
|
||||
const CompressionOptions& _compression_opts, const bool skip_filters,
|
||||
const int _level_at_creation, const std::string& _column_family_name,
|
||||
const uint64_t _creation_time, const uint64_t _oldest_key_time,
|
||||
const uint64_t _target_file_size, const uint64_t _file_creation_time,
|
||||
const uint64_t target_file_size, const uint64_t _file_creation_time,
|
||||
const std::string& _db_id, const std::string& _db_session_id)
|
||||
: ioptions(_ioptions),
|
||||
moptions(_moptions),
|
||||
@ -448,13 +450,20 @@ struct BlockBasedTableBuilder::Rep {
|
||||
column_family_name(_column_family_name),
|
||||
creation_time(_creation_time),
|
||||
oldest_key_time(_oldest_key_time),
|
||||
target_file_size(_target_file_size),
|
||||
file_creation_time(_file_creation_time),
|
||||
db_id(_db_id),
|
||||
db_session_id(_db_session_id),
|
||||
db_host_id(ioptions.db_host_id),
|
||||
status_ok(true),
|
||||
io_status_ok(true) {
|
||||
if (target_file_size == 0) {
|
||||
buffer_limit = compression_opts.max_dict_buffer_bytes;
|
||||
} else if (compression_opts.max_dict_buffer_bytes == 0) {
|
||||
buffer_limit = target_file_size;
|
||||
} else {
|
||||
buffer_limit =
|
||||
std::min(target_file_size, compression_opts.max_dict_buffer_bytes);
|
||||
}
|
||||
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
|
||||
compression_ctxs[i].reset(new CompressionContext(compression_type));
|
||||
}
|
||||
@ -896,8 +905,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
||||
r->first_key_in_next_block = &key;
|
||||
Flush();
|
||||
|
||||
if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
|
||||
r->data_begin_offset > r->target_file_size) {
|
||||
if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
|
||||
r->data_begin_offset > r->buffer_limit) {
|
||||
EnterUnbuffered();
|
||||
}
|
||||
|
||||
@ -997,23 +1006,28 @@ void BlockBasedTableBuilder::Flush() {
|
||||
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
|
||||
BlockHandle* handle,
|
||||
bool is_data_block) {
|
||||
WriteBlock(block->Finish(), handle, is_data_block);
|
||||
block->Reset();
|
||||
block->Finish();
|
||||
std::string raw_block_contents;
|
||||
block->SwapAndReset(raw_block_contents);
|
||||
if (rep_->state == Rep::State::kBuffered) {
|
||||
assert(is_data_block);
|
||||
assert(!rep_->data_block_and_keys_buffers.empty());
|
||||
rep_->data_block_and_keys_buffers.back().first =
|
||||
std::move(raw_block_contents);
|
||||
rep_->data_begin_offset +=
|
||||
rep_->data_block_and_keys_buffers.back().first.size();
|
||||
return;
|
||||
}
|
||||
WriteBlock(raw_block_contents, handle, is_data_block);
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
||||
BlockHandle* handle,
|
||||
bool is_data_block) {
|
||||
Rep* r = rep_;
|
||||
assert(r->state == Rep::State::kUnbuffered);
|
||||
Slice block_contents;
|
||||
CompressionType type;
|
||||
if (r->state == Rep::State::kBuffered) {
|
||||
assert(is_data_block);
|
||||
assert(!r->data_block_and_keys_buffers.empty());
|
||||
r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
|
||||
r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
|
||||
return;
|
||||
}
|
||||
Status compress_status;
|
||||
CompressAndVerifyBlock(raw_block_contents, is_data_block,
|
||||
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
|
||||
@ -1629,14 +1643,37 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
|
||||
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
|
||||
? r->compression_opts.zstd_max_train_bytes
|
||||
: r->compression_opts.max_dict_bytes;
|
||||
|
||||
// If buffer size is reasonable, we pre-generate a permutation to enforce
|
||||
// uniqueness. This prevents wasting samples on duplicates, which is
|
||||
// particularly likely when not many blocks were buffered.
|
||||
std::vector<uint16_t> data_block_order;
|
||||
size_t data_block_order_idx = 0;
|
||||
if (r->data_block_and_keys_buffers.size() <= ((1 << 16) - 1)) {
|
||||
data_block_order.resize(r->data_block_and_keys_buffers.size());
|
||||
std::iota(data_block_order.begin(), data_block_order.end(),
|
||||
static_cast<uint16_t>(0));
|
||||
// We could be smarter and interleave the shuffling and sample appending
|
||||
// logic. Then we could terminate as soon as `kSampleBytes` is reached,
|
||||
// saving some shuffling computation.
|
||||
RandomShuffle(data_block_order.begin(), data_block_order.end());
|
||||
}
|
||||
|
||||
Random64 generator{r->creation_time};
|
||||
std::string compression_dict_samples;
|
||||
std::vector<size_t> compression_dict_sample_lens;
|
||||
if (!r->data_block_and_keys_buffers.empty()) {
|
||||
while (compression_dict_samples.size() < kSampleBytes) {
|
||||
size_t rand_idx =
|
||||
static_cast<size_t>(
|
||||
generator.Uniform(r->data_block_and_keys_buffers.size()));
|
||||
while ((data_block_order.empty() ||
|
||||
data_block_order_idx < data_block_order.size()) &&
|
||||
compression_dict_samples.size() < kSampleBytes) {
|
||||
size_t rand_idx;
|
||||
if (data_block_order.empty()) {
|
||||
rand_idx = static_cast<size_t>(
|
||||
generator.Uniform(r->data_block_and_keys_buffers.size()));
|
||||
} else {
|
||||
rand_idx = data_block_order[data_block_order_idx];
|
||||
++data_block_order_idx;
|
||||
}
|
||||
size_t copy_len =
|
||||
std::min(kSampleBytes - compression_dict_samples.size(),
|
||||
r->data_block_and_keys_buffers[rand_idx].first.size());
|
||||
|
@ -117,8 +117,9 @@ class BlockBasedTableBuilder : public TableBuilder {
|
||||
// REQUIRES: `rep_->state == kBuffered`
|
||||
void EnterUnbuffered();
|
||||
|
||||
// Call block's Finish() method
|
||||
// and then write the compressed block contents to file.
|
||||
// Call block's Finish() method and then
|
||||
// - in buffered mode, buffer the uncompressed block contents.
|
||||
// - in unbuffered mode, write the compressed block contents to file.
|
||||
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
|
||||
|
||||
// Compress and write block content to the file.
|
||||
|
@ -235,7 +235,8 @@ Status SstFileDumper::ShowAllCompressionSizes(
|
||||
const std::vector<std::pair<CompressionType, const char*>>&
|
||||
compression_types,
|
||||
int32_t compress_level_from, int32_t compress_level_to,
|
||||
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) {
|
||||
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
|
||||
uint64_t max_dict_buffer_bytes) {
|
||||
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
|
||||
for (auto& i : compression_types) {
|
||||
if (CompressionTypeSupported(i.first)) {
|
||||
@ -243,6 +244,7 @@ Status SstFileDumper::ShowAllCompressionSizes(
|
||||
CompressionOptions compress_opt;
|
||||
compress_opt.max_dict_bytes = max_dict_bytes;
|
||||
compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
|
||||
compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes;
|
||||
for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
|
||||
fprintf(stdout, "Compression level: %d", j);
|
||||
compress_opt.level = j;
|
||||
|
@ -40,7 +40,8 @@ class SstFileDumper {
|
||||
const std::vector<std::pair<CompressionType, const char*>>&
|
||||
compression_types,
|
||||
int32_t compress_level_from, int32_t compress_level_to,
|
||||
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
|
||||
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
|
||||
uint64_t max_dict_buffer_bytes);
|
||||
|
||||
Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
|
||||
const CompressionOptions& compress_opt);
|
||||
|
@ -948,6 +948,10 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
|
||||
DEFINE_int32(compression_parallel_threads, 1,
|
||||
"Number of threads for parallel compression.");
|
||||
|
||||
DEFINE_uint64(compression_max_dict_buffer_bytes,
|
||||
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes,
|
||||
"Maximum bytes to buffer to collect samples for dictionary.");
|
||||
|
||||
static bool ValidateTableCacheNumshardbits(const char* flagname,
|
||||
int32_t value) {
|
||||
if (0 >= value || value > 20) {
|
||||
@ -4053,6 +4057,8 @@ class Benchmark {
|
||||
FLAGS_compression_zstd_max_train_bytes;
|
||||
options.compression_opts.parallel_threads =
|
||||
FLAGS_compression_parallel_threads;
|
||||
options.compression_opts.max_dict_buffer_bytes =
|
||||
FLAGS_compression_max_dict_buffer_bytes;
|
||||
// If this is a block based table, set some related options
|
||||
auto table_options =
|
||||
options.table_factory->GetOptions<BlockBasedTableOptions>();
|
||||
|
@ -50,6 +50,7 @@ default_params = {
|
||||
# Disabled compression_parallel_threads as the feature is not stable
|
||||
# lambda: random.choice([1] * 9 + [4])
|
||||
"compression_parallel_threads": 1,
|
||||
"compression_max_dict_buffer_bytes": lambda: 4096 * random.randint(0, 32),
|
||||
"clear_column_family_one_in": 0,
|
||||
"compact_files_one_in": 1000000,
|
||||
"compact_range_one_in": 1000000,
|
||||
@ -267,8 +268,10 @@ best_efforts_recovery_params = {
|
||||
def finalize_and_sanitize(src_params):
|
||||
dest_params = dict([(k, v() if callable(v) else v)
|
||||
for (k, v) in src_params.items()])
|
||||
if dest_params.get("compression_type") != "zstd" or \
|
||||
dest_params.get("compression_max_dict_bytes") == 0:
|
||||
if dest_params.get("compression_max_dict_bytes") == 0:
|
||||
dest_params["compression_zstd_max_train_bytes"] = 0
|
||||
dest_params["compression_max_dict_buffer_bytes"] = 0
|
||||
if dest_params.get("compression_type") != "zstd":
|
||||
dest_params["compression_zstd_max_train_bytes"] = 0
|
||||
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
|
||||
dest_params["memtablerep"] = "skip_list"
|
||||
|
@ -103,6 +103,9 @@ void print_help(bool to_stderr) {
|
||||
|
||||
--compression_zstd_max_train_bytes=<uint32_t>
|
||||
Maximum size of training data passed to zstd's dictionary trainer
|
||||
|
||||
--compression_max_dict_buffer_bytes=<int64_t>
|
||||
Limit on buffer size from which we collect samples for dictionary generation.
|
||||
)");
|
||||
}
|
||||
|
||||
@ -166,6 +169,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
|
||||
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes;
|
||||
uint32_t compression_zstd_max_train_bytes =
|
||||
ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes;
|
||||
uint64_t compression_max_dict_buffer_bytes =
|
||||
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes;
|
||||
|
||||
int64_t tmp_val;
|
||||
|
||||
@ -276,6 +281,17 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
|
||||
return 1;
|
||||
}
|
||||
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
|
||||
} else if (ParseIntArg(argv[i], "--compression_max_dict_buffer_bytes=",
|
||||
"compression_max_dict_buffer_bytes must be numeric",
|
||||
&tmp_val)) {
|
||||
if (tmp_val < 0) {
|
||||
fprintf(stderr,
|
||||
"compression_max_dict_buffer_bytes must be positive: '%s'\n",
|
||||
argv[i]);
|
||||
print_help(/*to_stderr*/ true);
|
||||
return 1;
|
||||
}
|
||||
compression_max_dict_buffer_bytes = static_cast<uint64_t>(tmp_val);
|
||||
} else if (strcmp(argv[i], "--help") == 0) {
|
||||
print_help(/*to_stderr*/ false);
|
||||
return 0;
|
||||
@ -404,7 +420,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
|
||||
set_block_size ? block_size : 16384,
|
||||
compression_types.empty() ? kCompressions : compression_types,
|
||||
compress_level_from, compress_level_to, compression_max_dict_bytes,
|
||||
compression_zstd_max_train_bytes);
|
||||
compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes);
|
||||
if (!st.ok()) {
|
||||
fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str());
|
||||
exit(1);
|
||||
|
@ -627,6 +627,9 @@ inline std::string CompressionOptionsToString(
|
||||
result.append("enabled=")
|
||||
.append(ToString(compression_options.enabled))
|
||||
.append("; ");
|
||||
result.append("max_dict_buffer_bytes=")
|
||||
.append(ToString(compression_options.max_dict_buffer_bytes))
|
||||
.append("; ");
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -279,9 +279,16 @@ bool StartsWith(const std::string& string, const std::string& pattern) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
bool ParseBoolean(const std::string& type, const std::string& value) {
|
||||
if (value == "true" || value == "1") {
|
||||
const static std::string kTrue = "true", kFalse = "false";
|
||||
if (value.compare(0 /* pos */, kTrue.size(), kTrue) == 0) {
|
||||
return true;
|
||||
} else if (value == "false" || value == "0") {
|
||||
} else if (value.compare(0 /* pos */, kFalse.size(), kFalse) == 0) {
|
||||
return false;
|
||||
}
|
||||
int num = ParseInt(value);
|
||||
if (num == 1) {
|
||||
return true;
|
||||
} else if (num == 0) {
|
||||
return false;
|
||||
}
|
||||
throw std::invalid_argument(type);
|
||||
|
@ -22,6 +22,9 @@ namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
// A Timer class to handle repeated work.
|
||||
//
|
||||
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
|
||||
// serialize calls to these two member functions.
|
||||
//
|
||||
// A single timer instance can handle multiple functions via a single thread.
|
||||
// It is better to leave long running work to a dedicated thread pool.
|
||||
//
|
||||
|
@ -153,7 +153,12 @@ typedef struct toku_mutex_aligned {
|
||||
{ .pmutex = PTHREAD_MUTEX_INITIALIZER, .psi_mutex = nullptr }
|
||||
#endif // defined(TOKU_PTHREAD_DEBUG)
|
||||
#else // __FreeBSD__, __linux__, at least
|
||||
#if defined(__GLIBC__)
|
||||
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
|
||||
#else
|
||||
// not all libc (e.g. musl) implement NP (Non-POSIX) attributes
|
||||
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_DEFAULT
|
||||
#endif
|
||||
#if defined(TOKU_PTHREAD_DEBUG)
|
||||
#define TOKU_ADAPTIVE_MUTEX_INITIALIZER \
|
||||
{ \
|
||||
|
@ -46,6 +46,22 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
|
||||
const OptimisticTransactionOptions& txn_options,
|
||||
Transaction* old_txn) override;
|
||||
|
||||
// Transactional `DeleteRange()` is not yet supported.
|
||||
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
|
||||
const Slice&, const Slice&) override {
|
||||
return Status::NotSupported();
|
||||
}
|
||||
|
||||
// Range deletions also must not be snuck into `WriteBatch`es as they are
|
||||
// incompatible with `OptimisticTransactionDB`.
|
||||
virtual Status Write(const WriteOptions& write_opts,
|
||||
WriteBatch* batch) override {
|
||||
if (batch->HasDeleteRange()) {
|
||||
return Status::NotSupported();
|
||||
}
|
||||
return OptimisticTransactionDB::Write(write_opts, batch);
|
||||
}
|
||||
|
||||
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
|
||||
|
||||
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
|
||||
|
@ -1033,6 +1033,17 @@ TEST_P(OptimisticTransactionTest, IteratorTest) {
|
||||
delete txn;
|
||||
}
|
||||
|
||||
TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) {
|
||||
// `OptimisticTransactionDB` does not allow range deletion in any API.
|
||||
ASSERT_TRUE(
|
||||
txn_db
|
||||
->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b")
|
||||
.IsNotSupported());
|
||||
WriteBatch wb;
|
||||
ASSERT_OK(wb.DeleteRange("a", "b"));
|
||||
ASSERT_NOK(txn_db->Write(WriteOptions(), &wb));
|
||||
}
|
||||
|
||||
TEST_P(OptimisticTransactionTest, SavepointTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options, snapshot_read_options;
|
||||
|
@ -4835,6 +4835,56 @@ TEST_P(TransactionTest, MergeTest) {
|
||||
ASSERT_EQ("a,3", value);
|
||||
}
|
||||
|
||||
TEST_P(TransactionTest, DeleteRangeSupportTest) {
|
||||
// The `DeleteRange()` API is banned everywhere.
|
||||
ASSERT_TRUE(
|
||||
db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
|
||||
.IsNotSupported());
|
||||
|
||||
// But range deletions can be added via the `Write()` API by specifying the
|
||||
// proper flags to promise there are no conflicts according to the DB type
|
||||
// (see `TransactionDB::DeleteRange()` API doc for details).
|
||||
for (bool skip_concurrency_control : {false, true}) {
|
||||
for (bool skip_duplicate_key_check : {false, true}) {
|
||||
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
|
||||
WriteBatch wb;
|
||||
ASSERT_OK(wb.DeleteRange("a", "b"));
|
||||
TransactionDBWriteOptimizations flags;
|
||||
flags.skip_concurrency_control = skip_concurrency_control;
|
||||
flags.skip_duplicate_key_check = skip_duplicate_key_check;
|
||||
Status s = db->Write(WriteOptions(), flags, &wb);
|
||||
std::string value;
|
||||
switch (txn_db_options.write_policy) {
|
||||
case WRITE_COMMITTED:
|
||||
if (skip_concurrency_control) {
|
||||
ASSERT_OK(s);
|
||||
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
|
||||
} else {
|
||||
ASSERT_NOK(s);
|
||||
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||
}
|
||||
break;
|
||||
case WRITE_PREPARED:
|
||||
// Intentional fall-through
|
||||
case WRITE_UNPREPARED:
|
||||
if (skip_concurrency_control && skip_duplicate_key_check) {
|
||||
ASSERT_OK(s);
|
||||
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
|
||||
} else {
|
||||
ASSERT_NOK(s);
|
||||
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||
}
|
||||
break;
|
||||
}
|
||||
// Without any promises from the user, range deletion via other `Write()`
|
||||
// APIs are still banned.
|
||||
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
|
||||
ASSERT_NOK(db->Write(WriteOptions(), &wb));
|
||||
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(TransactionTest, DeferSnapshotTest) {
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
|
@ -157,7 +157,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
|
||||
// TODO(myabandeh): add an option to allow user skipping this cost
|
||||
SubBatchCounter counter(*GetCFComparatorMap());
|
||||
auto s = batch->Iterate(&counter);
|
||||
assert(s.ok());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
batch_cnt = counter.BatchCount();
|
||||
WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
|
||||
ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
|
||||
|
Loading…
Reference in New Issue
Block a user