Compare commits
17 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
be84166d2f | ||
|
23e0a29a0e | ||
|
60c1321db1 | ||
|
09c7e96eac | ||
|
9ef7f70c11 | ||
|
553aafaf8e | ||
|
1315375542 | ||
|
a471d31e04 | ||
|
8797aea803 | ||
|
9092ebed39 | ||
|
675f351cc8 | ||
|
11b42f92b1 | ||
|
14d173ec81 | ||
|
8b30b8d2a0 | ||
|
b6471f8a5c | ||
|
3dd5bc2a25 | ||
|
48edcfc17d |
25
HISTORY.md
25
HISTORY.md
@ -1,9 +1,26 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
|
## 6.17.3 (02/18/2021)
|
||||||
|
### Bug Fixes
|
||||||
|
* Fix `WRITE_PREPARED`, `WRITE_UNPREPARED` TransactionDB `MultiGet()` may return uncommitted data with snapshot.
|
||||||
|
|
||||||
|
## 6.17.2 (02/05/2021)
|
||||||
|
### Bug Fixes
|
||||||
|
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.
|
||||||
|
* `OptimisticTransactionDB` now returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees.
|
||||||
|
|
||||||
|
## 6.17.1 (01/28/2021)
|
||||||
|
### Behavior Changes
|
||||||
|
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
|
||||||
|
|
||||||
## 6.17.0 (01/15/2021)
|
## 6.17.0 (01/15/2021)
|
||||||
### Behavior Changes
|
### Behavior Changes
|
||||||
* When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created.
|
* When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created.
|
||||||
* Since RocksDB does not continue write the same file if a file write fails for any reason, the file scope write IO error is treated the same as retryable IO error. More information about error handling of file scope IO error is included in `ErrorHandler::SetBGError`.
|
* Since RocksDB does not continue write the same file if a file write fails for any reason, the file scope write IO error is treated the same as retryable IO error. More information about error handling of file scope IO error is included in `ErrorHandler::SetBGError`.
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
* Version older than 6.15 cannot decode VersionEdits `WalAddition` and `WalDeletion`, fixed this by changing the encoded format of them to be ignorable by older versions.
|
||||||
|
* Fix a race condition between DB startups and shutdowns in managing the periodic background worker threads. One effect of this race condition could be the process being terminated.
|
||||||
|
|
||||||
### Public API Change
|
### Public API Change
|
||||||
* Add a public API WriteBufferManager::dummy_entries_in_cache_usage() which reports the size of dummy entries stored in cache (passed to WriteBufferManager). Dummy entries are used to account for DataBlocks.
|
* Add a public API WriteBufferManager::dummy_entries_in_cache_usage() which reports the size of dummy entries stored in cache (passed to WriteBufferManager). Dummy entries are used to account for DataBlocks.
|
||||||
|
|
||||||
@ -11,10 +28,6 @@
|
|||||||
### Behavior Changes
|
### Behavior Changes
|
||||||
* Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation.
|
* Attempting to write a merge operand without explicitly configuring `merge_operator` now fails immediately, causing the DB to enter read-only mode. Previously, failure was deferred until the `merge_operator` was needed by a user read or a background operation.
|
||||||
|
|
||||||
### API Changes
|
|
||||||
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
|
|
||||||
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
|
|
||||||
|
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
* Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
|
* Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
|
||||||
* Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true.
|
* Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true.
|
||||||
@ -33,6 +46,9 @@
|
|||||||
### Public API Change
|
### Public API Change
|
||||||
* Deprecated public but rarely-used FilterBitsBuilder::CalculateNumEntry, which is replaced with ApproximateNumEntries taking a size_t parameter and returning size_t.
|
* Deprecated public but rarely-used FilterBitsBuilder::CalculateNumEntry, which is replaced with ApproximateNumEntries taking a size_t parameter and returning size_t.
|
||||||
* To improve portability the functions `Env::GetChildren` and `Env::GetChildrenFileAttributes` will no longer return entries for the special directories `.` or `..`.
|
* To improve portability the functions `Env::GetChildren` and `Env::GetChildrenFileAttributes` will no longer return entries for the special directories `.` or `..`.
|
||||||
|
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
|
||||||
|
* `rocksdb_approximate_sizes` and `rocksdb_approximate_sizes_cf` in the C API now requires an error pointer (`char** errptr`) for receiving any error.
|
||||||
|
* All overloads of DB::GetApproximateSizes now return Status, so that any failure to obtain the sizes is indicated to the caller.
|
||||||
|
|
||||||
## 6.15.0 (11/13/2020)
|
## 6.15.0 (11/13/2020)
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
@ -54,7 +70,6 @@
|
|||||||
### Public API Change
|
### Public API Change
|
||||||
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
|
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
|
||||||
* Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums.
|
* Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums.
|
||||||
* Added a new option `track_and_verify_wals_in_manifest`. If `true`, the log numbers and sizes of the synced WALs are tracked in MANIFEST, then during DB recovery, if a synced WAL is missing from disk, or the WAL's size does not match the recorded size in MANIFEST, an error will be reported and the recovery will be aborted. Note that this option does not work with secondary instance.
|
|
||||||
|
|
||||||
### Behavior Changes
|
### Behavior Changes
|
||||||
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.
|
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.
|
||||||
|
10
Makefile
10
Makefile
@ -515,6 +515,12 @@ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
|
|||||||
LIB_OBJECTS += $(patsubst %.cpp, $(OBJ_DIR)/%.o, $(FOLLY_SOURCES))
|
LIB_OBJECTS += $(patsubst %.cpp, $(OBJ_DIR)/%.o, $(FOLLY_SOURCES))
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
# range_tree is not compatible with non GNU libc on ppc64
|
||||||
|
# see https://jira.percona.com/browse/PS-7559
|
||||||
|
ifneq ($(PPC_LIBC_IS_GNU),0)
|
||||||
|
LIB_OBJECTS += $(patsubst %.cc, $(OBJ_DIR)/%.o, $(RANGE_TREE_SOURCES))
|
||||||
|
endif
|
||||||
|
|
||||||
GTEST = $(OBJ_DIR)/$(GTEST_DIR)/gtest/gtest-all.o
|
GTEST = $(OBJ_DIR)/$(GTEST_DIR)/gtest/gtest-all.o
|
||||||
TESTUTIL = $(OBJ_DIR)/test_util/testutil.o
|
TESTUTIL = $(OBJ_DIR)/test_util/testutil.o
|
||||||
TESTHARNESS = $(OBJ_DIR)/test_util/testharness.o $(TESTUTIL) $(GTEST)
|
TESTHARNESS = $(OBJ_DIR)/test_util/testharness.o $(TESTUTIL) $(GTEST)
|
||||||
@ -2152,8 +2158,8 @@ SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/archive
|
|||||||
LZ4_VER ?= 1.9.3
|
LZ4_VER ?= 1.9.3
|
||||||
LZ4_SHA256 ?= 030644df4611007ff7dc962d981f390361e6c97a34e5cbc393ddfbe019ffe2c1
|
LZ4_SHA256 ?= 030644df4611007ff7dc962d981f390361e6c97a34e5cbc393ddfbe019ffe2c1
|
||||||
LZ4_DOWNLOAD_BASE ?= https://github.com/lz4/lz4/archive
|
LZ4_DOWNLOAD_BASE ?= https://github.com/lz4/lz4/archive
|
||||||
ZSTD_VER ?= 1.4.7
|
ZSTD_VER ?= 1.4.9
|
||||||
ZSTD_SHA256 ?= 085500c8d0b9c83afbc1dc0d8b4889336ad019eba930c5d6a9c6c86c20c769c8
|
ZSTD_SHA256 ?= acf714d98e3db7b876e5b540cbf6dee298f60eb3c0723104f6d3f065cd60d6a8
|
||||||
ZSTD_DOWNLOAD_BASE ?= https://github.com/facebook/zstd/archive
|
ZSTD_DOWNLOAD_BASE ?= https://github.com/facebook/zstd/archive
|
||||||
CURL_SSL_OPTS ?= --tlsv1
|
CURL_SSL_OPTS ?= --tlsv1
|
||||||
|
|
||||||
|
@ -135,11 +135,15 @@ def generate_targets(repo_path, deps_map):
|
|||||||
TARGETS.add_library(
|
TARGETS.add_library(
|
||||||
"rocksdb_lib",
|
"rocksdb_lib",
|
||||||
src_mk["LIB_SOURCES"] +
|
src_mk["LIB_SOURCES"] +
|
||||||
|
# always add range_tree, it's only excluded on ppc64, which we don't use internally
|
||||||
|
src_mk["RANGE_TREE_SOURCES"] +
|
||||||
src_mk["TOOL_LIB_SOURCES"])
|
src_mk["TOOL_LIB_SOURCES"])
|
||||||
# rocksdb_whole_archive_lib
|
# rocksdb_whole_archive_lib
|
||||||
TARGETS.add_library(
|
TARGETS.add_library(
|
||||||
"rocksdb_whole_archive_lib",
|
"rocksdb_whole_archive_lib",
|
||||||
src_mk["LIB_SOURCES"] +
|
src_mk["LIB_SOURCES"] +
|
||||||
|
# always add range_tree, it's only excluded on ppc64, which we don't use internally
|
||||||
|
src_mk["RANGE_TREE_SOURCES"] +
|
||||||
src_mk["TOOL_LIB_SOURCES"],
|
src_mk["TOOL_LIB_SOURCES"],
|
||||||
deps=None,
|
deps=None,
|
||||||
headers=None,
|
headers=None,
|
||||||
|
@ -668,6 +668,23 @@ else
|
|||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if test -n "`echo $TARGET_ARCHITECTURE | grep ^ppc64`"; then
|
||||||
|
# check for GNU libc on ppc64
|
||||||
|
$CXX -x c++ - -o /dev/null 2>/dev/null <<EOF
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <gnu/libc-version.h>
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
printf("GNU libc version: %s\n", gnu_get_libc_version());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
EOF
|
||||||
|
if [ "$?" != 0 ]; then
|
||||||
|
PPC_LIBC_IS_GNU=0
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
if test "$TRY_SSE_ETC"; then
|
if test "$TRY_SSE_ETC"; then
|
||||||
# The USE_SSE flag now means "attempt to compile with widely-available
|
# The USE_SSE flag now means "attempt to compile with widely-available
|
||||||
# Intel architecture extensions utilized by specific optimizations in the
|
# Intel architecture extensions utilized by specific optimizations in the
|
||||||
@ -861,3 +878,6 @@ echo "LUA_PATH=$LUA_PATH" >> "$OUTPUT"
|
|||||||
if test -n "$USE_FOLLY_DISTRIBUTED_MUTEX"; then
|
if test -n "$USE_FOLLY_DISTRIBUTED_MUTEX"; then
|
||||||
echo "USE_FOLLY_DISTRIBUTED_MUTEX=$USE_FOLLY_DISTRIBUTED_MUTEX" >> "$OUTPUT"
|
echo "USE_FOLLY_DISTRIBUTED_MUTEX=$USE_FOLLY_DISTRIBUTED_MUTEX" >> "$OUTPUT"
|
||||||
fi
|
fi
|
||||||
|
if test -n "$PPC_LIBC_IS_GNU"; then
|
||||||
|
echo "PPC_LIBC_IS_GNU=$PPC_LIBC_IS_GNU" >> "$OUTPUT"
|
||||||
|
fi
|
||||||
|
@ -1718,7 +1718,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
|||||||
}
|
}
|
||||||
// If timestamp is used, we use read callback to ensure <key,t,s> is returned
|
// If timestamp is used, we use read callback to ensure <key,t,s> is returned
|
||||||
// only if t <= read_opts.timestamp and s <= snapshot.
|
// only if t <= read_opts.timestamp and s <= snapshot.
|
||||||
if (ts_sz > 0 && !get_impl_options.callback) {
|
if (ts_sz > 0) {
|
||||||
|
assert(!get_impl_options
|
||||||
|
.callback); // timestamp with callback is not supported
|
||||||
read_cb.Refresh(snapshot);
|
read_cb.Refresh(snapshot);
|
||||||
get_impl_options.callback = &read_cb;
|
get_impl_options.callback = &read_cb;
|
||||||
}
|
}
|
||||||
@ -2394,8 +2396,9 @@ void DBImpl::MultiGetWithCallback(
|
|||||||
}
|
}
|
||||||
|
|
||||||
GetWithTimestampReadCallback timestamp_read_callback(0);
|
GetWithTimestampReadCallback timestamp_read_callback(0);
|
||||||
ReadCallback* read_callback = nullptr;
|
ReadCallback* read_callback = callback;
|
||||||
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
|
assert(!read_callback); // timestamp with callback is not supported
|
||||||
timestamp_read_callback.Refresh(consistent_seqnum);
|
timestamp_read_callback.Refresh(consistent_seqnum);
|
||||||
read_callback = ×tamp_read_callback;
|
read_callback = ×tamp_read_callback;
|
||||||
}
|
}
|
||||||
|
@ -954,6 +954,9 @@ class DBImpl : public DB {
|
|||||||
// is only for the special test of CancelledCompactions
|
// is only for the special test of CancelledCompactions
|
||||||
Status TEST_WaitForCompact(bool waitUnscheduled = false);
|
Status TEST_WaitForCompact(bool waitUnscheduled = false);
|
||||||
|
|
||||||
|
// Get the background error status
|
||||||
|
Status TEST_GetBGError();
|
||||||
|
|
||||||
// Return the maximum overlapping data (in bytes) at next level for any
|
// Return the maximum overlapping data (in bytes) at next level for any
|
||||||
// file at a level >= 1.
|
// file at a level >= 1.
|
||||||
int64_t TEST_MaxNextLevelOverlappingBytes(
|
int64_t TEST_MaxNextLevelOverlappingBytes(
|
||||||
|
@ -177,6 +177,11 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
|
|||||||
return error_handler_.GetBGError();
|
return error_handler_.GetBGError();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status DBImpl::TEST_GetBGError() {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
return error_handler_.GetBGError();
|
||||||
|
}
|
||||||
|
|
||||||
void DBImpl::TEST_LockMutex() { mutex_.Lock(); }
|
void DBImpl::TEST_LockMutex() { mutex_.Lock(); }
|
||||||
|
|
||||||
void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }
|
void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }
|
||||||
|
@ -417,11 +417,11 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
|||||||
&new_bg_io_err, db_mutex_,
|
&new_bg_io_err, db_mutex_,
|
||||||
&auto_recovery);
|
&auto_recovery);
|
||||||
if (BackgroundErrorReason::kCompaction == reason) {
|
if (BackgroundErrorReason::kCompaction == reason) {
|
||||||
Status bg_err(new_bg_io_err, Status::Severity::kSoftError);
|
// We map the retryable IO error during compaction to soft error. Since
|
||||||
if (bg_err.severity() > bg_error_.severity()) {
|
// compaction can reschedule by itself. We will not set the BG error in
|
||||||
bg_error_ = bg_err;
|
// this case
|
||||||
}
|
// TODO: a better way to set or clean the retryable IO error which
|
||||||
recover_context_ = context;
|
// happens during compaction SST file write.
|
||||||
return bg_error_;
|
return bg_error_;
|
||||||
} else if (BackgroundErrorReason::kFlushNoWAL == reason ||
|
} else if (BackgroundErrorReason::kFlushNoWAL == reason ||
|
||||||
BackgroundErrorReason::kManifestWriteNoWAL == reason) {
|
BackgroundErrorReason::kManifestWriteNoWAL == reason) {
|
||||||
|
@ -892,15 +892,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
|
|||||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||||
"CompactionJob::OpenCompactionOutputFile",
|
"CompactionJob::OpenCompactionOutputFile",
|
||||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||||
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"DBImpl::BackgroundCompaction:Finish",
|
||||||
|
[&](void*) { CancelAllBackgroundWork(dbfull()); });
|
||||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
ASSERT_OK(Put(Key(1), "val"));
|
ASSERT_OK(Put(Key(1), "val"));
|
||||||
s = Flush();
|
s = Flush();
|
||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
s = dbfull()->TEST_WaitForCompact();
|
s = dbfull()->TEST_GetBGError();
|
||||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
fault_fs_->SetFilesystemActive(true);
|
fault_fs_->SetFilesystemActive(true);
|
||||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
SyncPoint::GetInstance()->DisableProcessing();
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
@ -940,14 +942,17 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteFileScopeError) {
|
|||||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||||
"CompactionJob::OpenCompactionOutputFile",
|
"CompactionJob::OpenCompactionOutputFile",
|
||||||
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
|
||||||
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"DBImpl::BackgroundCompaction:Finish",
|
||||||
|
[&](void*) { CancelAllBackgroundWork(dbfull()); });
|
||||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
ASSERT_OK(Put(Key(1), "val"));
|
ASSERT_OK(Put(Key(1), "val"));
|
||||||
s = Flush();
|
s = Flush();
|
||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
s = dbfull()->TEST_WaitForCompact();
|
s = dbfull()->TEST_GetBGError();
|
||||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
fault_fs_->SetFilesystemActive(true);
|
fault_fs_->SetFilesystemActive(true);
|
||||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
@ -2190,8 +2195,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
|
|||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
s = dbfull()->TEST_WaitForCompact();
|
s = dbfull()->TEST_WaitForCompact();
|
||||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
|
TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
|
||||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
SyncPoint::GetInstance()->DisableProcessing();
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
@ -10,13 +10,14 @@
|
|||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) {
|
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) : timer_mu_(env) {
|
||||||
timer = std::unique_ptr<Timer>(new Timer(env));
|
timer = std::unique_ptr<Timer>(new Timer(env));
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeriodicWorkScheduler::Register(DBImpl* dbi,
|
void PeriodicWorkScheduler::Register(DBImpl* dbi,
|
||||||
unsigned int stats_dump_period_sec,
|
unsigned int stats_dump_period_sec,
|
||||||
unsigned int stats_persist_period_sec) {
|
unsigned int stats_persist_period_sec) {
|
||||||
|
MutexLock l(&timer_mu_);
|
||||||
static std::atomic<uint64_t> initial_delay(0);
|
static std::atomic<uint64_t> initial_delay(0);
|
||||||
timer->Start();
|
timer->Start();
|
||||||
if (stats_dump_period_sec > 0) {
|
if (stats_dump_period_sec > 0) {
|
||||||
@ -41,6 +42,7 @@ void PeriodicWorkScheduler::Register(DBImpl* dbi,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
|
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
|
||||||
|
MutexLock l(&timer_mu_);
|
||||||
timer->Cancel(GetTaskName(dbi, "dump_st"));
|
timer->Cancel(GetTaskName(dbi, "dump_st"));
|
||||||
timer->Cancel(GetTaskName(dbi, "pst_st"));
|
timer->Cancel(GetTaskName(dbi, "pst_st"));
|
||||||
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
|
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
|
||||||
@ -78,7 +80,10 @@ PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(Env* env) {
|
|||||||
MutexLock l(&mutex);
|
MutexLock l(&mutex);
|
||||||
if (scheduler.timer.get() != nullptr &&
|
if (scheduler.timer.get() != nullptr &&
|
||||||
scheduler.timer->TEST_GetPendingTaskNum() == 0) {
|
scheduler.timer->TEST_GetPendingTaskNum() == 0) {
|
||||||
scheduler.timer->Shutdown();
|
{
|
||||||
|
MutexLock timer_mu_guard(&scheduler.timer_mu_);
|
||||||
|
scheduler.timer->Shutdown();
|
||||||
|
}
|
||||||
scheduler.timer.reset(new Timer(env));
|
scheduler.timer.reset(new Timer(env));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,12 @@ class PeriodicWorkScheduler {
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::unique_ptr<Timer> timer;
|
std::unique_ptr<Timer> timer;
|
||||||
|
// `timer_mu_` serves two purposes currently:
|
||||||
|
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
|
||||||
|
// they are currently not implemented in a thread-safe way; and
|
||||||
|
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
|
||||||
|
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
|
||||||
|
port::Mutex timer_mu_;
|
||||||
|
|
||||||
explicit PeriodicWorkScheduler(Env* env);
|
explicit PeriodicWorkScheduler(Env* env);
|
||||||
|
|
||||||
|
@ -226,13 +226,17 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& wal_addition : wal_additions_) {
|
for (const auto& wal_addition : wal_additions_) {
|
||||||
PutVarint32(dst, kWalAddition);
|
PutVarint32(dst, kWalAddition2);
|
||||||
wal_addition.EncodeTo(dst);
|
std::string encoded;
|
||||||
|
wal_addition.EncodeTo(&encoded);
|
||||||
|
PutLengthPrefixedSlice(dst, encoded);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!wal_deletion_.IsEmpty()) {
|
if (!wal_deletion_.IsEmpty()) {
|
||||||
PutVarint32(dst, kWalDeletion);
|
PutVarint32(dst, kWalDeletion2);
|
||||||
wal_deletion_.EncodeTo(dst);
|
std::string encoded;
|
||||||
|
wal_deletion_.EncodeTo(&encoded);
|
||||||
|
PutLengthPrefixedSlice(dst, encoded);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 0 is default and does not need to be explicitly written
|
// 0 is default and does not need to be explicitly written
|
||||||
@ -375,6 +379,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
|||||||
|
|
||||||
Status VersionEdit::DecodeFrom(const Slice& src) {
|
Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||||
Clear();
|
Clear();
|
||||||
|
#ifndef NDEBUG
|
||||||
|
bool ignore_ignorable_tags = false;
|
||||||
|
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags",
|
||||||
|
&ignore_ignorable_tags);
|
||||||
|
#endif
|
||||||
Slice input = src;
|
Slice input = src;
|
||||||
const char* msg = nullptr;
|
const char* msg = nullptr;
|
||||||
uint32_t tag = 0;
|
uint32_t tag = 0;
|
||||||
@ -385,6 +394,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
Slice str;
|
Slice str;
|
||||||
InternalKey key;
|
InternalKey key;
|
||||||
while (msg == nullptr && GetVarint32(&input, &tag)) {
|
while (msg == nullptr && GetVarint32(&input, &tag)) {
|
||||||
|
#ifndef NDEBUG
|
||||||
|
if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) {
|
||||||
|
tag = kTagSafeIgnoreMask;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
switch (tag) {
|
switch (tag) {
|
||||||
case kDbId:
|
case kDbId:
|
||||||
if (GetLengthPrefixedSlice(&input, &str)) {
|
if (GetLengthPrefixedSlice(&input, &str)) {
|
||||||
@ -542,7 +556,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case kBlobFileAddition: {
|
case kBlobFileAddition:
|
||||||
|
case kBlobFileAddition_DEPRECATED: {
|
||||||
BlobFileAddition blob_file_addition;
|
BlobFileAddition blob_file_addition;
|
||||||
const Status s = blob_file_addition.DecodeFrom(&input);
|
const Status s = blob_file_addition.DecodeFrom(&input);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -553,7 +568,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case kBlobFileGarbage: {
|
case kBlobFileGarbage:
|
||||||
|
case kBlobFileGarbage_DEPRECATED: {
|
||||||
BlobFileGarbage blob_file_garbage;
|
BlobFileGarbage blob_file_garbage;
|
||||||
const Status s = blob_file_garbage.DecodeFrom(&input);
|
const Status s = blob_file_garbage.DecodeFrom(&input);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -575,6 +591,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case kWalAddition2: {
|
||||||
|
Slice encoded;
|
||||||
|
if (!GetLengthPrefixedSlice(&input, &encoded)) {
|
||||||
|
msg = "WalAddition not prefixed by length";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
WalAddition wal_addition;
|
||||||
|
const Status s = wal_addition.DecodeFrom(&encoded);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
wal_additions_.emplace_back(std::move(wal_addition));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case kWalDeletion: {
|
case kWalDeletion: {
|
||||||
WalDeletion wal_deletion;
|
WalDeletion wal_deletion;
|
||||||
const Status s = wal_deletion.DecodeFrom(&input);
|
const Status s = wal_deletion.DecodeFrom(&input);
|
||||||
@ -586,6 +619,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case kWalDeletion2: {
|
||||||
|
Slice encoded;
|
||||||
|
if (!GetLengthPrefixedSlice(&input, &encoded)) {
|
||||||
|
msg = "WalDeletion not prefixed by length";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
WalDeletion wal_deletion;
|
||||||
|
const Status s = wal_deletion.DecodeFrom(&encoded);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
wal_deletion_ = std::move(wal_deletion);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case kColumnFamily:
|
case kColumnFamily:
|
||||||
if (!GetVarint32(&input, &column_family_)) {
|
if (!GetVarint32(&input, &column_family_)) {
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
|
@ -52,16 +52,21 @@ enum Tag : uint32_t {
|
|||||||
|
|
||||||
kInAtomicGroup = 300,
|
kInAtomicGroup = 300,
|
||||||
|
|
||||||
|
kBlobFileAddition = 400,
|
||||||
|
kBlobFileGarbage,
|
||||||
|
|
||||||
// Mask for an unidentified tag from the future which can be safely ignored.
|
// Mask for an unidentified tag from the future which can be safely ignored.
|
||||||
kTagSafeIgnoreMask = 1 << 13,
|
kTagSafeIgnoreMask = 1 << 13,
|
||||||
|
|
||||||
// Forward compatible (aka ignorable) records
|
// Forward compatible (aka ignorable) records
|
||||||
kDbId,
|
kDbId,
|
||||||
kBlobFileAddition,
|
kBlobFileAddition_DEPRECATED,
|
||||||
kBlobFileGarbage,
|
kBlobFileGarbage_DEPRECATED,
|
||||||
kWalAddition,
|
kWalAddition,
|
||||||
kWalDeletion,
|
kWalDeletion,
|
||||||
kFullHistoryTsLow,
|
kFullHistoryTsLow,
|
||||||
|
kWalAddition2,
|
||||||
|
kWalDeletion2,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum NewFileCustomTag : uint32_t {
|
enum NewFileCustomTag : uint32_t {
|
||||||
|
@ -324,14 +324,22 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) {
|
|||||||
TestEncodeDecode(edit);
|
TestEncodeDecode(edit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static std::string PrefixEncodedWalAdditionWithLength(
|
||||||
|
const std::string& encoded) {
|
||||||
|
std::string ret;
|
||||||
|
PutVarint32(&ret, Tag::kWalAddition2);
|
||||||
|
PutLengthPrefixedSlice(&ret, encoded);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
|
TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
|
||||||
std::string encoded;
|
std::string encoded;
|
||||||
PutVarint32(&encoded, Tag::kWalAddition);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
// No log number.
|
// No log number.
|
||||||
|
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
||||||
std::string::npos)
|
std::string::npos)
|
||||||
@ -345,8 +353,10 @@ TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
|
|||||||
unsigned char* ptr = reinterpret_cast<unsigned char*>(&c);
|
unsigned char* ptr = reinterpret_cast<unsigned char*>(&c);
|
||||||
*ptr = 128;
|
*ptr = 128;
|
||||||
encoded.append(1, c);
|
encoded.append(1, c);
|
||||||
|
|
||||||
|
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
|
||||||
std::string::npos)
|
std::string::npos)
|
||||||
@ -358,14 +368,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
|||||||
constexpr WalNumber kLogNumber = 100;
|
constexpr WalNumber kLogNumber = 100;
|
||||||
constexpr uint64_t kSizeInBytes = 100;
|
constexpr uint64_t kSizeInBytes = 100;
|
||||||
|
|
||||||
std::string encoded_without_tag;
|
std::string encoded;
|
||||||
PutVarint32(&encoded_without_tag, Tag::kWalAddition);
|
PutVarint64(&encoded, kLogNumber);
|
||||||
PutVarint64(&encoded_without_tag, kLogNumber);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
// No tag.
|
// No tag.
|
||||||
|
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded_without_tag);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||||
<< s.ToString();
|
<< s.ToString();
|
||||||
@ -373,12 +383,15 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
|||||||
|
|
||||||
{
|
{
|
||||||
// Only has size tag, no terminate tag.
|
// Only has size tag, no terminate tag.
|
||||||
std::string encoded_with_size = encoded_without_tag;
|
std::string encoded_with_size = encoded;
|
||||||
PutVarint32(&encoded_with_size,
|
PutVarint32(&encoded_with_size,
|
||||||
static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
||||||
PutVarint64(&encoded_with_size, kSizeInBytes);
|
PutVarint64(&encoded_with_size, kSizeInBytes);
|
||||||
|
|
||||||
|
std::string encoded_edit =
|
||||||
|
PrefixEncodedWalAdditionWithLength(encoded_with_size);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded_with_size);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||||
<< s.ToString();
|
<< s.ToString();
|
||||||
@ -386,11 +399,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
|
|||||||
|
|
||||||
{
|
{
|
||||||
// Only has terminate tag.
|
// Only has terminate tag.
|
||||||
std::string encoded_with_terminate = encoded_without_tag;
|
std::string encoded_with_terminate = encoded;
|
||||||
PutVarint32(&encoded_with_terminate,
|
PutVarint32(&encoded_with_terminate,
|
||||||
static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
||||||
|
|
||||||
|
std::string encoded_edit =
|
||||||
|
PrefixEncodedWalAdditionWithLength(encoded_with_terminate);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
ASSERT_OK(edit.DecodeFrom(encoded_with_terminate));
|
ASSERT_OK(edit.DecodeFrom(encoded_edit));
|
||||||
auto& wal_addition = edit.GetWalAdditions()[0];
|
auto& wal_addition = edit.GetWalAdditions()[0];
|
||||||
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
|
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
|
||||||
ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize());
|
ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize());
|
||||||
@ -401,15 +417,15 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
|
|||||||
constexpr WalNumber kLogNumber = 100;
|
constexpr WalNumber kLogNumber = 100;
|
||||||
|
|
||||||
std::string encoded;
|
std::string encoded;
|
||||||
PutVarint32(&encoded, Tag::kWalAddition);
|
|
||||||
PutVarint64(&encoded, kLogNumber);
|
PutVarint64(&encoded, kLogNumber);
|
||||||
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
||||||
// No real size after the size tag.
|
// No real size after the size tag.
|
||||||
|
|
||||||
{
|
{
|
||||||
// Without terminate tag.
|
// Without terminate tag.
|
||||||
|
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") !=
|
ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") !=
|
||||||
std::string::npos)
|
std::string::npos)
|
||||||
@ -419,8 +435,10 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
|
|||||||
{
|
{
|
||||||
// With terminate tag.
|
// With terminate tag.
|
||||||
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
||||||
|
|
||||||
|
std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded);
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = edit.DecodeFrom(encoded);
|
Status s = edit.DecodeFrom(encoded_edit);
|
||||||
ASSERT_TRUE(s.IsCorruption());
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
// The terminate tag is misunderstood as the size.
|
// The terminate tag is misunderstood as the size.
|
||||||
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
|
||||||
@ -515,6 +533,62 @@ TEST_F(VersionEditTest, FullHistoryTsLow) {
|
|||||||
TestEncodeDecode(edit);
|
TestEncodeDecode(edit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that if RocksDB is downgraded, the new types of VersionEdits
|
||||||
|
// that have a tag larger than kTagSafeIgnoreMask can be safely ignored.
|
||||||
|
TEST_F(VersionEditTest, IgnorableTags) {
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"VersionEdit::EncodeTo:IgnoreIgnorableTags", [&](void* arg) {
|
||||||
|
bool* ignore = static_cast<bool*>(arg);
|
||||||
|
*ignore = true;
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
constexpr uint64_t kPrevLogNumber = 100;
|
||||||
|
constexpr uint64_t kLogNumber = 200;
|
||||||
|
constexpr uint64_t kNextFileNumber = 300;
|
||||||
|
constexpr uint64_t kColumnFamilyId = 400;
|
||||||
|
|
||||||
|
VersionEdit edit;
|
||||||
|
// Add some ignorable entries.
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
edit.AddWal(i + 1, WalMetadata(i + 2));
|
||||||
|
}
|
||||||
|
edit.SetDBId("db_id");
|
||||||
|
// Add unignorable entries.
|
||||||
|
edit.SetPrevLogNumber(kPrevLogNumber);
|
||||||
|
edit.SetLogNumber(kLogNumber);
|
||||||
|
// Add more ignorable entries.
|
||||||
|
edit.DeleteWalsBefore(100);
|
||||||
|
// Add unignorable entry.
|
||||||
|
edit.SetNextFile(kNextFileNumber);
|
||||||
|
// Add more ignorable entries.
|
||||||
|
edit.SetFullHistoryTsLow("ts");
|
||||||
|
// Add unignorable entry.
|
||||||
|
edit.SetColumnFamily(kColumnFamilyId);
|
||||||
|
|
||||||
|
std::string encoded;
|
||||||
|
ASSERT_TRUE(edit.EncodeTo(&encoded));
|
||||||
|
|
||||||
|
VersionEdit decoded;
|
||||||
|
ASSERT_OK(decoded.DecodeFrom(encoded));
|
||||||
|
|
||||||
|
// Check that all ignorable entries are ignored.
|
||||||
|
ASSERT_FALSE(decoded.HasDbId());
|
||||||
|
ASSERT_FALSE(decoded.HasFullHistoryTsLow());
|
||||||
|
ASSERT_FALSE(decoded.IsWalAddition());
|
||||||
|
ASSERT_FALSE(decoded.IsWalDeletion());
|
||||||
|
ASSERT_TRUE(decoded.GetWalAdditions().empty());
|
||||||
|
ASSERT_TRUE(decoded.GetWalDeletion().IsEmpty());
|
||||||
|
|
||||||
|
// Check that unignorable entries are still present.
|
||||||
|
ASSERT_EQ(edit.GetPrevLogNumber(), kPrevLogNumber);
|
||||||
|
ASSERT_EQ(edit.GetLogNumber(), kLogNumber);
|
||||||
|
ASSERT_EQ(edit.GetNextFile(), kNextFileNumber);
|
||||||
|
ASSERT_EQ(edit.GetColumnFamily(), kColumnFamilyId);
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -51,6 +51,8 @@ struct OptimisticTransactionDBOptions {
|
|||||||
uint32_t occ_lock_buckets = (1 << 20);
|
uint32_t occ_lock_buckets = (1 << 20);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Range deletions (including those in `WriteBatch`es passed to `Write()`) are
|
||||||
|
// incompatible with `OptimisticTransactionDB` and will return a non-OK `Status`
|
||||||
class OptimisticTransactionDB : public StackableDB {
|
class OptimisticTransactionDB : public StackableDB {
|
||||||
public:
|
public:
|
||||||
// Open an OptimisticTransactionDB similar to DB::Open().
|
// Open an OptimisticTransactionDB similar to DB::Open().
|
||||||
|
@ -344,6 +344,17 @@ class TransactionDB : public StackableDB {
|
|||||||
// falls back to the un-optimized version of ::Write
|
// falls back to the un-optimized version of ::Write
|
||||||
return Write(opts, updates);
|
return Write(opts, updates);
|
||||||
}
|
}
|
||||||
|
// Transactional `DeleteRange()` is not yet supported.
|
||||||
|
// However, users who know their deleted range does not conflict with
|
||||||
|
// anything can still use it via the `Write()` API. In all cases, the
|
||||||
|
// `Write()` overload specifying `TransactionDBWriteOptimizations` must be
|
||||||
|
// used and `skip_concurrency_control` must be set. When using either
|
||||||
|
// WRITE_PREPARED or WRITE_UNPREPARED , `skip_duplicate_key_check` must
|
||||||
|
// additionally be set.
|
||||||
|
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
|
||||||
|
const Slice&, const Slice&) override {
|
||||||
|
return Status::NotSupported();
|
||||||
|
}
|
||||||
// Open a TransactionDB similar to DB::Open().
|
// Open a TransactionDB similar to DB::Open().
|
||||||
// Internally call PrepareWrap() and WrapDB()
|
// Internally call PrepareWrap() and WrapDB()
|
||||||
// If the return status is not ok, then dbptr is set to nullptr.
|
// If the return status is not ok, then dbptr is set to nullptr.
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
#define ROCKSDB_MAJOR 6
|
#define ROCKSDB_MAJOR 6
|
||||||
#define ROCKSDB_MINOR 17
|
#define ROCKSDB_MINOR 17
|
||||||
#define ROCKSDB_PATCH 0
|
#define ROCKSDB_PATCH 3
|
||||||
|
|
||||||
// Do not use these. We made the mistake of declaring macros starting with
|
// Do not use these. We made the mistake of declaring macros starting with
|
||||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||||
|
@ -684,7 +684,8 @@ IOStatus WinFileSystem::GetChildren(const std::string& dir,
|
|||||||
// which appear only on some platforms
|
// which appear only on some platforms
|
||||||
const bool ignore =
|
const bool ignore =
|
||||||
((data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0) &&
|
((data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0) &&
|
||||||
(strcmp(data.cFileName, ".") == 0 || strcmp(data.cFileName, "..") == 0);
|
(RX_FNCMP(data.cFileName, ".") == 0 ||
|
||||||
|
RX_FNCMP(data.cFileName, "..") == 0);
|
||||||
if (!ignore) {
|
if (!ignore) {
|
||||||
auto x = RX_FILESTRING(data.cFileName, RX_FNLEN(data.cFileName));
|
auto x = RX_FILESTRING(data.cFileName, RX_FNLEN(data.cFileName));
|
||||||
result->push_back(FN_TO_RX(x));
|
result->push_back(FN_TO_RX(x));
|
||||||
|
@ -355,6 +355,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
|
|||||||
#define RX_FILESTRING std::wstring
|
#define RX_FILESTRING std::wstring
|
||||||
#define RX_FN(a) ROCKSDB_NAMESPACE::port::utf8_to_utf16(a)
|
#define RX_FN(a) ROCKSDB_NAMESPACE::port::utf8_to_utf16(a)
|
||||||
#define FN_TO_RX(a) ROCKSDB_NAMESPACE::port::utf16_to_utf8(a)
|
#define FN_TO_RX(a) ROCKSDB_NAMESPACE::port::utf16_to_utf8(a)
|
||||||
|
#define RX_FNCMP(a, b) ::wcscmp(a, RX_FN(b).c_str())
|
||||||
#define RX_FNLEN(a) ::wcslen(a)
|
#define RX_FNLEN(a) ::wcslen(a)
|
||||||
|
|
||||||
#define RX_DeleteFile DeleteFileW
|
#define RX_DeleteFile DeleteFileW
|
||||||
@ -379,6 +380,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
|
|||||||
#define RX_FILESTRING std::string
|
#define RX_FILESTRING std::string
|
||||||
#define RX_FN(a) a
|
#define RX_FN(a) a
|
||||||
#define FN_TO_RX(a) a
|
#define FN_TO_RX(a) a
|
||||||
|
#define RX_FNCMP(a, b) strcmp(a, b)
|
||||||
#define RX_FNLEN(a) strlen(a)
|
#define RX_FNLEN(a) strlen(a)
|
||||||
|
|
||||||
#define RX_DeleteFile DeleteFileA
|
#define RX_DeleteFile DeleteFileA
|
||||||
|
30
src.mk
30
src.mk
@ -255,20 +255,6 @@ LIB_SOURCES = \
|
|||||||
utilities/transactions/lock/lock_manager.cc \
|
utilities/transactions/lock/lock_manager.cc \
|
||||||
utilities/transactions/lock/point/point_lock_tracker.cc \
|
utilities/transactions/lock/point/point_lock_tracker.cc \
|
||||||
utilities/transactions/lock/point/point_lock_manager.cc \
|
utilities/transactions/lock/point/point_lock_manager.cc \
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/txnid_set.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/locktree/wfg.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \
|
|
||||||
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc \
|
|
||||||
utilities/transactions/optimistic_transaction.cc \
|
utilities/transactions/optimistic_transaction.cc \
|
||||||
utilities/transactions/optimistic_transaction_db_impl.cc \
|
utilities/transactions/optimistic_transaction_db_impl.cc \
|
||||||
utilities/transactions/pessimistic_transaction.cc \
|
utilities/transactions/pessimistic_transaction.cc \
|
||||||
@ -300,6 +286,22 @@ LIB_SOURCES_ASM =
|
|||||||
LIB_SOURCES_C =
|
LIB_SOURCES_C =
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
RANGE_TREE_SOURCES =\
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/txnid_set.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/locktree/wfg.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \
|
||||||
|
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
|
||||||
|
|
||||||
TOOL_LIB_SOURCES = \
|
TOOL_LIB_SOURCES = \
|
||||||
tools/io_tracer_parser_tool.cc \
|
tools/io_tracer_parser_tool.cc \
|
||||||
tools/ldb_cmd.cc \
|
tools/ldb_cmd.cc \
|
||||||
|
@ -22,6 +22,9 @@ namespace ROCKSDB_NAMESPACE {
|
|||||||
|
|
||||||
// A Timer class to handle repeated work.
|
// A Timer class to handle repeated work.
|
||||||
//
|
//
|
||||||
|
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
|
||||||
|
// serialize calls to these two member functions.
|
||||||
|
//
|
||||||
// A single timer instance can handle multiple functions via a single thread.
|
// A single timer instance can handle multiple functions via a single thread.
|
||||||
// It is better to leave long running work to a dedicated thread pool.
|
// It is better to leave long running work to a dedicated thread pool.
|
||||||
//
|
//
|
||||||
|
@ -153,7 +153,12 @@ typedef struct toku_mutex_aligned {
|
|||||||
{ .pmutex = PTHREAD_MUTEX_INITIALIZER, .psi_mutex = nullptr }
|
{ .pmutex = PTHREAD_MUTEX_INITIALIZER, .psi_mutex = nullptr }
|
||||||
#endif // defined(TOKU_PTHREAD_DEBUG)
|
#endif // defined(TOKU_PTHREAD_DEBUG)
|
||||||
#else // __FreeBSD__, __linux__, at least
|
#else // __FreeBSD__, __linux__, at least
|
||||||
|
#if defined(__GLIBC__)
|
||||||
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
|
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
|
||||||
|
#else
|
||||||
|
// not all libc (e.g. musl) implement NP (Non-POSIX) attributes
|
||||||
|
#define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_DEFAULT
|
||||||
|
#endif
|
||||||
#if defined(TOKU_PTHREAD_DEBUG)
|
#if defined(TOKU_PTHREAD_DEBUG)
|
||||||
#define TOKU_ADAPTIVE_MUTEX_INITIALIZER \
|
#define TOKU_ADAPTIVE_MUTEX_INITIALIZER \
|
||||||
{ \
|
{ \
|
||||||
|
@ -46,6 +46,22 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
|
|||||||
const OptimisticTransactionOptions& txn_options,
|
const OptimisticTransactionOptions& txn_options,
|
||||||
Transaction* old_txn) override;
|
Transaction* old_txn) override;
|
||||||
|
|
||||||
|
// Transactional `DeleteRange()` is not yet supported.
|
||||||
|
virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*,
|
||||||
|
const Slice&, const Slice&) override {
|
||||||
|
return Status::NotSupported();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Range deletions also must not be snuck into `WriteBatch`es as they are
|
||||||
|
// incompatible with `OptimisticTransactionDB`.
|
||||||
|
virtual Status Write(const WriteOptions& write_opts,
|
||||||
|
WriteBatch* batch) override {
|
||||||
|
if (batch->HasDeleteRange()) {
|
||||||
|
return Status::NotSupported();
|
||||||
|
}
|
||||||
|
return OptimisticTransactionDB::Write(write_opts, batch);
|
||||||
|
}
|
||||||
|
|
||||||
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
|
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
|
||||||
|
|
||||||
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
|
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
|
||||||
|
@ -1033,6 +1033,17 @@ TEST_P(OptimisticTransactionTest, IteratorTest) {
|
|||||||
delete txn;
|
delete txn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) {
|
||||||
|
// `OptimisticTransactionDB` does not allow range deletion in any API.
|
||||||
|
ASSERT_TRUE(
|
||||||
|
txn_db
|
||||||
|
->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b")
|
||||||
|
.IsNotSupported());
|
||||||
|
WriteBatch wb;
|
||||||
|
ASSERT_OK(wb.DeleteRange("a", "b"));
|
||||||
|
ASSERT_NOK(txn_db->Write(WriteOptions(), &wb));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(OptimisticTransactionTest, SavepointTest) {
|
TEST_P(OptimisticTransactionTest, SavepointTest) {
|
||||||
WriteOptions write_options;
|
WriteOptions write_options;
|
||||||
ReadOptions read_options, snapshot_read_options;
|
ReadOptions read_options, snapshot_read_options;
|
||||||
|
@ -2925,6 +2925,47 @@ TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(TransactionTest, MultiGetSnapshot) {
|
||||||
|
WriteOptions write_options;
|
||||||
|
TransactionOptions transaction_options;
|
||||||
|
Transaction* txn1 = db->BeginTransaction(write_options, transaction_options);
|
||||||
|
|
||||||
|
Slice key = "foo";
|
||||||
|
|
||||||
|
Status s = txn1->Put(key, "bar");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn1->SetName("test");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn1->Prepare();
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
// Get snapshot between prepare and commit
|
||||||
|
// Un-committed data should be invisible to other transactions
|
||||||
|
const Snapshot* s1 = db->GetSnapshot();
|
||||||
|
|
||||||
|
s = txn1->Commit();
|
||||||
|
ASSERT_OK(s);
|
||||||
|
delete txn1;
|
||||||
|
|
||||||
|
Transaction* txn2 = db->BeginTransaction(write_options, transaction_options);
|
||||||
|
ReadOptions read_options;
|
||||||
|
read_options.snapshot = s1;
|
||||||
|
|
||||||
|
std::vector<Slice> keys;
|
||||||
|
std::vector<PinnableSlice> values(1);
|
||||||
|
std::vector<Status> statuses(1);
|
||||||
|
keys.push_back(key);
|
||||||
|
auto cfd = db->DefaultColumnFamily();
|
||||||
|
txn2->MultiGet(read_options, cfd, 1, keys.data(), values.data(),
|
||||||
|
statuses.data());
|
||||||
|
ASSERT_TRUE(statuses[0].IsNotFound());
|
||||||
|
delete txn2;
|
||||||
|
|
||||||
|
db->ReleaseSnapshot(s1);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(TransactionTest, ColumnFamiliesTest2) {
|
TEST_P(TransactionTest, ColumnFamiliesTest2) {
|
||||||
WriteOptions write_options;
|
WriteOptions write_options;
|
||||||
ReadOptions read_options, snapshot_read_options;
|
ReadOptions read_options, snapshot_read_options;
|
||||||
@ -4835,6 +4876,56 @@ TEST_P(TransactionTest, MergeTest) {
|
|||||||
ASSERT_EQ("a,3", value);
|
ASSERT_EQ("a,3", value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(TransactionTest, DeleteRangeSupportTest) {
|
||||||
|
// The `DeleteRange()` API is banned everywhere.
|
||||||
|
ASSERT_TRUE(
|
||||||
|
db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
|
||||||
|
.IsNotSupported());
|
||||||
|
|
||||||
|
// But range deletions can be added via the `Write()` API by specifying the
|
||||||
|
// proper flags to promise there are no conflicts according to the DB type
|
||||||
|
// (see `TransactionDB::DeleteRange()` API doc for details).
|
||||||
|
for (bool skip_concurrency_control : {false, true}) {
|
||||||
|
for (bool skip_duplicate_key_check : {false, true}) {
|
||||||
|
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
|
||||||
|
WriteBatch wb;
|
||||||
|
ASSERT_OK(wb.DeleteRange("a", "b"));
|
||||||
|
TransactionDBWriteOptimizations flags;
|
||||||
|
flags.skip_concurrency_control = skip_concurrency_control;
|
||||||
|
flags.skip_duplicate_key_check = skip_duplicate_key_check;
|
||||||
|
Status s = db->Write(WriteOptions(), flags, &wb);
|
||||||
|
std::string value;
|
||||||
|
switch (txn_db_options.write_policy) {
|
||||||
|
case WRITE_COMMITTED:
|
||||||
|
if (skip_concurrency_control) {
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
|
||||||
|
} else {
|
||||||
|
ASSERT_NOK(s);
|
||||||
|
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case WRITE_PREPARED:
|
||||||
|
// Intentional fall-through
|
||||||
|
case WRITE_UNPREPARED:
|
||||||
|
if (skip_concurrency_control && skip_duplicate_key_check) {
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
|
||||||
|
} else {
|
||||||
|
ASSERT_NOK(s);
|
||||||
|
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Without any promises from the user, range deletion via other `Write()`
|
||||||
|
// APIs are still banned.
|
||||||
|
ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
|
||||||
|
ASSERT_NOK(db->Write(WriteOptions(), &wb));
|
||||||
|
ASSERT_OK(db->Get(ReadOptions(), "a", &value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(TransactionTest, DeferSnapshotTest) {
|
TEST_P(TransactionTest, DeferSnapshotTest) {
|
||||||
WriteOptions write_options;
|
WriteOptions write_options;
|
||||||
ReadOptions read_options;
|
ReadOptions read_options;
|
||||||
|
@ -157,7 +157,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
|
|||||||
// TODO(myabandeh): add an option to allow user skipping this cost
|
// TODO(myabandeh): add an option to allow user skipping this cost
|
||||||
SubBatchCounter counter(*GetCFComparatorMap());
|
SubBatchCounter counter(*GetCFComparatorMap());
|
||||||
auto s = batch->Iterate(&counter);
|
auto s = batch->Iterate(&counter);
|
||||||
assert(s.ok());
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
batch_cnt = counter.BatchCount();
|
batch_cnt = counter.BatchCount();
|
||||||
WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
|
WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
|
||||||
ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
|
ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
|
||||||
|
Loading…
Reference in New Issue
Block a user