Compare commits
28 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
2df3e90028 | ||
|
6a23aaf86e | ||
|
bd39be1859 | ||
|
62cf055775 | ||
|
97bf78721b | ||
|
0bda0e3dfa | ||
|
950b72c743 | ||
|
6eccfbbf46 | ||
|
ed5c64e0d9 | ||
|
0dd184c446 | ||
|
6676d31d3f | ||
|
82c5a09241 | ||
|
ac39f413d8 | ||
|
d6cd5d1e3c | ||
|
5a86c2e725 | ||
|
e29a510f45 | ||
|
41a2789e18 | ||
|
2e5f8bd434 | ||
|
e041a40a23 | ||
|
74be8f3ea2 | ||
|
cda8a74eb7 | ||
|
fb98398ca9 | ||
|
2dbb90a064 | ||
|
b4f29f7515 | ||
|
731f022dc8 | ||
|
a02495cf80 | ||
|
5a0cef9259 | ||
|
bb95ed284d |
1
.gitignore
vendored
1
.gitignore
vendored
@ -53,6 +53,7 @@ db_test2
|
||||
trace_analyzer
|
||||
trace_analyzer_test
|
||||
block_cache_trace_analyzer
|
||||
io_tracer_parser
|
||||
.DS_Store
|
||||
.vs
|
||||
.vscode
|
||||
|
@ -1017,6 +1017,7 @@ option(WITH_ALL_TESTS "Build all test, rather than a small subset" ON)
|
||||
if(WITH_TESTS OR WITH_BENCHMARK_TOOLS)
|
||||
add_subdirectory(third-party/gtest-1.8.1/fused-src/gtest)
|
||||
add_library(testharness STATIC
|
||||
test_util/mock_time_env.cc
|
||||
test_util/testharness.cc)
|
||||
target_link_libraries(testharness gtest)
|
||||
endif()
|
||||
|
35
HISTORY.md
35
HISTORY.md
@ -1,5 +1,24 @@
|
||||
# Rocksdb Change Log
|
||||
## 6.13 (09/12/2020)
|
||||
## 6.13.4 (11/15/2020)
|
||||
### Bug Fixes
|
||||
* Fixed a bug of encoding and parsing BlockBasedTableOptions::read_amp_bytes_per_bit as a 64-bit integer.
|
||||
* Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.
|
||||
|
||||
## 6.13.3 (10/14/2020)
|
||||
### Bug Fixes
|
||||
* Fix a bug that could cause a stalled write to crash with mixed of slowdown and no_slowdown writes (`WriteOptions.no_slowdown=true`).
|
||||
|
||||
## 6.13.2 (10/13/2020)
|
||||
### Bug Fixes
|
||||
* Fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
|
||||
|
||||
## 6.13.1 (10/12/2020)
|
||||
### Bug Fixes
|
||||
* Since 6.12, memtable lookup should report unrecognized value_type as corruption (#7121).
|
||||
* Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index leading to wrong read results or other unexpected behavior.
|
||||
* Fixed a bug when indexes are partitioned (`index_type == kTwoLevelIndexSearch`), some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`), and partitions reads could be mixed between block cache and directly from the file (e.g., with `enable_index_compression == 1` and `mmap_read == 1`, partitions that were stored uncompressed due to poor compression ratio would be read directly from the file via mmap, while partitions that were stored compressed would be read from block cache). The bug could cause index partitions to be mistakenly considered empty during reads leading to wrong read results.
|
||||
|
||||
## 6.13 (09/24/2020)
|
||||
### Bug fixes
|
||||
* Fix a performance regression introduced in 6.4 that makes a upper bound check for every Next() even if keys are within a data block that is within the upper bound.
|
||||
* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.
|
||||
@ -14,23 +33,28 @@
|
||||
* Fix a bug in which bottommost compaction continues to advance the underlying InternalIterator to skip tombstones even after shutdown.
|
||||
|
||||
### New Features
|
||||
* A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.
|
||||
* A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions.
|
||||
* Added a new subcommand, `ldb unsafe_remove_sst_file`, which removes a lost or corrupt SST file from a DB's metadata. This command involves data loss and must not be used on a live DB.
|
||||
|
||||
### Performance Improvements
|
||||
* Reduce thread number for multiple DB instances by re-using one global thread for statistics dumping and persisting.
|
||||
* Reduce write-amp in heavy write bursts in `kCompactionStyleLevel` compaction style with `level_compaction_dynamic_level_bytes` set.
|
||||
* BackupEngine incremental backups no longer read DB table files that are already saved to a shared part of the backup directory, unless `share_files_with_checksum` is used with `kLegacyCrc32cAndFileSize` naming (discouraged).
|
||||
* For `share_files_with_checksum`, we are confident there is no regression (vs. pre-6.12) in detecting DB or backup corruption at backup creation time, mostly because the old design did not leverage this extra checksum computation for detecting inconsistencies at backup creation time.
|
||||
* For `share_table_files` without "checksum" (not recommended), there is a regression in detecting fundamentally unsafe use of the option, greatly mitigated by file size checking (under "Behavior Changes"). Almost no reason to use `share_files_with_checksum=false` should remain.
|
||||
* `DB::VerifyChecksum` and `BackupEngine::VerifyBackup` with checksum checking are still able to catch corruptions that `CreateNewBackup` does not.
|
||||
|
||||
### Public API Change
|
||||
* Expose kTypeDeleteWithTimestamp in EntryType and update GetEntryType() accordingly.
|
||||
* Added file_checksum and file_checksum_func_name to TableFileCreationInfo, which can pass the table file checksum information through the OnTableFileCreated callback during flush and compaction.
|
||||
* A warning is added to `DB::DeleteFile()` API describing its known problems and deprecation plan.
|
||||
* Add a new stats level, i.e. StatsLevel::kExceptTickers (PR7329) to exclude tickers even if application passes a non-null Statistics object.
|
||||
* Added a new status code IOStatus::IOFenced() for the Env/FileSystem to indicate that writes from this instance are fenced off. Like any other background error, this error is returned to the user in Put/Merge/Delete/Flush calls and can be checked using Status::IsIOFenced().
|
||||
|
||||
### Behavior Changes
|
||||
* File abstraction `FSRandomAccessFile.Prefetch()` default return status is changed from `OK` to `NotSupported`. If the user inherited file doesn't implement prefetch, RocksDB will create internal prefetch buffer to improve read performance.
|
||||
|
||||
* When retryable IO error happens during Flush (manifest write error is excluded) and WAL is disabled, originally it is mapped to kHardError. Now,it is mapped to soft error. So DB will not stall the writes unless the memtable is full. At the same time, when auto resume is triggered to recover the retryable IO error during Flush, SwitchMemtable is not called to avoid generating to many small immutable memtables. If WAL is enabled, no behavior changes.
|
||||
* When considering whether a table file is already backed up in a shared part of backup directory, BackupEngine would already query the sizes of source (DB) and pre-existing destination (backup) files. BackupEngine now uses these file sizes to detect corruption, as at least one of (a) old backup, (b) backup in progress, or (c) current DB is corrupt if there's a size mismatch.
|
||||
|
||||
### Others
|
||||
* Error in prefetching partitioned index blocks will not be swallowed. It will fail the query and return the IOError users.
|
||||
@ -60,11 +84,12 @@
|
||||
* Make compaction report InternalKey corruption while iterating over the input.
|
||||
* Fix a bug which may cause MultiGet to be slow because it may read more data than requested, but this won't affect correctness. The bug was introduced in 6.10 release.
|
||||
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
|
||||
* Fixed a bug in size-amp-triggered and periodic-triggered universal compaction, where the compression settings for the first input level were used rather than the compression settings for the output (bottom) level.
|
||||
|
||||
### New Features
|
||||
* DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called.
|
||||
* Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5.
|
||||
* `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kOptionalChecksumAndDbSessionId`. By default, `BackupableDBOptions::share_files_with_checksum_naming` is set to `kOptionalChecksumAndDbSessionId`. In the default case, backup table filenames generated by this version of RocksDB are of the form either `<file_number>_<crc32c>_<db_session_id>.sst` or `<file_number>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. Specifically, table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` if `DBOptions::file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`. Futhermore, the checksum value `<crc32c>` appeared in the filenames is hexadecimal-encoded, instead of being decimal-encoded `uint32_t` value. If `DBOptions::file_checksum_gen_factory` is `nullptr`, the table filenames are of the form `<file_number>_<db_session_id>.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. Moreover, for table files generated prior to this version of RocksDB, using `kOptionalChecksumAndDbSessionId` will fall back on `kChecksumAndFileSize`. In these cases, the checksum value `<crc32c>` in the filenames `<file_number>_<crc32c>_<file_size>.sst` is decimal-encoded `uint32_t` value as before. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true.
|
||||
* `BackupableDBOptions::share_files_with_checksum_naming` is added with new default behavior for naming backup files with `share_files_with_checksum`, to address performance and backup integrity issues. See API comments for details.
|
||||
* Added auto resume function to automatically recover the DB from background Retryable IO Error. When retryable IOError happens during flush and WAL write, the error is mapped to Hard Error and DB will be in read mode. When retryable IO Error happens during compaction, the error will be mapped to Soft Error. DB is still in write/read mode. Autoresume function will create a thread for a DB to call DB->ResumeImpl() to try the recover for Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if retryable IO Error happens. Auto resume may also cause other Retryable IO Error during the recovery, so the recovery will fail. Retry the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume retryable IO Error is disabled. Default is INT_MAX, which will lead to a infinit auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes.
|
||||
* Option `max_subcompactions` can be set dynamically using DB::SetDBOptions().
|
||||
* Added experimental ColumnFamilyOptions::sst_partitioner_factory to define determine the partitioning of sst files. This helps compaction to split the files on interesting boundaries (key prefixes) to make propagation of sst files less write amplifying (covering the whole key space).
|
||||
@ -72,7 +97,7 @@
|
||||
### Performance Improvements
|
||||
* Eliminate key copies for internal comparisons while accessing ingested block-based tables.
|
||||
* Reduce key comparisons during random access in all block-based tables.
|
||||
* BackupEngine avoids unnecessary repeated checksum computation for backing up a table file to the `shared_checksum` directory when using `kOptionalChecksumAndDbSessionId`, except on SST files generated before this version of RocksDB, which fall back on using `kChecksumAndFileSize`.
|
||||
* BackupEngine avoids unnecessary repeated checksum computation for backing up a table file to the `shared_checksum` directory when using `share_files_with_checksum_naming = kUseDbSessionId` (new default), except on SST files generated before this version of RocksDB, which fall back on using `kLegacyCrc32cAndFileSize`.
|
||||
|
||||
## 6.11 (6/12/2020)
|
||||
### Bug Fixes
|
||||
|
5
Makefile
5
Makefile
@ -421,6 +421,11 @@ default: all
|
||||
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
|
||||
-Wunused-parameter
|
||||
|
||||
ifdef USE_CLANG
|
||||
# Used by some teams in Facebook
|
||||
WARNING_FLAGS += -Wshift-sign-overflow
|
||||
endif
|
||||
|
||||
ifeq ($(PLATFORM), OS_OPENBSD)
|
||||
WARNING_FLAGS += -Wno-unused-lambda-capture
|
||||
endif
|
||||
|
284
TARGETS
284
TARGETS
@ -387,6 +387,284 @@ cpp_library(
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
)
|
||||
|
||||
cpp_library(
|
||||
name = "rocksdb_whole_archive_lib",
|
||||
srcs = [
|
||||
"cache/cache.cc",
|
||||
"cache/clock_cache.cc",
|
||||
"cache/lru_cache.cc",
|
||||
"cache/sharded_cache.cc",
|
||||
"db/arena_wrapped_db_iter.cc",
|
||||
"db/blob/blob_file_addition.cc",
|
||||
"db/blob/blob_file_builder.cc",
|
||||
"db/blob/blob_file_garbage.cc",
|
||||
"db/blob/blob_file_meta.cc",
|
||||
"db/blob/blob_log_format.cc",
|
||||
"db/blob/blob_log_reader.cc",
|
||||
"db/blob/blob_log_writer.cc",
|
||||
"db/builder.cc",
|
||||
"db/c.cc",
|
||||
"db/column_family.cc",
|
||||
"db/compacted_db_impl.cc",
|
||||
"db/compaction/compaction.cc",
|
||||
"db/compaction/compaction_iterator.cc",
|
||||
"db/compaction/compaction_job.cc",
|
||||
"db/compaction/compaction_picker.cc",
|
||||
"db/compaction/compaction_picker_fifo.cc",
|
||||
"db/compaction/compaction_picker_level.cc",
|
||||
"db/compaction/compaction_picker_universal.cc",
|
||||
"db/compaction/sst_partitioner.cc",
|
||||
"db/convenience.cc",
|
||||
"db/db_filesnapshot.cc",
|
||||
"db/db_impl/db_impl.cc",
|
||||
"db/db_impl/db_impl_compaction_flush.cc",
|
||||
"db/db_impl/db_impl_debug.cc",
|
||||
"db/db_impl/db_impl_experimental.cc",
|
||||
"db/db_impl/db_impl_files.cc",
|
||||
"db/db_impl/db_impl_open.cc",
|
||||
"db/db_impl/db_impl_readonly.cc",
|
||||
"db/db_impl/db_impl_secondary.cc",
|
||||
"db/db_impl/db_impl_write.cc",
|
||||
"db/db_info_dumper.cc",
|
||||
"db/db_iter.cc",
|
||||
"db/dbformat.cc",
|
||||
"db/error_handler.cc",
|
||||
"db/event_helpers.cc",
|
||||
"db/experimental.cc",
|
||||
"db/external_sst_file_ingestion_job.cc",
|
||||
"db/file_indexer.cc",
|
||||
"db/flush_job.cc",
|
||||
"db/flush_scheduler.cc",
|
||||
"db/forward_iterator.cc",
|
||||
"db/import_column_family_job.cc",
|
||||
"db/internal_stats.cc",
|
||||
"db/log_reader.cc",
|
||||
"db/log_writer.cc",
|
||||
"db/logs_with_prep_tracker.cc",
|
||||
"db/malloc_stats.cc",
|
||||
"db/memtable.cc",
|
||||
"db/memtable_list.cc",
|
||||
"db/merge_helper.cc",
|
||||
"db/merge_operator.cc",
|
||||
"db/range_del_aggregator.cc",
|
||||
"db/range_tombstone_fragmenter.cc",
|
||||
"db/repair.cc",
|
||||
"db/snapshot_impl.cc",
|
||||
"db/table_cache.cc",
|
||||
"db/table_properties_collector.cc",
|
||||
"db/transaction_log_impl.cc",
|
||||
"db/trim_history_scheduler.cc",
|
||||
"db/version_builder.cc",
|
||||
"db/version_edit.cc",
|
||||
"db/version_edit_handler.cc",
|
||||
"db/version_set.cc",
|
||||
"db/wal_edit.cc",
|
||||
"db/wal_manager.cc",
|
||||
"db/write_batch.cc",
|
||||
"db/write_batch_base.cc",
|
||||
"db/write_controller.cc",
|
||||
"db/write_thread.cc",
|
||||
"env/env.cc",
|
||||
"env/env_chroot.cc",
|
||||
"env/env_encryption.cc",
|
||||
"env/env_hdfs.cc",
|
||||
"env/env_posix.cc",
|
||||
"env/file_system.cc",
|
||||
"env/file_system_tracer.cc",
|
||||
"env/fs_posix.cc",
|
||||
"env/io_posix.cc",
|
||||
"env/mock_env.cc",
|
||||
"file/delete_scheduler.cc",
|
||||
"file/file_prefetch_buffer.cc",
|
||||
"file/file_util.cc",
|
||||
"file/filename.cc",
|
||||
"file/random_access_file_reader.cc",
|
||||
"file/read_write_util.cc",
|
||||
"file/readahead_raf.cc",
|
||||
"file/sequence_file_reader.cc",
|
||||
"file/sst_file_manager_impl.cc",
|
||||
"file/writable_file_writer.cc",
|
||||
"logging/auto_roll_logger.cc",
|
||||
"logging/event_logger.cc",
|
||||
"logging/log_buffer.cc",
|
||||
"memory/arena.cc",
|
||||
"memory/concurrent_arena.cc",
|
||||
"memory/jemalloc_nodump_allocator.cc",
|
||||
"memory/memkind_kmem_allocator.cc",
|
||||
"memtable/alloc_tracker.cc",
|
||||
"memtable/hash_linklist_rep.cc",
|
||||
"memtable/hash_skiplist_rep.cc",
|
||||
"memtable/skiplistrep.cc",
|
||||
"memtable/vectorrep.cc",
|
||||
"memtable/write_buffer_manager.cc",
|
||||
"monitoring/histogram.cc",
|
||||
"monitoring/histogram_windowing.cc",
|
||||
"monitoring/in_memory_stats_history.cc",
|
||||
"monitoring/instrumented_mutex.cc",
|
||||
"monitoring/iostats_context.cc",
|
||||
"monitoring/perf_context.cc",
|
||||
"monitoring/perf_level.cc",
|
||||
"monitoring/persistent_stats_history.cc",
|
||||
"monitoring/statistics.cc",
|
||||
"monitoring/stats_dump_scheduler.cc",
|
||||
"monitoring/thread_status_impl.cc",
|
||||
"monitoring/thread_status_updater.cc",
|
||||
"monitoring/thread_status_updater_debug.cc",
|
||||
"monitoring/thread_status_util.cc",
|
||||
"monitoring/thread_status_util_debug.cc",
|
||||
"options/cf_options.cc",
|
||||
"options/db_options.cc",
|
||||
"options/options.cc",
|
||||
"options/options_helper.cc",
|
||||
"options/options_parser.cc",
|
||||
"port/port_posix.cc",
|
||||
"port/stack_trace.cc",
|
||||
"table/adaptive/adaptive_table_factory.cc",
|
||||
"table/block_based/binary_search_index_reader.cc",
|
||||
"table/block_based/block.cc",
|
||||
"table/block_based/block_based_filter_block.cc",
|
||||
"table/block_based/block_based_table_builder.cc",
|
||||
"table/block_based/block_based_table_factory.cc",
|
||||
"table/block_based/block_based_table_iterator.cc",
|
||||
"table/block_based/block_based_table_reader.cc",
|
||||
"table/block_based/block_builder.cc",
|
||||
"table/block_based/block_prefetcher.cc",
|
||||
"table/block_based/block_prefix_index.cc",
|
||||
"table/block_based/data_block_footer.cc",
|
||||
"table/block_based/data_block_hash_index.cc",
|
||||
"table/block_based/filter_block_reader_common.cc",
|
||||
"table/block_based/filter_policy.cc",
|
||||
"table/block_based/flush_block_policy.cc",
|
||||
"table/block_based/full_filter_block.cc",
|
||||
"table/block_based/hash_index_reader.cc",
|
||||
"table/block_based/index_builder.cc",
|
||||
"table/block_based/index_reader_common.cc",
|
||||
"table/block_based/parsed_full_filter_block.cc",
|
||||
"table/block_based/partitioned_filter_block.cc",
|
||||
"table/block_based/partitioned_index_iterator.cc",
|
||||
"table/block_based/partitioned_index_reader.cc",
|
||||
"table/block_based/reader_common.cc",
|
||||
"table/block_based/uncompression_dict_reader.cc",
|
||||
"table/block_fetcher.cc",
|
||||
"table/cuckoo/cuckoo_table_builder.cc",
|
||||
"table/cuckoo/cuckoo_table_factory.cc",
|
||||
"table/cuckoo/cuckoo_table_reader.cc",
|
||||
"table/format.cc",
|
||||
"table/get_context.cc",
|
||||
"table/iterator.cc",
|
||||
"table/merging_iterator.cc",
|
||||
"table/meta_blocks.cc",
|
||||
"table/persistent_cache_helper.cc",
|
||||
"table/plain/plain_table_bloom.cc",
|
||||
"table/plain/plain_table_builder.cc",
|
||||
"table/plain/plain_table_factory.cc",
|
||||
"table/plain/plain_table_index.cc",
|
||||
"table/plain/plain_table_key_coding.cc",
|
||||
"table/plain/plain_table_reader.cc",
|
||||
"table/sst_file_dumper.cc",
|
||||
"table/sst_file_reader.cc",
|
||||
"table/sst_file_writer.cc",
|
||||
"table/table_properties.cc",
|
||||
"table/two_level_iterator.cc",
|
||||
"test_util/sync_point.cc",
|
||||
"test_util/sync_point_impl.cc",
|
||||
"test_util/transaction_test_util.cc",
|
||||
"tools/dump/db_dump_tool.cc",
|
||||
"tools/ldb_cmd.cc",
|
||||
"tools/ldb_tool.cc",
|
||||
"tools/sst_dump_tool.cc",
|
||||
"trace_replay/block_cache_tracer.cc",
|
||||
"trace_replay/io_tracer.cc",
|
||||
"trace_replay/trace_replay.cc",
|
||||
"util/build_version.cc",
|
||||
"util/coding.cc",
|
||||
"util/compaction_job_stats_impl.cc",
|
||||
"util/comparator.cc",
|
||||
"util/compression_context_cache.cc",
|
||||
"util/concurrent_task_limiter_impl.cc",
|
||||
"util/crc32c.cc",
|
||||
"util/dynamic_bloom.cc",
|
||||
"util/file_checksum_helper.cc",
|
||||
"util/hash.cc",
|
||||
"util/murmurhash.cc",
|
||||
"util/random.cc",
|
||||
"util/rate_limiter.cc",
|
||||
"util/slice.cc",
|
||||
"util/status.cc",
|
||||
"util/string_util.cc",
|
||||
"util/thread_local.cc",
|
||||
"util/threadpool_imp.cc",
|
||||
"util/xxhash.cc",
|
||||
"utilities/backupable/backupable_db.cc",
|
||||
"utilities/blob_db/blob_compaction_filter.cc",
|
||||
"utilities/blob_db/blob_db.cc",
|
||||
"utilities/blob_db/blob_db_impl.cc",
|
||||
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
|
||||
"utilities/blob_db/blob_dump_tool.cc",
|
||||
"utilities/blob_db/blob_file.cc",
|
||||
"utilities/cassandra/cassandra_compaction_filter.cc",
|
||||
"utilities/cassandra/format.cc",
|
||||
"utilities/cassandra/merge_operator.cc",
|
||||
"utilities/checkpoint/checkpoint_impl.cc",
|
||||
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
|
||||
"utilities/convenience/info_log_finder.cc",
|
||||
"utilities/debug.cc",
|
||||
"utilities/env_mirror.cc",
|
||||
"utilities/env_timed.cc",
|
||||
"utilities/fault_injection_env.cc",
|
||||
"utilities/fault_injection_fs.cc",
|
||||
"utilities/leveldb_options/leveldb_options.cc",
|
||||
"utilities/memory/memory_util.cc",
|
||||
"utilities/merge_operators/bytesxor.cc",
|
||||
"utilities/merge_operators/max.cc",
|
||||
"utilities/merge_operators/put.cc",
|
||||
"utilities/merge_operators/sortlist.cc",
|
||||
"utilities/merge_operators/string_append/stringappend.cc",
|
||||
"utilities/merge_operators/string_append/stringappend2.cc",
|
||||
"utilities/merge_operators/uint64add.cc",
|
||||
"utilities/object_registry.cc",
|
||||
"utilities/option_change_migration/option_change_migration.cc",
|
||||
"utilities/options/options_util.cc",
|
||||
"utilities/persistent_cache/block_cache_tier.cc",
|
||||
"utilities/persistent_cache/block_cache_tier_file.cc",
|
||||
"utilities/persistent_cache/block_cache_tier_metadata.cc",
|
||||
"utilities/persistent_cache/persistent_cache_tier.cc",
|
||||
"utilities/persistent_cache/volatile_tier_impl.cc",
|
||||
"utilities/simulator_cache/cache_simulator.cc",
|
||||
"utilities/simulator_cache/sim_cache.cc",
|
||||
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
|
||||
"utilities/trace/file_trace_reader_writer.cc",
|
||||
"utilities/transactions/lock/lock_tracker.cc",
|
||||
"utilities/transactions/lock/point_lock_tracker.cc",
|
||||
"utilities/transactions/optimistic_transaction.cc",
|
||||
"utilities/transactions/optimistic_transaction_db_impl.cc",
|
||||
"utilities/transactions/pessimistic_transaction.cc",
|
||||
"utilities/transactions/pessimistic_transaction_db.cc",
|
||||
"utilities/transactions/snapshot_checker.cc",
|
||||
"utilities/transactions/transaction_base.cc",
|
||||
"utilities/transactions/transaction_db_mutex_impl.cc",
|
||||
"utilities/transactions/transaction_lock_mgr.cc",
|
||||
"utilities/transactions/transaction_util.cc",
|
||||
"utilities/transactions/write_prepared_txn.cc",
|
||||
"utilities/transactions/write_prepared_txn_db.cc",
|
||||
"utilities/transactions/write_unprepared_txn.cc",
|
||||
"utilities/transactions/write_unprepared_txn_db.cc",
|
||||
"utilities/ttl/db_ttl_impl.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
|
||||
],
|
||||
auto_headers = AutoHeaders.RECURSIVE_GLOB,
|
||||
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
os_deps = ROCKSDB_OS_DEPS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = True,
|
||||
)
|
||||
|
||||
cpp_library(
|
||||
@ -394,6 +672,7 @@ cpp_library(
|
||||
srcs = [
|
||||
"db/db_test_util.cc",
|
||||
"table/mock_table.cc",
|
||||
"test_util/mock_time_env.cc",
|
||||
"test_util/testharness.cc",
|
||||
"test_util/testutil.cc",
|
||||
"tools/block_cache_analyzer/block_cache_trace_analyzer.cc",
|
||||
@ -410,6 +689,7 @@ cpp_library(
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
("googletest", None, "gtest"),
|
||||
],
|
||||
link_whole = False,
|
||||
)
|
||||
|
||||
cpp_library(
|
||||
@ -428,6 +708,7 @@ cpp_library(
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [":rocksdb_lib"],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
)
|
||||
|
||||
cpp_library(
|
||||
@ -488,6 +769,7 @@ cpp_library(
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [":rocksdb_test_lib"],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS,
|
||||
link_whole = False,
|
||||
)
|
||||
|
||||
# [test_name, test_src, test_type, extra_deps, extra_compiler_flags]
|
||||
@ -1686,7 +1968,7 @@ ROCKS_TESTS = [
|
||||
srcs = [test_cc],
|
||||
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS + extra_compiler_flags,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [":rocksdb_test_lib"] + extra_deps,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
|
@ -136,6 +136,15 @@ def generate_targets(repo_path, deps_map):
|
||||
"rocksdb_lib",
|
||||
src_mk["LIB_SOURCES"] +
|
||||
src_mk["TOOL_LIB_SOURCES"])
|
||||
# rocksdb_whole_archive_lib
|
||||
TARGETS.add_library(
|
||||
"rocksdb_whole_archive_lib",
|
||||
src_mk["LIB_SOURCES"] +
|
||||
src_mk["TOOL_LIB_SOURCES"],
|
||||
deps=None,
|
||||
headers=None,
|
||||
extra_external_deps="",
|
||||
link_whole=True)
|
||||
# rocksdb_test_lib
|
||||
TARGETS.add_library(
|
||||
"rocksdb_test_lib",
|
||||
|
@ -39,7 +39,7 @@ class TARGETSBuilder(object):
|
||||
self.targets_file.close()
|
||||
|
||||
def add_library(self, name, srcs, deps=None, headers=None,
|
||||
extra_external_deps=""):
|
||||
extra_external_deps="", link_whole=False):
|
||||
headers_attr_prefix = ""
|
||||
if headers is None:
|
||||
headers_attr_prefix = "auto_"
|
||||
@ -52,7 +52,8 @@ class TARGETSBuilder(object):
|
||||
headers_attr_prefix=headers_attr_prefix,
|
||||
headers=headers,
|
||||
deps=pretty_list(deps),
|
||||
extra_external_deps=extra_external_deps))
|
||||
extra_external_deps=extra_external_deps,
|
||||
link_whole=link_whole))
|
||||
self.total_lib = self.total_lib + 1
|
||||
|
||||
def add_rocksdb_library(self, name, srcs, headers=None):
|
||||
|
@ -134,6 +134,7 @@ cpp_library(
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [{deps}],
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS{extra_external_deps},
|
||||
link_whole = {link_whole},
|
||||
)
|
||||
"""
|
||||
|
||||
@ -187,7 +188,7 @@ ROCKS_TESTS = [
|
||||
srcs = [test_cc],
|
||||
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
|
||||
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
compiler_flags = ROCKSDB_COMPILER_FLAGS + extra_compiler_flags,
|
||||
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
|
||||
deps = [":rocksdb_test_lib"] + extra_deps,
|
||||
external_deps = ROCKSDB_EXTERNAL_DEPS + [
|
||||
|
@ -226,10 +226,12 @@ Status BuildTable(
|
||||
delete builder;
|
||||
|
||||
// Finish and check for file errors
|
||||
TEST_SYNC_POINT("BuildTable:BeforeSyncTable");
|
||||
if (s.ok() && !empty) {
|
||||
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
|
||||
*io_status = file_writer->Sync(ioptions.use_fsync);
|
||||
}
|
||||
TEST_SYNC_POINT("BuildTable:BeforeCloseTableFile");
|
||||
if (s.ok() && io_status->ok() && !empty) {
|
||||
*io_status = file_writer->Close();
|
||||
}
|
||||
|
@ -2427,7 +2427,7 @@ TEST_P(ColumnFamilyTest, FlushAndDropRaceCondition) {
|
||||
// Make sure the task is sleeping. Otherwise, it might start to execute
|
||||
// after sleeping_task.WaitUntilDone() and cause TSAN warning.
|
||||
sleeping_task.WaitUntilSleeping();
|
||||
|
||||
|
||||
// 1MB should create ~10 files for each CF
|
||||
int kKeysNum = 10000;
|
||||
PutRandomData(1, kKeysNum, 100);
|
||||
|
@ -1268,8 +1268,8 @@ Status CompactionJob::FinishCompactionOutputFile(
|
||||
auto kv = tombstone.Serialize();
|
||||
assert(lower_bound == nullptr ||
|
||||
ucmp->Compare(*lower_bound, kv.second) < 0);
|
||||
sub_compact->AddToBuilder(kv.first.Encode(), kv.second,
|
||||
paranoid_file_checks_);
|
||||
// Range tombstone is not supported by output validator yet.
|
||||
sub_compact->builder->Add(kv.first.Encode(), kv.second);
|
||||
InternalKey smallest_candidate = std::move(kv.first);
|
||||
if (lower_bound != nullptr &&
|
||||
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
|
||||
|
@ -105,7 +105,7 @@ class CorruptionTest : public testing::Test {
|
||||
ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
|
||||
}
|
||||
|
||||
void Build(int n, int flush_every = 0) {
|
||||
void Build(int n, int start, int flush_every) {
|
||||
std::string key_space, value_space;
|
||||
WriteBatch batch;
|
||||
for (int i = 0; i < n; i++) {
|
||||
@ -114,13 +114,15 @@ class CorruptionTest : public testing::Test {
|
||||
dbi->TEST_FlushMemTable();
|
||||
}
|
||||
//if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
|
||||
Slice key = Key(i, &key_space);
|
||||
Slice key = Key(i + start, &key_space);
|
||||
batch.Clear();
|
||||
batch.Put(key, Value(i, &value_space));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
}
|
||||
}
|
||||
|
||||
void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
|
||||
|
||||
void Check(int min_expected, int max_expected) {
|
||||
uint64_t next_expected = 0;
|
||||
uint64_t missed = 0;
|
||||
@ -619,6 +621,102 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
|
||||
Options options;
|
||||
options.paranoid_file_checks = true;
|
||||
options.create_if_missing = true;
|
||||
for (bool do_flush : {true, false}) {
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
ASSERT_OK(DestroyDB(dbname_, options));
|
||||
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
||||
std::string start, end;
|
||||
assert(db_ != nullptr);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(3, &start), Key(7, &end)));
|
||||
auto snap = db_->GetSnapshot();
|
||||
ASSERT_NE(snap, nullptr);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(8, &start), Key(9, &end)));
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(2, &start), Key(5, &end)));
|
||||
Build(10);
|
||||
if (do_flush) {
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
} else {
|
||||
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
||||
ASSERT_OK(dbi->TEST_FlushMemTable());
|
||||
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
|
||||
}
|
||||
db_->ReleaseSnapshot(snap);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
|
||||
Options options;
|
||||
options.paranoid_file_checks = true;
|
||||
options.create_if_missing = true;
|
||||
for (bool do_flush : {true, false}) {
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
ASSERT_OK(DestroyDB(dbname_, options));
|
||||
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
||||
assert(db_ != nullptr);
|
||||
Build(10, 0, 0);
|
||||
std::string start, end;
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(5, &start), Key(15, &end)));
|
||||
auto snap = db_->GetSnapshot();
|
||||
ASSERT_NE(snap, nullptr);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(8, &start), Key(9, &end)));
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(12, &start), Key(17, &end)));
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(2, &start), Key(4, &end)));
|
||||
Build(10, 10, 0);
|
||||
if (do_flush) {
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
} else {
|
||||
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
||||
ASSERT_OK(dbi->TEST_FlushMemTable());
|
||||
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
|
||||
}
|
||||
db_->ReleaseSnapshot(snap);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
|
||||
Options options;
|
||||
options.paranoid_file_checks = true;
|
||||
options.create_if_missing = true;
|
||||
for (bool do_flush : {true, false}) {
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
ASSERT_OK(DestroyDB(dbname_, options));
|
||||
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
||||
assert(db_ != nullptr);
|
||||
std::string start, end;
|
||||
Build(10);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(3, &start), Key(7, &end)));
|
||||
auto snap = db_->GetSnapshot();
|
||||
ASSERT_NE(snap, nullptr);
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(6, &start), Key(8, &end)));
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
Key(2, &start), Key(5, &end)));
|
||||
if (do_flush) {
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
} else {
|
||||
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
||||
ASSERT_OK(dbi->TEST_FlushMemTable());
|
||||
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
|
||||
}
|
||||
db_->ReleaseSnapshot(snap);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -8,6 +8,7 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include <cstring>
|
||||
#include <regex>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
@ -62,6 +63,13 @@ TEST_F(DBBasicTest, UniqueSession) {
|
||||
|
||||
ASSERT_EQ(sid2, sid4);
|
||||
|
||||
// Expected compact format for session ids (see notes in implementation)
|
||||
std::regex expected("[0-9A-Z]{20}");
|
||||
const std::string match("match");
|
||||
EXPECT_EQ(match, std::regex_replace(sid1, expected, match));
|
||||
EXPECT_EQ(match, std::regex_replace(sid2, expected, match));
|
||||
EXPECT_EQ(match, std::regex_replace(sid3, expected, match));
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
Close();
|
||||
ASSERT_OK(ReadOnlyReopen(options));
|
||||
|
@ -302,7 +302,7 @@ Status DBImpl::Resume() {
|
||||
// 4. Schedule compactions if needed for all the CFs. This is needed as the
|
||||
// flush in the prior step might have been a no-op for some CFs, which
|
||||
// means a new super version wouldn't have been installed
|
||||
Status DBImpl::ResumeImpl() {
|
||||
Status DBImpl::ResumeImpl(DBRecoverContext context) {
|
||||
mutex_.AssertHeld();
|
||||
WaitForBackgroundWork();
|
||||
|
||||
@ -364,7 +364,7 @@ Status DBImpl::ResumeImpl() {
|
||||
autovector<ColumnFamilyData*> cfds;
|
||||
SelectColumnFamiliesForAtomicFlush(&cfds);
|
||||
mutex_.Unlock();
|
||||
s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
|
||||
s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
@ -373,7 +373,7 @@ Status DBImpl::ResumeImpl() {
|
||||
}
|
||||
cfd->Ref();
|
||||
mutex_.Unlock();
|
||||
s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
|
||||
s = FlushMemTable(cfd, flush_opts, context.flush_reason);
|
||||
mutex_.Lock();
|
||||
cfd->UnrefAndTryDelete();
|
||||
if (!s.ok()) {
|
||||
@ -3690,13 +3690,29 @@ Status DBImpl::GetDbSessionId(std::string& session_id) const {
|
||||
}
|
||||
|
||||
void DBImpl::SetDbSessionId() {
|
||||
// GenerateUniqueId() generates an identifier
|
||||
// that has a negligible probability of being duplicated
|
||||
db_session_id_ = env_->GenerateUniqueId();
|
||||
// Remove the extra '\n' at the end if there is one
|
||||
if (!db_session_id_.empty() && db_session_id_.back() == '\n') {
|
||||
db_session_id_.pop_back();
|
||||
// GenerateUniqueId() generates an identifier that has a negligible
|
||||
// probability of being duplicated, ~128 bits of entropy
|
||||
std::string uuid = env_->GenerateUniqueId();
|
||||
|
||||
// Hash and reformat that down to a more compact format, 20 characters
|
||||
// in base-36 ([0-9A-Z]), which is ~103 bits of entropy, which is enough
|
||||
// to expect no collisions across a billion servers each opening DBs
|
||||
// a million times (~2^50). Benefits vs. raw unique id:
|
||||
// * Save ~ dozen bytes per SST file
|
||||
// * Shorter shared backup file names (some platforms have low limits)
|
||||
// * Visually distinct from DB id format
|
||||
uint64_t a = NPHash64(uuid.data(), uuid.size(), 1234U);
|
||||
uint64_t b = NPHash64(uuid.data(), uuid.size(), 5678U);
|
||||
db_session_id_.resize(20);
|
||||
static const char* const base36 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
size_t i = 0;
|
||||
for (; i < 10U; ++i, a /= 36U) {
|
||||
db_session_id_[i] = base36[a % 36];
|
||||
}
|
||||
for (; i < 20U; ++i, b /= 36U) {
|
||||
db_session_id_[i] = base36[b % 36];
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
|
||||
}
|
||||
|
||||
// Default implementation -- returns not supported status
|
||||
|
@ -1379,7 +1379,7 @@ class DBImpl : public DB {
|
||||
// Required: DB mutex held
|
||||
Status PersistentStatsProcessFormatVersion();
|
||||
|
||||
Status ResumeImpl();
|
||||
Status ResumeImpl(DBRecoverContext context);
|
||||
|
||||
void MaybeIgnoreError(Status* s) const;
|
||||
|
||||
|
@ -125,7 +125,12 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
|
||||
// "number < current_log_number".
|
||||
MarkLogsSynced(current_log_number - 1, true, io_s);
|
||||
if (!io_s.ok()) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
if (total_log_size_ > 0) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
} else {
|
||||
// If the WAL is empty, we use different error reason
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
|
||||
return io_s;
|
||||
}
|
||||
@ -178,6 +183,10 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
// SyncClosedLogs() may unlock and re-lock the db_mutex.
|
||||
io_s = SyncClosedLogs(job_context);
|
||||
s = io_s;
|
||||
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
|
||||
!io_s.IsColumnFamilyDropped()) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
}
|
||||
} else {
|
||||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
|
||||
}
|
||||
@ -217,10 +226,14 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
// CURRENT file. With current code, it's just difficult to tell. So just
|
||||
// be pessimistic and try write to a new MANIFEST.
|
||||
// TODO: distinguish between MANIFEST write and CURRENT renaming
|
||||
auto err_reason = versions_->io_status().ok()
|
||||
? BackgroundErrorReason::kFlush
|
||||
: BackgroundErrorReason::kManifestWrite;
|
||||
error_handler_.SetBGError(io_s, err_reason);
|
||||
if (!versions_->io_status().ok()) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
|
||||
} else if (total_log_size_ > 0) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
} else {
|
||||
// If the WAL is empty, we use different error reason
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
@ -593,10 +606,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
// CURRENT file. With current code, it's just difficult to tell. So just
|
||||
// be pessimistic and try write to a new MANIFEST.
|
||||
// TODO: distinguish between MANIFEST write and CURRENT renaming
|
||||
auto err_reason = versions_->io_status().ok()
|
||||
? BackgroundErrorReason::kFlush
|
||||
: BackgroundErrorReason::kManifestWrite;
|
||||
error_handler_.SetBGError(io_s, err_reason);
|
||||
if (!versions_->io_status().ok()) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
|
||||
} else if (total_log_size_ > 0) {
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
|
||||
} else {
|
||||
// If the WAL is empty, we use different error reason
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
@ -1598,6 +1615,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
FlushRequest flush_req;
|
||||
{
|
||||
WriteContext context;
|
||||
@ -1614,7 +1632,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
WaitForPendingWrites();
|
||||
|
||||
if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
|
||||
s = SwitchMemtable(cfd, &context);
|
||||
if (flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
|
||||
s = SwitchMemtable(cfd, &context);
|
||||
} else {
|
||||
assert(cfd->imm()->NumNotFlushed() > 0);
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
|
||||
@ -1622,7 +1644,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
flush_memtable_id = cfd->imm()->GetLatestMemTableID();
|
||||
flush_req.emplace_back(cfd, flush_memtable_id);
|
||||
}
|
||||
if (immutable_db_options_.persist_stats_to_disk) {
|
||||
if (immutable_db_options_.persist_stats_to_disk &&
|
||||
flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
|
||||
ColumnFamilyData* cfd_stats =
|
||||
versions_->GetColumnFamilySet()->GetColumnFamily(
|
||||
kPersistentStatsColumnFamilyName);
|
||||
@ -1651,7 +1674,6 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok() && !flush_req.empty()) {
|
||||
for (auto& elem : flush_req) {
|
||||
ColumnFamilyData* loop_cfd = elem.first;
|
||||
@ -1687,8 +1709,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
cfds.push_back(iter.first);
|
||||
flush_memtable_ids.push_back(&(iter.second));
|
||||
}
|
||||
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery));
|
||||
s = WaitForFlushMemTables(
|
||||
cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery ||
|
||||
flush_reason == FlushReason::kErrorRecoveryRetryFlush));
|
||||
InstrumentedMutexLock lock_guard(&mutex_);
|
||||
for (auto* tmp_cfd : cfds) {
|
||||
tmp_cfd->UnrefAndTryDelete();
|
||||
@ -1746,7 +1770,8 @@ Status DBImpl::AtomicFlushMemTables(
|
||||
}
|
||||
}
|
||||
for (auto cfd : cfds) {
|
||||
if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
|
||||
if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
|
||||
flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
|
||||
continue;
|
||||
}
|
||||
cfd->Ref();
|
||||
@ -1789,8 +1814,10 @@ Status DBImpl::AtomicFlushMemTables(
|
||||
for (auto& iter : flush_req) {
|
||||
flush_memtable_ids.push_back(&(iter.second));
|
||||
}
|
||||
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery));
|
||||
s = WaitForFlushMemTables(
|
||||
cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery ||
|
||||
flush_reason == FlushReason::kErrorRecoveryRetryFlush));
|
||||
InstrumentedMutexLock lock_guard(&mutex_);
|
||||
for (auto* cfd : cfds) {
|
||||
cfd->UnrefAndTryDelete();
|
||||
@ -1900,6 +1927,13 @@ Status DBImpl::WaitForFlushMemTables(
|
||||
if (!error_handler_.GetRecoveryError().ok()) {
|
||||
break;
|
||||
}
|
||||
// If BGWorkStopped, which indicate that there is a BG error and
|
||||
// 1) soft error but requires no BG work, 2) no in auto_recovery_
|
||||
if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
|
||||
error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
|
||||
return error_handler_.GetBGError();
|
||||
}
|
||||
|
||||
// Number of column families that have been dropped.
|
||||
int num_dropped = 0;
|
||||
// Number of column families that have finished flush.
|
||||
|
@ -470,6 +470,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
|
||||
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
|
||||
disable_memtable);
|
||||
write_thread_.JoinBatchGroup(&w);
|
||||
TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
|
||||
if (w.state == WriteThread::STATE_GROUP_LEADER) {
|
||||
WriteThread::WriteGroup wal_write_group;
|
||||
if (w.callback && !w.callback->AllowWriteBatching()) {
|
||||
@ -833,6 +834,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
// Caller must hold mutex_.
|
||||
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
|
||||
mutex_.AssertHeld();
|
||||
if (immutable_db_options_.paranoid_checks && !status.ok() &&
|
||||
!status.IsBusy() && !status.IsIncomplete()) {
|
||||
@ -843,6 +845,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
|
||||
void DBImpl::WriteStatusCheck(const Status& status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
|
||||
if (immutable_db_options_.paranoid_checks && !status.ok() &&
|
||||
!status.IsBusy() && !status.IsIncomplete()) {
|
||||
mutex_.Lock();
|
||||
@ -854,8 +857,9 @@ void DBImpl::WriteStatusCheck(const Status& status) {
|
||||
void DBImpl::IOStatusCheck(const IOStatus& io_status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
|
||||
!io_status.IsBusy() && !io_status.IsIncomplete()) {
|
||||
if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
|
||||
!io_status.IsBusy() && !io_status.IsIncomplete()) ||
|
||||
io_status.IsIOFenced()) {
|
||||
mutex_.Lock();
|
||||
error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
|
||||
mutex_.Unlock();
|
||||
|
@ -42,6 +42,125 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
|
||||
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
|
||||
Options options = GetOptions();
|
||||
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
|
||||
4;
|
||||
std::vector<port::Thread> threads;
|
||||
std::atomic<int> thread_num(0);
|
||||
port::Mutex mutex;
|
||||
port::CondVar cv(&mutex);
|
||||
// Guarded by mutex
|
||||
int writers = 0;
|
||||
|
||||
Reopen(options);
|
||||
|
||||
std::function<void()> write_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = false;
|
||||
dbfull()->Put(wo, key, "bar");
|
||||
};
|
||||
std::function<void()> write_no_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = true;
|
||||
dbfull()->Put(wo, key, "bar");
|
||||
};
|
||||
std::function<void(void*)> unblock_main_thread_func = [&](void*) {
|
||||
mutex.Lock();
|
||||
++writers;
|
||||
cv.SignalAll();
|
||||
mutex.Unlock();
|
||||
};
|
||||
|
||||
// Create 3 L0 files and schedule 4th without waiting
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
|
||||
"DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
|
||||
// Make compaction start wait for the write stall to be detected and
|
||||
// implemented by a write group leader
|
||||
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
|
||||
"BackgroundCallCompaction:0"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Schedule creation of 4th L0 file without waiting. This will seal the
|
||||
// memtable and then wait for a sync point before writing the file. We need
|
||||
// to do it this way because SwitchMemtable() needs to enter the
|
||||
// write_thread
|
||||
FlushOptions fopt;
|
||||
fopt.wait = false;
|
||||
dbfull()->Flush(fopt);
|
||||
|
||||
// Create a mix of slowdown/no_slowdown write threads
|
||||
mutex.Lock();
|
||||
// First leader
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 1) {
|
||||
cv.Wait();
|
||||
}
|
||||
|
||||
// Second leader. Will stall writes
|
||||
// Build a writers list with no slowdown in the middle:
|
||||
// +-------------+
|
||||
// | slowdown +<----+ newest
|
||||
// +--+----------+
|
||||
// |
|
||||
// v
|
||||
// +--+----------+
|
||||
// | no slowdown |
|
||||
// +--+----------+
|
||||
// |
|
||||
// v
|
||||
// +--+----------+
|
||||
// | slowdown +
|
||||
// +-------------+
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 2) {
|
||||
cv.Wait();
|
||||
}
|
||||
threads.emplace_back(write_no_slowdown_func);
|
||||
while (writers != 3) {
|
||||
cv.Wait();
|
||||
}
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 4) {
|
||||
cv.Wait();
|
||||
}
|
||||
|
||||
mutex.Unlock();
|
||||
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
|
||||
dbfull()->TEST_WaitForFlushMemTable(nullptr);
|
||||
// This would have triggered a write stall. Unblock the write group leader
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
|
||||
// The leader is going to create missing newer links. When the leader
|
||||
// finishes, the next leader is going to delay writes and fail writers with
|
||||
// no_slowdown
|
||||
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
|
||||
Options options = GetOptions();
|
||||
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
|
||||
|
@ -32,6 +32,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kSpaceLimit,
|
||||
true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kCompaction,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kCompaction,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during BG flush
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kNoSpace, true),
|
||||
@ -42,6 +50,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kSpaceLimit, true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kIOFenced, true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kIOFenced, false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during Write
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
@ -51,6 +65,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
false),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during MANIFEST write
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
@ -60,6 +82,36 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
false),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during BG flush with WAL disabled
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
false),
|
||||
Status::Severity::kNoError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, Status::SubCode::kSpaceLimit,
|
||||
true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
|
||||
};
|
||||
|
||||
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
|
||||
@ -110,6 +162,19 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during BG flush with WAL disabled
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kCorruption, true),
|
||||
Status::Severity::kUnrecoverableError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kCorruption, false),
|
||||
Status::Severity::kNoError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlushNoWAL,
|
||||
Status::Code::kIOError, false),
|
||||
Status::Severity::kNoError},
|
||||
};
|
||||
|
||||
std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
|
||||
@ -188,6 +253,7 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
|
||||
bool paranoid = db_options_.paranoid_checks;
|
||||
Status::Severity sev = Status::Severity::kFatalError;
|
||||
Status new_bg_err;
|
||||
DBRecoverContext context;
|
||||
bool found = false;
|
||||
|
||||
{
|
||||
@ -246,6 +312,7 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
|
||||
}
|
||||
}
|
||||
|
||||
recover_context_ = context;
|
||||
if (auto_recovery) {
|
||||
recovery_in_prog_ = true;
|
||||
|
||||
@ -273,8 +340,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
// Always returns ok
|
||||
db_->DisableFileDeletionsWithLock();
|
||||
}
|
||||
|
||||
Status new_bg_io_err = bg_io_err;
|
||||
Status s;
|
||||
DBRecoverContext context;
|
||||
if (bg_io_err.GetDataLoss()) {
|
||||
// FIrst, data loss is treated as unrecoverable error. So it can directly
|
||||
// overwrite any existing bg_error_.
|
||||
@ -286,6 +355,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
}
|
||||
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s,
|
||||
db_mutex_, &auto_recovery);
|
||||
recover_context_ = context;
|
||||
return bg_error_;
|
||||
} else if (bg_io_err.GetRetryable()) {
|
||||
// Second, check if the error is a retryable IO error or not. if it is
|
||||
@ -302,7 +372,27 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
if (bg_err.severity() > bg_error_.severity()) {
|
||||
bg_error_ = bg_err;
|
||||
}
|
||||
recover_context_ = context;
|
||||
return bg_error_;
|
||||
} else if (BackgroundErrorReason::kFlushNoWAL == reason) {
|
||||
// When the BG Retryable IO error reason is flush without WAL,
|
||||
// We map it to a soft error. At the same time, all the background work
|
||||
// should be stopped except the BG work from recovery. Therefore, we
|
||||
// set the soft_error_no_bg_work_ to true. At the same time, since DB
|
||||
// continues to receive writes when BG error is soft error, to avoid
|
||||
// to many small memtable being generated during auto resume, the flush
|
||||
// reason is set to kErrorRecoveryRetryFlush.
|
||||
Status bg_err(new_bg_io_err, Status::Severity::kSoftError);
|
||||
if (recovery_in_prog_ && recovery_error_.ok()) {
|
||||
recovery_error_ = bg_err;
|
||||
}
|
||||
if (bg_err.severity() > bg_error_.severity()) {
|
||||
bg_error_ = bg_err;
|
||||
}
|
||||
soft_error_no_bg_work_ = true;
|
||||
context.flush_reason = FlushReason::kErrorRecoveryRetryFlush;
|
||||
recover_context_ = context;
|
||||
return StartRecoverFromRetryableBGIOError(bg_io_err);
|
||||
} else {
|
||||
Status bg_err(new_bg_io_err, Status::Severity::kHardError);
|
||||
if (recovery_in_prog_ && recovery_error_.ok()) {
|
||||
@ -311,6 +401,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
if (bg_err.severity() > bg_error_.severity()) {
|
||||
bg_error_ = bg_err;
|
||||
}
|
||||
recover_context_ = context;
|
||||
return StartRecoverFromRetryableBGIOError(bg_io_err);
|
||||
}
|
||||
} else {
|
||||
@ -377,6 +468,7 @@ Status ErrorHandler::ClearBGError() {
|
||||
Status old_bg_error = bg_error_;
|
||||
bg_error_ = Status::OK();
|
||||
recovery_in_prog_ = false;
|
||||
soft_error_no_bg_work_ = false;
|
||||
EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners,
|
||||
old_bg_error, db_mutex_);
|
||||
}
|
||||
@ -389,6 +481,7 @@ Status ErrorHandler::ClearBGError() {
|
||||
Status ErrorHandler::RecoverFromBGError(bool is_manual) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
InstrumentedMutexLock l(db_mutex_);
|
||||
bool no_bg_work_original_flag = soft_error_no_bg_work_;
|
||||
if (is_manual) {
|
||||
// If its a manual recovery and there's a background recovery in progress
|
||||
// return busy status
|
||||
@ -396,9 +489,24 @@ Status ErrorHandler::RecoverFromBGError(bool is_manual) {
|
||||
return Status::Busy();
|
||||
}
|
||||
recovery_in_prog_ = true;
|
||||
|
||||
// In manual resume, we allow the bg work to run. If it is a auto resume,
|
||||
// the bg work should follow this tag.
|
||||
soft_error_no_bg_work_ = false;
|
||||
|
||||
// In manual resume, if the bg error is a soft error and also requires
|
||||
// no bg work, the error must be recovered by call the flush with
|
||||
// flush reason: kErrorRecoveryRetryFlush. In other case, the flush
|
||||
// reason is set to kErrorRecovery.
|
||||
if (no_bg_work_original_flag) {
|
||||
recover_context_.flush_reason = FlushReason::kErrorRecoveryRetryFlush;
|
||||
} else {
|
||||
recover_context_.flush_reason = FlushReason::kErrorRecovery;
|
||||
}
|
||||
}
|
||||
|
||||
if (bg_error_.severity() == Status::Severity::kSoftError) {
|
||||
if (bg_error_.severity() == Status::Severity::kSoftError &&
|
||||
recover_context_.flush_reason == FlushReason::kErrorRecovery) {
|
||||
// Simply clear the background error and return
|
||||
recovery_error_ = Status::OK();
|
||||
return ClearBGError();
|
||||
@ -408,7 +516,13 @@ Status ErrorHandler::RecoverFromBGError(bool is_manual) {
|
||||
// during the recovery process. While recovering, the only operations that
|
||||
// can generate background errors should be the flush operations
|
||||
recovery_error_ = Status::OK();
|
||||
Status s = db_->ResumeImpl();
|
||||
Status s = db_->ResumeImpl(recover_context_);
|
||||
if (s.ok()) {
|
||||
soft_error_no_bg_work_ = false;
|
||||
} else {
|
||||
soft_error_no_bg_work_ = no_bg_work_original_flag;
|
||||
}
|
||||
|
||||
// For manual recover, shutdown, and fatal error cases, set
|
||||
// recovery_in_prog_ to false. For automatic background recovery, leave it
|
||||
// as is regardless of success or failure as it will be retried
|
||||
@ -461,6 +575,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
|
||||
if (end_recovery_) {
|
||||
return;
|
||||
}
|
||||
DBRecoverContext context = recover_context_;
|
||||
int resume_count = db_options_.max_bgerror_resume_count;
|
||||
uint64_t wait_interval = db_options_.bgerror_resume_retry_interval;
|
||||
// Recover from the retryable error. Create a separate thread to do it.
|
||||
@ -472,7 +587,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
|
||||
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeResume1");
|
||||
recovery_io_error_ = IOStatus::OK();
|
||||
recovery_error_ = Status::OK();
|
||||
Status s = db_->ResumeImpl();
|
||||
Status s = db_->ResumeImpl(context);
|
||||
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume0");
|
||||
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume1");
|
||||
if (s.IsShutdownInProgress() ||
|
||||
@ -507,6 +622,9 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
|
||||
EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners,
|
||||
old_bg_error, db_mutex_);
|
||||
recovery_in_prog_ = false;
|
||||
if (soft_error_no_bg_work_) {
|
||||
soft_error_no_bg_work_ = false;
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverFail1");
|
||||
|
@ -14,6 +14,17 @@ namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
class DBImpl;
|
||||
|
||||
// This structure is used to store the DB recovery context. The context is
|
||||
// the information that related to the recover actions. For example, it contains
|
||||
// FlushReason, which tells the flush job why this flush is called.
|
||||
struct DBRecoverContext {
|
||||
FlushReason flush_reason;
|
||||
|
||||
DBRecoverContext() : flush_reason(FlushReason::kErrorRecovery) {}
|
||||
|
||||
DBRecoverContext(FlushReason reason) : flush_reason(reason) {}
|
||||
};
|
||||
|
||||
class ErrorHandler {
|
||||
public:
|
||||
ErrorHandler(DBImpl* db, const ImmutableDBOptions& db_options,
|
||||
@ -28,7 +39,8 @@ class ErrorHandler {
|
||||
recovery_thread_(nullptr),
|
||||
db_mutex_(db_mutex),
|
||||
auto_recovery_(false),
|
||||
recovery_in_prog_(false) {}
|
||||
recovery_in_prog_(false),
|
||||
soft_error_no_bg_work_(false) {}
|
||||
~ErrorHandler() {
|
||||
bg_error_.PermitUncheckedError();
|
||||
recovery_error_.PermitUncheckedError();
|
||||
@ -59,9 +71,11 @@ class ErrorHandler {
|
||||
bool IsBGWorkStopped() {
|
||||
return !bg_error_.ok() &&
|
||||
(bg_error_.severity() >= Status::Severity::kHardError ||
|
||||
!auto_recovery_);
|
||||
!auto_recovery_ || soft_error_no_bg_work_);
|
||||
}
|
||||
|
||||
bool IsSoftErrorNoBGWork() { return soft_error_no_bg_work_; }
|
||||
|
||||
bool IsRecoveryInProgress() { return recovery_in_prog_; }
|
||||
|
||||
Status RecoverFromBGError(bool is_manual = false);
|
||||
@ -89,6 +103,12 @@ class ErrorHandler {
|
||||
// A flag indicating whether automatic recovery from errors is enabled
|
||||
bool auto_recovery_;
|
||||
bool recovery_in_prog_;
|
||||
// A flag to indicate that for the soft error, we should not allow any
|
||||
// backrgound work execpt the work is from recovery.
|
||||
bool soft_error_no_bg_work_;
|
||||
|
||||
// Used to store the context for recover, such as flush reason.
|
||||
DBRecoverContext recover_context_;
|
||||
|
||||
Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery);
|
||||
void RecoverFromNoSpace();
|
||||
|
@ -183,7 +183,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
@ -247,6 +247,143 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 0;
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
WriteOptions wo = WriteOptions();
|
||||
wo.disableWAL = true;
|
||||
Put(Key(1), "val1", wo);
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
Put(Key(2), "val2", wo);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
Put(Key(3), "val3", wo);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError2) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 0;
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
WriteOptions wo = WriteOptions();
|
||||
wo.disableWAL = true;
|
||||
|
||||
Put(Key(1), "val1", wo);
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeSyncTable",
|
||||
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
Put(Key(2), "val2", wo);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
Put(Key(3), "val3", wo);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError3) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 0;
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
WriteOptions wo = WriteOptions();
|
||||
wo.disableWAL = true;
|
||||
|
||||
Put(Key(1), "val1", wo);
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeCloseTableFile",
|
||||
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
Put(Key(2), "val2", wo);
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
Put(Key(3), "val3", wo);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
@ -1213,6 +1350,114 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
|
||||
delete def_env;
|
||||
}
|
||||
|
||||
// When Put the KV-pair, the write option is set to disable WAL.
|
||||
// If retryable error happens in this condition, map the bg error
|
||||
// to soft error and trigger auto resume. During auto resume, SwitchMemtable
|
||||
// is disabled to avoid small SST tables. Write can still be applied before
|
||||
// the bg error is cleaned unless the memtable is full.
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
|
||||
// Activate the FS before the first resume
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 100000; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
WriteOptions wo = WriteOptions();
|
||||
wo.disableWAL = true;
|
||||
Put(Key(1), "val1", wo);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"RecoverFromRetryableBGIOError:LoopOut",
|
||||
"FLushWritNoWALRetryableeErrorAutoRecover1:1"}});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
TEST_SYNC_POINT("FLushWritNoWALRetryableeErrorAutoRecover1:1");
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
Put(Key(2), "val2", wo);
|
||||
s = Flush();
|
||||
// Since auto resume fails, the bg error is not cleand, flush will
|
||||
// return the bg_error set before.
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
|
||||
// call auto resume
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
Put(Key(3), "val3", wo);
|
||||
s = Flush();
|
||||
// After resume is successful, the flush should be ok.
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
ASSERT_EQ("val3", Get(Key(3)));
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
|
||||
// Activate the FS before the first resume
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.max_bgerror_resume_count = 2;
|
||||
options.bgerror_resume_retry_interval = 100000; // 0.1 second
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
WriteOptions wo = WriteOptions();
|
||||
wo.disableWAL = true;
|
||||
Put(Key(1), "val1", wo);
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeFinishBuildTable",
|
||||
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
|
||||
ASSERT_EQ("val1", Get(Key(1)));
|
||||
Put(Key(2), "val2", wo);
|
||||
s = Flush();
|
||||
// Since auto resume is successful, the bg error is cleaned, flush will
|
||||
// be successful.
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("val2", Get(Key(2)));
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover1) {
|
||||
// Fail the first resume and make the second resume successful
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
@ -1962,6 +2207,194 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
|
||||
Close();
|
||||
}
|
||||
|
||||
class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
|
||||
public testing::WithParamInterface<bool> {};
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Put(Key(0), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
std::string old_manifest;
|
||||
std::string new_manifest;
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
old_manifest = GetManifestNameFromLiveFiles();
|
||||
|
||||
Put(Key(0), "val");
|
||||
Flush();
|
||||
Put(Key(1), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionSet::LogAndApply:WriteManifest", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Put(Key(0), "va;");
|
||||
Put(Key(2), "va;");
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
|
||||
"BackgroundCallCompaction:0"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BackgroundCallCompaction:0", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Put(Key(1), "val");
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.writable_file_max_buffer_size = 32768;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
Random rnd(301);
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
|
||||
for (auto i = 0; i < 100; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
|
||||
};
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
int write_error = 0;
|
||||
|
||||
for (auto i = 100; i < 199; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
|
||||
write_error++;
|
||||
if (write_error > 2) {
|
||||
fault_fs->SetFilesystemActive(false,
|
||||
IOStatus::IOFenced("IO fenced"));
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
s = dbfull()->Write(wopts, &batch);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
}
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
{
|
||||
WriteBatch batch;
|
||||
|
||||
for (auto i = 0; i < 100; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
s = dbfull()->Write(wopts, &batch);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
|
||||
::testing::Bool());
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -419,7 +419,7 @@ TEST_F(WriteBatchTest, PrepareCommit) {
|
||||
TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
|
||||
// Insert key and value of 3GB and push total batch size to 12GB.
|
||||
static const size_t kKeyValueSize = 4u;
|
||||
static const uint32_t kNumUpdates = uint32_t(3 << 30);
|
||||
static const uint32_t kNumUpdates = uint32_t{3} << 30;
|
||||
std::string raw(kKeyValueSize, 'A');
|
||||
WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u);
|
||||
char c = 'A';
|
||||
|
@ -344,7 +344,13 @@ void WriteThread::BeginWriteStall() {
|
||||
prev->link_older = w->link_older;
|
||||
w->status = Status::Incomplete("Write stall");
|
||||
SetState(w, STATE_COMPLETED);
|
||||
if (prev->link_older) {
|
||||
// Only update `link_newer` if it's already set.
|
||||
// `CreateMissingNewerLinks()` will update the nullptr `link_newer` later,
|
||||
// which assumes the the first non-nullptr `link_newer` is the last
|
||||
// nullptr link in the writer list.
|
||||
// If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
|
||||
// updating the whole list when it sees the first non nullptr link.
|
||||
if (prev->link_older && prev->link_older->link_newer) {
|
||||
prev->link_older->link_newer = prev;
|
||||
}
|
||||
w = prev->link_older;
|
||||
|
@ -1223,9 +1223,6 @@ Status StressTest::TestBackupRestore(
|
||||
// For debugging, get info_log from live options
|
||||
backup_opts.info_log = db_->GetDBOptions().info_log.get();
|
||||
assert(backup_opts.info_log);
|
||||
if (thread->rand.OneIn(2)) {
|
||||
backup_opts.file_checksum_gen_factory = options_.file_checksum_gen_factory;
|
||||
}
|
||||
if (thread->rand.OneIn(10)) {
|
||||
backup_opts.share_table_files = false;
|
||||
} else {
|
||||
@ -1236,11 +1233,22 @@ Status StressTest::TestBackupRestore(
|
||||
backup_opts.share_files_with_checksum = true;
|
||||
if (thread->rand.OneIn(2)) {
|
||||
// old
|
||||
backup_opts.share_files_with_checksum_naming = kChecksumAndFileSize;
|
||||
backup_opts.share_files_with_checksum_naming =
|
||||
BackupableDBOptions::kLegacyCrc32cAndFileSize;
|
||||
} else {
|
||||
// new
|
||||
backup_opts.share_files_with_checksum_naming =
|
||||
kOptionalChecksumAndDbSessionId;
|
||||
BackupableDBOptions::kUseDbSessionId;
|
||||
}
|
||||
if (thread->rand.OneIn(2)) {
|
||||
backup_opts.share_files_with_checksum_naming =
|
||||
backup_opts.share_files_with_checksum_naming |
|
||||
BackupableDBOptions::kFlagIncludeFileSize;
|
||||
}
|
||||
if (thread->rand.OneIn(2)) {
|
||||
backup_opts.share_files_with_checksum_naming =
|
||||
backup_opts.share_files_with_checksum_naming |
|
||||
BackupableDBOptions::kFlagMatchInterimNaming;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
11
env/composite_env_wrapper.h
vendored
11
env/composite_env_wrapper.h
vendored
@ -727,11 +727,22 @@ class LegacyWritableFileWrapper : public FSWritableFile {
|
||||
IODebugContext* /*dbg*/) override {
|
||||
return status_to_io_status(target_->Append(data));
|
||||
}
|
||||
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* /*dbg*/) override {
|
||||
return status_to_io_status(target_->Append(data));
|
||||
}
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& /*options*/,
|
||||
IODebugContext* /*dbg*/) override {
|
||||
return status_to_io_status(target_->PositionedAppend(data, offset));
|
||||
}
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& /*options*/,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* /*dbg*/) override {
|
||||
return status_to_io_status(target_->PositionedAppend(data, offset));
|
||||
}
|
||||
IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
|
||||
IODebugContext* /*dbg*/) override {
|
||||
return status_to_io_status(target_->Truncate(size));
|
||||
|
11
env/file_system_tracer.h
vendored
11
env/file_system_tracer.h
vendored
@ -244,10 +244,21 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
|
||||
|
||||
IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
IODebugContext* dbg) override;
|
||||
IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* dbg) override {
|
||||
return Append(data, options, dbg);
|
||||
}
|
||||
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) override;
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& options,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* dbg) override {
|
||||
return PositionedAppend(data, offset, options, dbg);
|
||||
}
|
||||
|
||||
IOStatus Truncate(uint64_t size, const IOOptions& options,
|
||||
IODebugContext* dbg) override;
|
||||
|
16
env/io_posix.h
vendored
16
env/io_posix.h
vendored
@ -242,9 +242,20 @@ class PosixWritableFile : public FSWritableFile {
|
||||
virtual IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& opts,
|
||||
IODebugContext* dbg) override;
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& opts,
|
||||
const DataVerificationInfo& /* verification_info */,
|
||||
IODebugContext* dbg) override {
|
||||
return Append(data, opts, dbg);
|
||||
}
|
||||
virtual IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& opts,
|
||||
IODebugContext* dbg) override;
|
||||
virtual IOStatus PositionedAppend(
|
||||
const Slice& data, uint64_t offset, const IOOptions& opts,
|
||||
const DataVerificationInfo& /* verification_info */,
|
||||
IODebugContext* dbg) override {
|
||||
return PositionedAppend(data, offset, opts, dbg);
|
||||
}
|
||||
virtual IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
@ -331,6 +342,11 @@ class PosixMmapFile : public FSWritableFile {
|
||||
virtual IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& opts,
|
||||
IODebugContext* dbg) override;
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& opts,
|
||||
const DataVerificationInfo& /* verification_info */,
|
||||
IODebugContext* dbg) override {
|
||||
return Append(data, opts, dbg);
|
||||
}
|
||||
virtual IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
|
||||
|
@ -702,6 +702,13 @@ class FSRandomAccessFile {
|
||||
// RandomAccessFileWrapper too.
|
||||
};
|
||||
|
||||
// A data structure brings the data verification information, which is
|
||||
// used togther with data being written to a file.
|
||||
struct DataVerificationInfo {
|
||||
// checksum of the data being written.
|
||||
Slice checksum;
|
||||
};
|
||||
|
||||
// A file abstraction for sequential writing. The implementation
|
||||
// must provide buffering since callers may append small fragments
|
||||
// at a time to the file.
|
||||
@ -729,6 +736,16 @@ class FSWritableFile {
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
IODebugContext* dbg) = 0;
|
||||
|
||||
// EXPERIMENTAL / CURRENTLY UNUSED
|
||||
// Append data with verification information
|
||||
// Note that this API change is experimental and it might be changed in
|
||||
// the future. Currently, RocksDB does not use this API.
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
const DataVerificationInfo& /* verification_info */,
|
||||
IODebugContext* dbg) {
|
||||
return Append(data, options, dbg);
|
||||
}
|
||||
|
||||
// PositionedAppend data to the specified offset. The new EOF after append
|
||||
// must be larger than the previous EOF. This is to be used when writes are
|
||||
// not backed by OS buffers and hence has to always start from the start of
|
||||
@ -756,6 +773,18 @@ class FSWritableFile {
|
||||
return IOStatus::NotSupported();
|
||||
}
|
||||
|
||||
// EXPERIMENTAL / CURRENTLY UNUSED
|
||||
// PositionedAppend data with verification information.
|
||||
// Note that this API change is experimental and it might be changed in
|
||||
// the future. Currently, RocksDB does not use this API.
|
||||
virtual IOStatus PositionedAppend(
|
||||
const Slice& /* data */, uint64_t /* offset */,
|
||||
const IOOptions& /*options*/,
|
||||
const DataVerificationInfo& /* verification_info */,
|
||||
IODebugContext* /*dbg*/) {
|
||||
return IOStatus::NotSupported();
|
||||
}
|
||||
|
||||
// Truncate is necessary to trim the file to the correct size
|
||||
// before closing. It is not always possible to keep track of the file
|
||||
// size due to whole pages writes. The behavior is undefined if called
|
||||
@ -1286,11 +1315,23 @@ class FSWritableFileWrapper : public FSWritableFile {
|
||||
IODebugContext* dbg) override {
|
||||
return target_->Append(data, options, dbg);
|
||||
}
|
||||
IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
const DataVerificationInfo& verification_info,
|
||||
IODebugContext* dbg) override {
|
||||
return target_->Append(data, options, verification_info, dbg);
|
||||
}
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) override {
|
||||
return target_->PositionedAppend(data, offset, options, dbg);
|
||||
}
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& options,
|
||||
const DataVerificationInfo& verification_info,
|
||||
IODebugContext* dbg) override {
|
||||
return target_->PositionedAppend(data, offset, options, verification_info,
|
||||
dbg);
|
||||
}
|
||||
IOStatus Truncate(uint64_t size, const IOOptions& options,
|
||||
IODebugContext* dbg) override {
|
||||
return target_->Truncate(size, options, dbg);
|
||||
|
@ -126,6 +126,11 @@ class IOStatus : public Status {
|
||||
return IOStatus(kIOError, kPathNotFound, msg, msg2);
|
||||
}
|
||||
|
||||
static IOStatus IOFenced() { return IOStatus(kIOError, kIOFenced); }
|
||||
static IOStatus IOFenced(const Slice& msg, const Slice& msg2 = Slice()) {
|
||||
return IOStatus(kIOError, kIOFenced, msg, msg2);
|
||||
}
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
// std::string ToString() const;
|
||||
|
@ -115,6 +115,9 @@ enum class FlushReason : int {
|
||||
kAutoCompaction = 0x09,
|
||||
kManualFlush = 0x0a,
|
||||
kErrorRecovery = 0xb,
|
||||
// When set the flush reason to kErrorRecoveryRetryFlush, SwitchMemtable
|
||||
// will not be called to avoid many small immutable memtables.
|
||||
kErrorRecoveryRetryFlush = 0xc,
|
||||
};
|
||||
|
||||
enum class BackgroundErrorReason {
|
||||
@ -123,6 +126,7 @@ enum class BackgroundErrorReason {
|
||||
kWriteCallback,
|
||||
kMemTable,
|
||||
kManifestWrite,
|
||||
kFlushNoWAL,
|
||||
};
|
||||
|
||||
enum class WriteStallCondition {
|
||||
|
@ -113,6 +113,7 @@ class Status {
|
||||
kManualCompactionPaused = 11,
|
||||
kOverwritten = 12,
|
||||
kTxnNotPrepared = 13,
|
||||
kIOFenced = 14,
|
||||
kMaxSubCode
|
||||
};
|
||||
|
||||
@ -482,6 +483,14 @@ class Status {
|
||||
return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared);
|
||||
}
|
||||
|
||||
// Returns true iff the status indicates a IOFenced error.
|
||||
bool IsIOFenced() const {
|
||||
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
|
||||
checked_ = true;
|
||||
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
||||
return (code() == kIOError) && (subcode() == kIOFenced);
|
||||
}
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
std::string ToString() const;
|
||||
|
@ -24,26 +24,10 @@
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
// The default DB file checksum function name.
|
||||
constexpr char kDbFileChecksumFuncName[] = "FileChecksumCrc32c";
|
||||
// The default BackupEngine file checksum function name.
|
||||
constexpr char kDefaultBackupFileChecksumFuncName[] = "crc32c";
|
||||
|
||||
// BackupTableNameOption describes possible naming schemes for backup
|
||||
// table file names when the table files are stored in the shared_checksum
|
||||
// directory (i.e., both share_table_files and share_files_with_checksum
|
||||
// are true).
|
||||
enum BackupTableNameOption : unsigned char {
|
||||
// Backup SST filenames are <file_number>_<crc32c>_<file_size>.sst
|
||||
// where <crc32c> is uint32_t.
|
||||
kChecksumAndFileSize = 0,
|
||||
// Backup SST filenames are <file_number>_<crc32c>_<db_session_id>.sst
|
||||
// where <crc32c> is hexidecimally encoded.
|
||||
// When DBOptions::file_checksum_gen_factory is not set to
|
||||
// GetFileChecksumGenCrc32cFactory(), the filenames will be
|
||||
// <file_number>_<db_session_id>.sst
|
||||
// When there are no db session ids available in the table file, this
|
||||
// option will use kChecksumAndFileSize as a fallback.
|
||||
kOptionalChecksumAndDbSessionId = 1
|
||||
};
|
||||
constexpr char kBackupFileChecksumFuncName[] = "crc32c";
|
||||
|
||||
struct BackupableDBOptions {
|
||||
// Where to keep the backup files. Has to be different than dbname_
|
||||
@ -108,19 +92,16 @@ struct BackupableDBOptions {
|
||||
// Default: nullptr
|
||||
std::shared_ptr<RateLimiter> restore_rate_limiter{nullptr};
|
||||
|
||||
// Only used if share_table_files is set to true. If true, will consider that
|
||||
// backups can come from different databases, hence an sst is not uniquely
|
||||
// identifed by its name, but by the triple
|
||||
// (file name, crc32c, db session id or file length)
|
||||
// Only used if share_table_files is set to true. If true, will consider
|
||||
// that backups can come from different databases, even differently mutated
|
||||
// databases with the same DB ID. See share_files_with_checksum_naming and
|
||||
// ShareFilesNaming for details on how table files names are made
|
||||
// unique between databases.
|
||||
//
|
||||
// Note: If this option is set to true, we recommend setting
|
||||
// share_files_with_checksum_naming to kOptionalChecksumAndDbSessionId, which
|
||||
// is also our default option. Otherwise, there is a non-negligible chance of
|
||||
// filename collision when sharing tables in shared_checksum among several
|
||||
// DBs.
|
||||
// *turn it on only if you know what you're doing*
|
||||
// Using 'true' is fundamentally safer, and performance improvements vs.
|
||||
// original design should leave almost no reason to use the 'false' setting.
|
||||
//
|
||||
// Default: false
|
||||
// Default (only for historical reasons): false
|
||||
bool share_files_with_checksum;
|
||||
|
||||
// Up to this many background threads will copy files for CreateNewBackup()
|
||||
@ -144,51 +125,79 @@ struct BackupableDBOptions {
|
||||
// Default: INT_MAX
|
||||
int max_valid_backups_to_open;
|
||||
|
||||
// Naming option for share_files_with_checksum table files. This option
|
||||
// can be set to kChecksumAndFileSize or kOptionalChecksumAndDbSessionId.
|
||||
// kChecksumAndFileSize is susceptible to collision as file size is not a
|
||||
// good source of entroy.
|
||||
// kOptionalChecksumAndDbSessionId is immune to collision.
|
||||
// ShareFilesNaming describes possible naming schemes for backup
|
||||
// table file names when the table files are stored in the shared_checksum
|
||||
// directory (i.e., both share_table_files and share_files_with_checksum
|
||||
// are true).
|
||||
enum ShareFilesNaming : uint32_t {
|
||||
// Backup SST filenames are <file_number>_<crc32c>_<file_size>.sst
|
||||
// where <crc32c> is an unsigned decimal integer. This is the
|
||||
// original/legacy naming scheme for share_files_with_checksum,
|
||||
// with two problems:
|
||||
// * At massive scale, collisions on this triple with different file
|
||||
// contents is plausible.
|
||||
// * Determining the name to use requires computing the checksum,
|
||||
// so generally requires reading the whole file even if the file
|
||||
// is already backed up.
|
||||
// ** ONLY RECOMMENDED FOR PRESERVING OLD BEHAVIOR **
|
||||
kLegacyCrc32cAndFileSize = 1U,
|
||||
|
||||
// Backup SST filenames are <file_number>_s<db_session_id>.sst. This
|
||||
// pair of values should be very strongly unique for a given SST file
|
||||
// and easily determined before computing a checksum. The 's' indicates
|
||||
// the value is a DB session id, not a checksum.
|
||||
//
|
||||
// Exceptions:
|
||||
// * For old SST files without a DB session id, kLegacyCrc32cAndFileSize
|
||||
// will be used instead, matching the names assigned by RocksDB versions
|
||||
// not supporting the newer naming scheme.
|
||||
// * See also flags below.
|
||||
kUseDbSessionId = 2U,
|
||||
|
||||
kMaskNoNamingFlags = 0xffffU,
|
||||
|
||||
// If not already part of the naming scheme, insert
|
||||
// _<file_size>
|
||||
// before .sst in the name. In case of user code actually parsing the
|
||||
// last _<whatever> before the .sst as the file size, this preserves that
|
||||
// feature of kLegacyCrc32cAndFileSize. In other words, this option makes
|
||||
// official that unofficial feature of the backup metadata.
|
||||
//
|
||||
// We do not consider SST file sizes to have sufficient entropy to
|
||||
// contribute significantly to naming uniqueness.
|
||||
kFlagIncludeFileSize = 1U << 31,
|
||||
|
||||
// When encountering an SST file from a Facebook-internal early
|
||||
// release of 6.12, use the default naming scheme in effect for
|
||||
// when the SST file was generated (assuming full file checksum
|
||||
// was not set to GetFileChecksumGenCrc32cFactory()). That naming is
|
||||
// <file_number>_<db_session_id>.sst
|
||||
// and ignores kFlagIncludeFileSize setting.
|
||||
// NOTE: This flag is intended to be temporary and should be removed
|
||||
// in a later release.
|
||||
kFlagMatchInterimNaming = 1U << 30,
|
||||
|
||||
kMaskNamingFlags = ~kMaskNoNamingFlags,
|
||||
};
|
||||
|
||||
// Naming option for share_files_with_checksum table files. See
|
||||
// ShareFilesNaming for details.
|
||||
//
|
||||
// Modifying this option cannot introduce a downgrade compatibility issue
|
||||
// because RocksDB can read, restore, and delete backups using different file
|
||||
// names, and it's OK for a backup directory to use a mixture of table file
|
||||
// naming schemes.
|
||||
//
|
||||
// Default: kOptionalChecksumAndDbSessionId
|
||||
// However, modifying this option and saving more backups to the same
|
||||
// directory can lead to the same file getting saved again to that
|
||||
// directory, under the new shared name in addition to the old shared
|
||||
// name.
|
||||
//
|
||||
// Default: kUseDbSessionId | kFlagIncludeFileSize | kFlagMatchInterimNaming
|
||||
//
|
||||
// Note: This option comes into effect only if both share_files_with_checksum
|
||||
// and share_table_files are true. In the cases of old table files where no
|
||||
// db_session_id is stored, we use the file_size to replace the empty
|
||||
// db_session_id as a fallback.
|
||||
BackupTableNameOption share_files_with_checksum_naming;
|
||||
|
||||
// Option for custom checksum functions.
|
||||
// When this option is nullptr, BackupEngine will use its default crc32c as
|
||||
// the checksum function.
|
||||
//
|
||||
// When it is not nullptr, BackupEngine will try to find in the factory the
|
||||
// checksum function that DB used to calculate the file checksums. If such a
|
||||
// function is found, BackupEngine will use it to create, verify, or restore
|
||||
// backups, in addition to the default crc32c checksum function. If such a
|
||||
// function is not found, BackupEngine will return Status::InvalidArgument().
|
||||
// Therefore, this option comes into effect only if DB has a custom checksum
|
||||
// factory and this option is set to the same factory.
|
||||
//
|
||||
//
|
||||
// Note: If share_files_with_checksum and share_table_files are true,
|
||||
// the <checksum> appeared in the table filenames will be the custom checksum
|
||||
// value if db session ids are available (namely, table file naming options
|
||||
// is kOptionalChecksumAndDbSessionId and the db session ids obtained from
|
||||
// the table files are nonempty).
|
||||
//
|
||||
// Note: We do not require the same setting to this option for backup
|
||||
// restoration or verification as was set during backup creation but we
|
||||
// strongly recommend setting it to the same as the DB file checksum function
|
||||
// for all BackupEngine interactions when practical.
|
||||
//
|
||||
// Default: nullptr
|
||||
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory;
|
||||
// and share_table_files are true.
|
||||
ShareFilesNaming share_files_with_checksum_naming;
|
||||
|
||||
void Dump(Logger* logger) const;
|
||||
|
||||
@ -200,10 +209,9 @@ struct BackupableDBOptions {
|
||||
uint64_t _restore_rate_limit = 0, int _max_background_operations = 1,
|
||||
uint64_t _callback_trigger_interval_size = 4 * 1024 * 1024,
|
||||
int _max_valid_backups_to_open = INT_MAX,
|
||||
BackupTableNameOption _share_files_with_checksum_naming =
|
||||
kOptionalChecksumAndDbSessionId,
|
||||
std::shared_ptr<FileChecksumGenFactory> _file_checksum_gen_factory =
|
||||
nullptr)
|
||||
ShareFilesNaming _share_files_with_checksum_naming =
|
||||
static_cast<ShareFilesNaming>(kUseDbSessionId | kFlagIncludeFileSize |
|
||||
kFlagMatchInterimNaming))
|
||||
: backup_dir(_backup_dir),
|
||||
backup_env(_backup_env),
|
||||
share_table_files(_share_table_files),
|
||||
@ -217,19 +225,38 @@ struct BackupableDBOptions {
|
||||
max_background_operations(_max_background_operations),
|
||||
callback_trigger_interval_size(_callback_trigger_interval_size),
|
||||
max_valid_backups_to_open(_max_valid_backups_to_open),
|
||||
share_files_with_checksum_naming(_share_files_with_checksum_naming),
|
||||
file_checksum_gen_factory(_file_checksum_gen_factory) {
|
||||
share_files_with_checksum_naming(_share_files_with_checksum_naming) {
|
||||
assert(share_table_files || !share_files_with_checksum);
|
||||
assert((share_files_with_checksum_naming & kMaskNoNamingFlags) != 0);
|
||||
}
|
||||
};
|
||||
|
||||
inline BackupableDBOptions::ShareFilesNaming operator&(
|
||||
BackupableDBOptions::ShareFilesNaming lhs,
|
||||
BackupableDBOptions::ShareFilesNaming rhs) {
|
||||
uint32_t l = static_cast<uint32_t>(lhs);
|
||||
uint32_t r = static_cast<uint32_t>(rhs);
|
||||
assert(r == BackupableDBOptions::kMaskNoNamingFlags ||
|
||||
(r & BackupableDBOptions::kMaskNoNamingFlags) == 0);
|
||||
return static_cast<BackupableDBOptions::ShareFilesNaming>(l & r);
|
||||
}
|
||||
|
||||
inline BackupableDBOptions::ShareFilesNaming operator|(
|
||||
BackupableDBOptions::ShareFilesNaming lhs,
|
||||
BackupableDBOptions::ShareFilesNaming rhs) {
|
||||
uint32_t l = static_cast<uint32_t>(lhs);
|
||||
uint32_t r = static_cast<uint32_t>(rhs);
|
||||
assert((r & BackupableDBOptions::kMaskNoNamingFlags) == 0);
|
||||
return static_cast<BackupableDBOptions::ShareFilesNaming>(l | r);
|
||||
}
|
||||
|
||||
struct CreateBackupOptions {
|
||||
// Flush will always trigger if 2PC is enabled.
|
||||
// If write-ahead logs are disabled, set flush_before_backup=true to
|
||||
// avoid losing unflushed key/value pairs from the memtable.
|
||||
bool flush_before_backup = false;
|
||||
|
||||
// Callback for reporting progress.
|
||||
// Callback for reporting progress, based on callback_trigger_interval_size.
|
||||
std::function<void()> progress_callback = []() {};
|
||||
|
||||
// If false, background_thread_cpu_priority is ignored.
|
||||
@ -355,18 +382,16 @@ class BackupEngineReadOnly {
|
||||
}
|
||||
|
||||
// If verify_with_checksum is true, this function
|
||||
// inspects the default crc32c checksums and file sizes of backup files to
|
||||
// see if they match our expectation. This function further inspects the
|
||||
// custom checksums if BackupableDBOptions::file_checksum_gen_factory is
|
||||
// the same as DBOptions::file_checksum_gen_factory.
|
||||
// inspects the current checksums and file sizes of backup files to see if
|
||||
// they match our expectation.
|
||||
//
|
||||
// If verify_with_checksum is false, this function
|
||||
// checks that each file exists and that the size of the file matches our
|
||||
// expectation. It does not check file checksum.
|
||||
//
|
||||
// If this BackupEngine created the backup, it compares the files' current
|
||||
// sizes (and current checksums) against the number of bytes written to
|
||||
// them (and the checksums calculated) during creation.
|
||||
// sizes (and current checksum) against the number of bytes written to
|
||||
// them (and the checksum calculated) during creation.
|
||||
// Otherwise, it compares the files' current sizes (and checksums) against
|
||||
// their sizes (and checksums) when the BackupEngine was opened.
|
||||
//
|
||||
@ -486,9 +511,7 @@ class BackupEngine {
|
||||
|
||||
// If verify_with_checksum is true, this function
|
||||
// inspects the current checksums and file sizes of backup files to see if
|
||||
// they match our expectation. It further inspects the custom checksums
|
||||
// if BackupableDBOptions::file_checksum_gen_factory is the same as
|
||||
// DBOptions::file_checksum_gen_factory.
|
||||
// they match our expectation.
|
||||
//
|
||||
// If verify_with_checksum is false, this function
|
||||
// checks that each file exists and that the size of the file matches our
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 13
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 4
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -821,7 +821,12 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
|
||||
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
|
||||
"block_size_deviation=8;block_restart_interval=4;"
|
||||
"format_version=5;whole_key_filtering=1;"
|
||||
"filter_policy=bloomfilter:4.567:false;",
|
||||
"filter_policy=bloomfilter:4.567:false;"
|
||||
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
|
||||
// file generated by 6.10 to 6.14. Though bug is fixed in these releases,
|
||||
// we need to handle the case of loading OPTIONS file generated before the
|
||||
// fix.
|
||||
"read_amp_bytes_per_bit=17179869185;",
|
||||
&new_opt));
|
||||
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
|
||||
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
|
||||
@ -842,6 +847,9 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
|
||||
dynamic_cast<const BloomFilterPolicy&>(*new_opt.filter_policy);
|
||||
EXPECT_EQ(bfp.GetMillibitsPerKey(), 4567);
|
||||
EXPECT_EQ(bfp.GetWholeBitsPerKey(), 5);
|
||||
// Verify that only the lower 32bits are stored in
|
||||
// new_opt.read_amp_bytes_per_bit.
|
||||
EXPECT_EQ(1U, new_opt.read_amp_bytes_per_bit);
|
||||
|
||||
// unknown option
|
||||
ASSERT_NOK(GetBlockBasedTableOptionsFromString(
|
||||
|
1
src.mk
1
src.mk
@ -303,6 +303,7 @@ STRESS_LIB_SOURCES = \
|
||||
|
||||
TEST_LIB_SOURCES = \
|
||||
db/db_test_util.cc \
|
||||
test_util/mock_time_env.cc \
|
||||
test_util/testharness.cc \
|
||||
test_util/testutil.cc \
|
||||
utilities/cassandra/test_utils.cc \
|
||||
|
@ -331,8 +331,24 @@ static std::unordered_map<std::string, OptionTypeInfo>
|
||||
OptionTypeFlags::kNone, 0}},
|
||||
{"read_amp_bytes_per_bit",
|
||||
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
|
||||
OptionType::kSizeT, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone, 0}},
|
||||
OptionType::kUInt32T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone, 0,
|
||||
[](const ConfigOptions& /*opts*/, const std::string& /*name*/,
|
||||
const std::string& value, char* addr) {
|
||||
// A workaround to fix a bug in 6.10, 6.11, 6.12, 6.13
|
||||
// and 6.14. The bug will write out 8 bytes to OPTIONS file from the
|
||||
// starting address of BlockBasedTableOptions.read_amp_bytes_per_bit
|
||||
// which is actually a uint32. Consequently, the value of
|
||||
// read_amp_bytes_per_bit written in the OPTIONS file is wrong.
|
||||
// From 6.15, RocksDB will try to parse the read_amp_bytes_per_bit
|
||||
// from OPTIONS file as a uint32. To be able to load OPTIONS file
|
||||
// generated by affected releases before the fix, we need to
|
||||
// manually parse read_amp_bytes_per_bit with this special hack.
|
||||
uint64_t read_amp_bytes_per_bit = ParseUint64(value);
|
||||
*(reinterpret_cast<uint32_t*>(addr)) =
|
||||
static_cast<uint32_t>(read_amp_bytes_per_bit);
|
||||
return Status::OK();
|
||||
}}},
|
||||
{"enable_index_compression",
|
||||
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal,
|
||||
|
@ -173,7 +173,7 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
|
||||
return s;
|
||||
}
|
||||
if (block.GetValue() != nullptr) {
|
||||
if (block.IsCached()) {
|
||||
if (block.IsCached() || block.GetOwnValue()) {
|
||||
if (pin) {
|
||||
partition_map_[handle.offset()] = std::move(block);
|
||||
}
|
||||
|
@ -149,6 +149,11 @@ class IteratorWrapperBase {
|
||||
return result_.value_prepared;
|
||||
}
|
||||
|
||||
Slice user_key() const {
|
||||
assert(Valid());
|
||||
return iter_->user_key();
|
||||
}
|
||||
|
||||
private:
|
||||
void Update() {
|
||||
valid_ = iter_->Valid();
|
||||
|
@ -96,8 +96,12 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
|
||||
if (props.file_creation_time > 0) {
|
||||
Add(TablePropertiesNames::kFileCreationTime, props.file_creation_time);
|
||||
}
|
||||
Add(TablePropertiesNames::kDbId, props.db_id);
|
||||
Add(TablePropertiesNames::kDbSessionId, props.db_session_id);
|
||||
if (!props.db_id.empty()) {
|
||||
Add(TablePropertiesNames::kDbId, props.db_id);
|
||||
}
|
||||
if (!props.db_session_id.empty()) {
|
||||
Add(TablePropertiesNames::kDbSessionId, props.db_session_id);
|
||||
}
|
||||
|
||||
if (!props.filter_policy_name.empty()) {
|
||||
Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name);
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include "rocksdb/sst_file_reader.h"
|
||||
|
||||
#include "db/arena_wrapped_db_iter.h"
|
||||
#include "db/db_iter.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "env/composite_env_wrapper.h"
|
||||
@ -62,18 +63,23 @@ Status SstFileReader::Open(const std::string& file_path) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Iterator* SstFileReader::NewIterator(const ReadOptions& options) {
|
||||
Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
|
||||
auto r = rep_.get();
|
||||
auto sequence = options.snapshot != nullptr
|
||||
? options.snapshot->GetSequenceNumber()
|
||||
auto sequence = roptions.snapshot != nullptr
|
||||
? roptions.snapshot->GetSequenceNumber()
|
||||
: kMaxSequenceNumber;
|
||||
ArenaWrappedDBIter* res = new ArenaWrappedDBIter();
|
||||
res->Init(r->options.env, roptions, r->ioptions, r->moptions, sequence,
|
||||
r->moptions.max_sequential_skip_in_iterations,
|
||||
0 /* version_number */, nullptr /* read_callback */,
|
||||
nullptr /* db_impl */, nullptr /* cfd */, false /* allow_blob */,
|
||||
false /* allow_refresh */);
|
||||
auto internal_iter = r->table_reader->NewIterator(
|
||||
options, r->moptions.prefix_extractor.get(), /*arena=*/nullptr,
|
||||
/*skip_filters=*/false, TableReaderCaller::kSSTFileReader);
|
||||
return NewDBIterator(r->options.env, options, r->ioptions, r->moptions,
|
||||
r->ioptions.user_comparator, internal_iter, sequence,
|
||||
r->moptions.max_sequential_skip_in_iterations,
|
||||
nullptr /* read_callback */);
|
||||
res->GetReadOptions(), r->moptions.prefix_extractor.get(),
|
||||
res->GetArena(), false /* skip_filters */,
|
||||
TableReaderCaller::kSSTFileReader);
|
||||
res->SetIterUnderDBIter(internal_iter);
|
||||
return res;
|
||||
}
|
||||
|
||||
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
|
||||
|
@ -128,6 +128,31 @@ TEST_F(SstFileReaderTest, Uint64Comparator) {
|
||||
CreateFileAndCheck(keys);
|
||||
}
|
||||
|
||||
TEST_F(SstFileReaderTest, ReadOptionsOutOfScope) {
|
||||
// Repro a bug where the SstFileReader depended on its configured ReadOptions
|
||||
// outliving it.
|
||||
options_.comparator = test::Uint64Comparator();
|
||||
std::vector<std::string> keys;
|
||||
for (uint64_t i = 0; i < kNumKeys; i++) {
|
||||
keys.emplace_back(EncodeAsUint64(i));
|
||||
}
|
||||
CreateFile(sst_name_, keys);
|
||||
|
||||
SstFileReader reader(options_);
|
||||
ASSERT_OK(reader.Open(sst_name_));
|
||||
std::unique_ptr<Iterator> iter;
|
||||
{
|
||||
// Make sure ReadOptions go out of scope ASAP so we know the iterator
|
||||
// operations do not depend on it.
|
||||
ReadOptions ropts;
|
||||
iter.reset(reader.NewIterator(ropts));
|
||||
}
|
||||
iter->SeekToFirst();
|
||||
while (iter->Valid()) {
|
||||
iter->Next();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) {
|
||||
std::vector<std::string> keys;
|
||||
for (uint64_t i = 0; i < kNumKeys; i++) {
|
||||
|
@ -43,6 +43,10 @@ class TwoLevelIndexIterator : public InternalIteratorBase<IndexValue> {
|
||||
assert(Valid());
|
||||
return second_level_iter_.key();
|
||||
}
|
||||
Slice user_key() const override {
|
||||
assert(Valid());
|
||||
return second_level_iter_.user_key();
|
||||
}
|
||||
IndexValue value() const override {
|
||||
assert(Valid());
|
||||
return second_level_iter_.value();
|
||||
|
38
test_util/mock_time_env.cc
Normal file
38
test_util/mock_time_env.cc
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "test_util/mock_time_env.h"
|
||||
|
||||
#include "test_util/sync_point.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
// TODO: this is a workaround for the different behavior on different platform
|
||||
// for timedwait timeout. Ideally timedwait API should be moved to env.
|
||||
// details: PR #7101.
|
||||
void MockTimeEnv::InstallTimedWaitFixCallback() {
|
||||
#ifndef NDEBUG
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
#ifdef OS_MACOSX
|
||||
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
|
||||
// that on some platforms, pthread_cond_timedwait does not appear to
|
||||
// release the lock for other threads to operate if the deadline time
|
||||
// is already passed. (TimedWait calls are currently a bad abstraction
|
||||
// because the deadline parameter is usually computed from Env time,
|
||||
// but is interpreted in real clock time.)
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
|
||||
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
|
||||
if (time_us < this->RealNowMicros()) {
|
||||
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
|
||||
}
|
||||
});
|
||||
#endif // OS_MACOSX
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
#endif // !NDEBUG
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
@ -5,6 +5,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "rocksdb/env.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
@ -62,29 +64,11 @@ class MockTimeEnv : public EnvWrapper {
|
||||
// TODO: this is a workaround for the different behavior on different platform
|
||||
// for timedwait timeout. Ideally timedwait API should be moved to env.
|
||||
// details: PR #7101.
|
||||
void InstallTimedWaitFixCallback() {
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
#if defined(OS_MACOSX) && !defined(NDEBUG)
|
||||
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
|
||||
// that on some platforms, pthread_cond_timedwait does not appear to
|
||||
// release the lock for other threads to operate if the deadline time
|
||||
// is already passed. (TimedWait calls are currently a bad abstraction
|
||||
// because the deadline parameter is usually computed from Env time,
|
||||
// but is interpreted in real clock time.)
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
|
||||
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
|
||||
if (time_us < this->RealNowMicros()) {
|
||||
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
|
||||
}
|
||||
});
|
||||
#endif // OS_MACOSX && !NDEBUG
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
}
|
||||
void InstallTimedWaitFixCallback();
|
||||
|
||||
private:
|
||||
std::atomic<uint64_t> current_time_us_{0};
|
||||
static constexpr uint64_t kMicrosInSecond = 1000U * 1000U;
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -52,7 +52,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
|
||||
"Insufficient capacity for merge operands",
|
||||
// kManualCompactionPaused
|
||||
"Manual compaction paused",
|
||||
" (overwritten)", // kOverwritten, subcode of OK
|
||||
" (overwritten)", // kOverwritten, subcode of OK
|
||||
"Txn not prepared", // kTxnNotPrepared
|
||||
"IO fenced off", // kIOFenced
|
||||
};
|
||||
|
||||
Status::Status(Code _code, SubCode _subcode, const Slice& msg,
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -64,6 +64,11 @@ class TestFSWritableFile : public FSWritableFile {
|
||||
virtual ~TestFSWritableFile();
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions&,
|
||||
IODebugContext*) override;
|
||||
virtual IOStatus Append(const Slice& data, const IOOptions& options,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* dbg) override {
|
||||
return Append(data, options, dbg);
|
||||
}
|
||||
virtual IOStatus Truncate(uint64_t size, const IOOptions& options,
|
||||
IODebugContext* dbg) override {
|
||||
return target_->Truncate(size, options, dbg);
|
||||
@ -78,6 +83,12 @@ class TestFSWritableFile : public FSWritableFile {
|
||||
IODebugContext* dbg) override {
|
||||
return target_->PositionedAppend(data, offset, options, dbg);
|
||||
}
|
||||
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
||||
const IOOptions& options,
|
||||
const DataVerificationInfo& /*verification_info*/,
|
||||
IODebugContext* dbg) override {
|
||||
return PositionedAppend(data, offset, options, dbg);
|
||||
}
|
||||
virtual size_t GetRequiredBufferAlignment() const override {
|
||||
return target_->GetRequiredBufferAlignment();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user