Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
91f175888a | ||
|
00724f43bc | ||
|
3d15d07740 | ||
|
3345d12a1d | ||
|
9ed2144ebd | ||
|
7e42a5ba89 | ||
|
8654ac5d9a | ||
|
e07d836391 | ||
|
e6bca1a54e | ||
|
c815f31ad3 | ||
|
6c8620451c | ||
|
fa99e8afbc | ||
|
43890774ed | ||
|
ee6ee5ff08 | ||
|
80bea2ba8e | ||
|
71158842aa | ||
|
78641c0930 | ||
|
9d3d5e72e0 | ||
|
45770b5d23 | ||
|
4411488585 | ||
|
8cc712c0eb | ||
|
62b71b3094 |
@ -2,7 +2,6 @@ version: 2.1
|
||||
|
||||
orbs:
|
||||
win: circleci/windows@2.4.0
|
||||
slack: circleci/slack@3.4.2
|
||||
|
||||
aliases:
|
||||
- ¬ify-on-main-failure
|
||||
@ -57,7 +56,6 @@ commands:
|
||||
|
||||
post-steps:
|
||||
steps:
|
||||
- slack/status: *notify-on-main-failure
|
||||
- store_test_results: # store test result if there's any
|
||||
path: /tmp/test-results
|
||||
- store_artifacts: # store LOG for debugging if there's any
|
||||
|
25
HISTORY.md
25
HISTORY.md
@ -1,8 +1,18 @@
|
||||
# Rocksdb Change Log
|
||||
## 7.1.0 (03/21/2022)
|
||||
### Public API changes
|
||||
* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
|
||||
## 7.1.2 (04/19/2022)
|
||||
### Bug Fixes
|
||||
* Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path
|
||||
* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail.
|
||||
* Fixed a heap use-after-free race with DropColumnFamily.
|
||||
* Fixed a bug that `rocksdb.read.block.compaction.micros` cannot track compaction stats (#9722).
|
||||
* Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`.
|
||||
* Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766).
|
||||
|
||||
## 7.1.1 (04/07/2022)
|
||||
### Bug Fixes
|
||||
* Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction.
|
||||
|
||||
## 7.1.0 (03/23/2022)
|
||||
### New Features
|
||||
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.
|
||||
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.
|
||||
@ -13,10 +23,12 @@
|
||||
* Experimental support for async_io in ReadOptions which is used by FilePrefetchBuffer to prefetch some of the data asynchronously, if reads are sequential and auto readahead is enabled by rocksdb internally.
|
||||
|
||||
### Bug Fixes
|
||||
* Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness.
|
||||
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
|
||||
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
|
||||
* Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set.
|
||||
* Fixed a bug that DisableManualCompaction may assert when disable an unscheduled manual compaction.
|
||||
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
|
||||
* Fixed a potential timer crash when open close DB concurrently.
|
||||
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
|
||||
* Fixed a bug that `Iterator::Refresh()` reads stale keys after DeleteRange() performed.
|
||||
@ -25,12 +37,11 @@
|
||||
* Fixed a race condition when mmaping a WritableFile on POSIX.
|
||||
|
||||
### Public API changes
|
||||
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
|
||||
* Added pure virtual FilterPolicy::CompatibilityName(), which is needed for fixing major performance bug involving FilterPolicy naming in SST metadata without affecting Customizable aspect of FilterPolicy. This change only affects those with their own custom or wrapper FilterPolicy classes.
|
||||
* `options.compression_per_level` is dynamically changeable with `SetOptions()`.
|
||||
* Added `WriteOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for writes associated with the API to which the `WriteOptions` was provided. Currently the support covers automatic WAL flushes, which happen during live updates (`Put()`, `Write()`, `Delete()`, etc.) when `WriteOptions::disableWAL == false` and `DBOptions::manual_wal_flush == false`.
|
||||
|
||||
### Bug Fixes
|
||||
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
|
||||
* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
|
||||
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
|
||||
|
||||
## 7.0.0 (02/20/2022)
|
||||
### Bug Fixes
|
||||
|
4
Makefile
4
Makefile
@ -2044,8 +2044,8 @@ ROCKSDB_JAVADOCS_JAR = rocksdbjni-$(ROCKSDB_JAVA_VERSION)-javadoc.jar
|
||||
ROCKSDB_SOURCES_JAR = rocksdbjni-$(ROCKSDB_JAVA_VERSION)-sources.jar
|
||||
SHA256_CMD = sha256sum
|
||||
|
||||
ZLIB_VER ?= 1.2.11
|
||||
ZLIB_SHA256 ?= c3e5e9fdd5004dcb542feda5ee4f0ff0744628baf8ed2dd5d66f8ca1197cb1a1
|
||||
ZLIB_VER ?= 1.2.12
|
||||
ZLIB_SHA256 ?= 91844808532e5ce316b3c010929493c0244f3d37593afd6de04f71821d5136d9
|
||||
ZLIB_DOWNLOAD_BASE ?= http://zlib.net
|
||||
BZIP2_VER ?= 1.0.8
|
||||
BZIP2_SHA256 ?= ab5a03176ee106d3f0fa90e381da478ddae405918153cca248e682cd0c4a2269
|
||||
|
6
db/c.cc
6
db/c.cc
@ -3752,6 +3752,9 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom_format(
|
||||
const FilterPolicy* rep_;
|
||||
~Wrapper() override { delete rep_; }
|
||||
const char* Name() const override { return rep_->Name(); }
|
||||
const char* CompatibilityName() const override {
|
||||
return rep_->CompatibilityName();
|
||||
}
|
||||
// No need to override GetFilterBitsBuilder if this one is overridden
|
||||
ROCKSDB_NAMESPACE::FilterBitsBuilder* GetBuilderWithContext(
|
||||
const ROCKSDB_NAMESPACE::FilterBuildingContext& context)
|
||||
@ -3789,6 +3792,9 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_ribbon_format(
|
||||
const FilterPolicy* rep_;
|
||||
~Wrapper() override { delete rep_; }
|
||||
const char* Name() const override { return rep_->Name(); }
|
||||
const char* CompatibilityName() const override {
|
||||
return rep_->CompatibilityName();
|
||||
}
|
||||
ROCKSDB_NAMESPACE::FilterBitsBuilder* GetBuilderWithContext(
|
||||
const ROCKSDB_NAMESPACE::FilterBuildingContext& context)
|
||||
const override {
|
||||
|
@ -1562,20 +1562,6 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
|
||||
return new_cfd;
|
||||
}
|
||||
|
||||
// REQUIRES: DB mutex held
|
||||
void ColumnFamilySet::FreeDeadColumnFamilies() {
|
||||
autovector<ColumnFamilyData*> to_delete;
|
||||
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
|
||||
if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
|
||||
to_delete.push_back(cfd);
|
||||
}
|
||||
}
|
||||
for (auto cfd : to_delete) {
|
||||
// this is very rare, so it's not a problem that we do it under a mutex
|
||||
delete cfd;
|
||||
}
|
||||
}
|
||||
|
||||
// under a DB mutex AND from a write thread
|
||||
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
|
||||
auto cfd_iter = column_family_data_.find(cfd->GetID());
|
||||
|
@ -520,9 +520,10 @@ class ColumnFamilyData {
|
||||
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
|
||||
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
|
||||
|
||||
static const uint32_t kDummyColumnFamilyDataId;
|
||||
|
||||
private:
|
||||
friend class ColumnFamilySet;
|
||||
static const uint32_t kDummyColumnFamilyDataId;
|
||||
ColumnFamilyData(uint32_t id, const std::string& name,
|
||||
Version* dummy_versions, Cache* table_cache,
|
||||
WriteBufferManager* write_buffer_manager,
|
||||
@ -628,10 +629,8 @@ class ColumnFamilyData {
|
||||
// held and it needs to be executed from the write thread. SetDropped() also
|
||||
// guarantees that it will be called only from single-threaded LogAndApply(),
|
||||
// but this condition is not that important.
|
||||
// * Iteration -- hold DB mutex, but you can release it in the body of
|
||||
// iteration. If you release DB mutex in body, reference the column
|
||||
// family before the mutex and unreference after you unlock, since the column
|
||||
// family might get dropped when the DB mutex is released
|
||||
// * Iteration -- hold DB mutex. If you want to release the DB mutex in the
|
||||
// body of the iteration, wrap in a RefedColumnFamilySet.
|
||||
// * GetDefault() -- thread safe
|
||||
// * GetColumnFamily() -- either inside of DB mutex or from a write thread
|
||||
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
|
||||
@ -643,17 +642,12 @@ class ColumnFamilySet {
|
||||
public:
|
||||
explicit iterator(ColumnFamilyData* cfd)
|
||||
: current_(cfd) {}
|
||||
// NOTE: minimum operators for for-loop iteration
|
||||
iterator& operator++() {
|
||||
// dropped column families might still be included in this iteration
|
||||
// (we're only removing them when client drops the last reference to the
|
||||
// column family).
|
||||
// dummy is never dead, so this will never be infinite
|
||||
do {
|
||||
current_ = current_->next_;
|
||||
} while (current_->refs_.load(std::memory_order_relaxed) == 0);
|
||||
current_ = current_->next_;
|
||||
return *this;
|
||||
}
|
||||
bool operator!=(const iterator& other) {
|
||||
bool operator!=(const iterator& other) const {
|
||||
return this->current_ != other.current_;
|
||||
}
|
||||
ColumnFamilyData* operator*() { return current_; }
|
||||
@ -692,10 +686,6 @@ class ColumnFamilySet {
|
||||
iterator begin() { return iterator(dummy_cfd_->next_); }
|
||||
iterator end() { return iterator(dummy_cfd_); }
|
||||
|
||||
// REQUIRES: DB mutex held
|
||||
// Don't call while iterating over ColumnFamilySet
|
||||
void FreeDeadColumnFamilies();
|
||||
|
||||
Cache* get_table_cache() { return table_cache_; }
|
||||
|
||||
WriteBufferManager* write_buffer_manager() { return write_buffer_manager_; }
|
||||
@ -738,6 +728,55 @@ class ColumnFamilySet {
|
||||
std::string db_session_id_;
|
||||
};
|
||||
|
||||
// A wrapper for ColumnFamilySet that supports releasing DB mutex during each
|
||||
// iteration over the iterator, because the cfd is Refed and Unrefed during
|
||||
// each iteration to prevent concurrent CF drop from destroying it (until
|
||||
// Unref).
|
||||
class RefedColumnFamilySet {
|
||||
public:
|
||||
explicit RefedColumnFamilySet(ColumnFamilySet* cfs) : wrapped_(cfs) {}
|
||||
|
||||
class iterator {
|
||||
public:
|
||||
explicit iterator(ColumnFamilySet::iterator wrapped) : wrapped_(wrapped) {
|
||||
MaybeRef(*wrapped_);
|
||||
}
|
||||
~iterator() { MaybeUnref(*wrapped_); }
|
||||
inline void MaybeRef(ColumnFamilyData* cfd) {
|
||||
if (cfd->GetID() != ColumnFamilyData::kDummyColumnFamilyDataId) {
|
||||
cfd->Ref();
|
||||
}
|
||||
}
|
||||
inline void MaybeUnref(ColumnFamilyData* cfd) {
|
||||
if (cfd->GetID() != ColumnFamilyData::kDummyColumnFamilyDataId) {
|
||||
cfd->UnrefAndTryDelete();
|
||||
}
|
||||
}
|
||||
// NOTE: minimum operators for for-loop iteration
|
||||
inline iterator& operator++() {
|
||||
ColumnFamilyData* old = *wrapped_;
|
||||
++wrapped_;
|
||||
// Can only unref & potentially free cfd after accessing its next_
|
||||
MaybeUnref(old);
|
||||
MaybeRef(*wrapped_);
|
||||
return *this;
|
||||
}
|
||||
inline bool operator!=(const iterator& other) const {
|
||||
return this->wrapped_ != other.wrapped_;
|
||||
}
|
||||
inline ColumnFamilyData* operator*() { return *wrapped_; }
|
||||
|
||||
private:
|
||||
ColumnFamilySet::iterator wrapped_;
|
||||
};
|
||||
|
||||
iterator begin() { return iterator(wrapped_->begin()); }
|
||||
iterator end() { return iterator(wrapped_->end()); }
|
||||
|
||||
private:
|
||||
ColumnFamilySet* wrapped_;
|
||||
};
|
||||
|
||||
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
|
||||
// memtables of different column families (specified by ID in the write batch)
|
||||
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
|
||||
|
@ -1638,9 +1638,15 @@ class LevelAndStyleCustomFilterPolicy : public FilterPolicy {
|
||||
policy_l0_other_(NewBloomFilterPolicy(bpk_l0_other)),
|
||||
policy_otherwise_(NewBloomFilterPolicy(bpk_otherwise)) {}
|
||||
|
||||
const char* Name() const override {
|
||||
return "LevelAndStyleCustomFilterPolicy";
|
||||
}
|
||||
|
||||
// OK to use built-in policy name because we are deferring to a
|
||||
// built-in builder. We aren't changing the serialized format.
|
||||
const char* Name() const override { return policy_fifo_->Name(); }
|
||||
const char* CompatibilityName() const override {
|
||||
return policy_fifo_->CompatibilityName();
|
||||
}
|
||||
|
||||
FilterBitsBuilder* GetBuilderWithContext(
|
||||
const FilterBuildingContext& context) const override {
|
||||
|
@ -45,17 +45,15 @@ Status DBImpl::FlushForGetLiveFiles() {
|
||||
}
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
cfd->Ref();
|
||||
mutex_.Unlock();
|
||||
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
|
||||
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
|
||||
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
|
||||
mutex_.Lock();
|
||||
cfd->UnrefAndTryDelete();
|
||||
if (!status.ok() && !status.IsColumnFamilyDropped()) {
|
||||
break;
|
||||
} else if (status.IsColumnFamilyDropped()) {
|
||||
@ -63,7 +61,6 @@ Status DBImpl::FlushForGetLiveFiles() {
|
||||
}
|
||||
}
|
||||
}
|
||||
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -2271,7 +2271,7 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
|
||||
|
||||
// The recovered min log number with prepared data should be non-zero.
|
||||
// In 2pc mode, MinLogNumberToKeep returns the
|
||||
// VersionSet::min_log_number_to_keep_2pc recovered from MANIFEST, if it's 0,
|
||||
// VersionSet::min_log_number_to_keep recovered from MANIFEST, if it's 0,
|
||||
// it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
|
||||
cfs.push_back(kDefaultColumnFamilyName);
|
||||
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
|
||||
|
@ -375,15 +375,12 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
|
||||
s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
cfd->Ref();
|
||||
mutex_.Unlock();
|
||||
InstrumentedMutexUnlock u(&mutex_);
|
||||
s = FlushMemTable(cfd, flush_opts, context.flush_reason);
|
||||
mutex_.Lock();
|
||||
cfd->UnrefAndTryDelete();
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
@ -495,18 +492,14 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
|
||||
s.PermitUncheckedError(); //**TODO: What to do on error?
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
|
||||
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
|
||||
cfd->Ref();
|
||||
mutex_.Unlock();
|
||||
InstrumentedMutexUnlock u(&mutex_);
|
||||
Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
|
||||
s.PermitUncheckedError(); //**TODO: What to do on error?
|
||||
mutex_.Lock();
|
||||
cfd->UnrefAndTryDelete();
|
||||
}
|
||||
}
|
||||
}
|
||||
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
||||
}
|
||||
|
||||
shutting_down_.store(true, std::memory_order_release);
|
||||
@ -969,18 +962,13 @@ void DBImpl::DumpStats() {
|
||||
TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
|
||||
if (cfd->initialized()) {
|
||||
// Release DB mutex for gathering cache entry stats. Pass over all
|
||||
// column families for this first so that other stats are dumped
|
||||
// near-atomically.
|
||||
// Get a ref before unlocking
|
||||
cfd->Ref();
|
||||
{
|
||||
InstrumentedMutexUnlock u(&mutex_);
|
||||
cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
|
||||
}
|
||||
cfd->UnrefAndTryDelete();
|
||||
InstrumentedMutexUnlock u(&mutex_);
|
||||
cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1900,6 +1888,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
||||
return s;
|
||||
}
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
|
||||
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
|
||||
PinnedIteratorsManager pinned_iters_mgr;
|
||||
if (!done) {
|
||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||
@ -1917,8 +1907,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
||||
{
|
||||
PERF_TIMER_GUARD(get_post_process_time);
|
||||
|
||||
ReturnAndCleanupSuperVersion(cfd, sv);
|
||||
|
||||
RecordTick(stats_, NUMBER_KEYS_READ);
|
||||
size_t size = 0;
|
||||
if (s.ok()) {
|
||||
@ -1945,6 +1933,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
||||
PERF_COUNTER_ADD(get_read_bytes, size);
|
||||
}
|
||||
RecordInHistogram(stats_, BYTES_PER_READ, size);
|
||||
|
||||
ReturnAndCleanupSuperVersion(cfd, sv);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -3490,15 +3480,13 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
|
||||
// Needs mutex to protect the list of column families.
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
uint64_t value;
|
||||
for (auto* cfd : *versions_->GetColumnFamilySet()) {
|
||||
for (auto* cfd : versions_->GetRefedColumnFamilySet()) {
|
||||
if (!cfd->initialized()) {
|
||||
continue;
|
||||
}
|
||||
cfd->Ref();
|
||||
ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
|
||||
// GetIntPropertyInternal may release db mutex and re-acquire it.
|
||||
mutex_.AssertHeld();
|
||||
cfd->UnrefAndTryDelete();
|
||||
if (ret) {
|
||||
sum += value;
|
||||
} else {
|
||||
@ -5089,6 +5077,7 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: simplify using GetRefedColumnFamilySet?
|
||||
std::vector<ColumnFamilyData*> cfd_list;
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
|
@ -2973,8 +2973,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||
bg_bottom_compaction_scheduled_--;
|
||||
}
|
||||
|
||||
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
||||
|
||||
// See if there's more work to be done
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
|
||||
|
@ -23,11 +23,7 @@
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
uint64_t DBImpl::MinLogNumberToKeep() {
|
||||
if (allow_2pc()) {
|
||||
return versions_->min_log_number_to_keep_2pc();
|
||||
} else {
|
||||
return versions_->MinLogNumberWithUnflushedData();
|
||||
}
|
||||
return versions_->min_log_number_to_keep();
|
||||
}
|
||||
|
||||
uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
|
||||
@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
}
|
||||
|
||||
// Add log files in wal_dir
|
||||
|
||||
if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
|
||||
std::vector<std::string> log_files;
|
||||
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
|
||||
@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
log_file, immutable_db_options_.wal_dir);
|
||||
}
|
||||
}
|
||||
|
||||
// Add info log files in db_log_dir
|
||||
if (!immutable_db_options_.db_log_dir.empty() &&
|
||||
immutable_db_options_.db_log_dir != dbname_) {
|
||||
|
@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
|
||||
bool flushed = false;
|
||||
uint64_t corrupted_wal_number = kMaxSequenceNumber;
|
||||
uint64_t min_wal_number = MinLogNumberToKeep();
|
||||
if (!allow_2pc()) {
|
||||
// In non-2pc mode, we skip WALs that do not back unflushed data.
|
||||
min_wal_number =
|
||||
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
|
||||
}
|
||||
for (auto wal_number : wal_numbers) {
|
||||
if (wal_number < min_wal_number) {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
|
||||
}
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
|
||||
if (flushed) {
|
||||
wal_deletion = std::make_unique<VersionEdit>();
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
|
||||
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
|
||||
}
|
||||
if (!allow_2pc()) {
|
||||
// In non-2pc mode, flushing the memtables of the column families
|
||||
// means we can advance min_log_number_to_keep.
|
||||
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
|
||||
}
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
}
|
||||
|
||||
@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
|
||||
// FindObsoleteFiles()
|
||||
total_log_size_ = 0;
|
||||
log_empty_ = false;
|
||||
uint64_t min_wal_with_unflushed_data =
|
||||
versions_->MinLogNumberWithUnflushedData();
|
||||
for (auto wal_number : wal_numbers) {
|
||||
if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
|
||||
// In non-2pc mode, the WAL files not backing unflushed data are not
|
||||
// alive, thus should not be added to the alive_log_files_.
|
||||
continue;
|
||||
}
|
||||
// We preallocate space for wals, but then after a crash and restart, those
|
||||
// preallocated space are not needed anymore. It is likely only the last
|
||||
// log has such preallocated space, so we only truncate for the last log.
|
||||
@ -1952,10 +1971,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
"DB::Open() failed --- Unable to persist Options file",
|
||||
persist_options_status.ToString());
|
||||
}
|
||||
} else {
|
||||
}
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
|
||||
"Persisting Option File error: %s",
|
||||
persist_options_status.ToString().c_str());
|
||||
"DB::Open() failed: %s", s.ToString().c_str());
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = impl->StartPeriodicWorkScheduler();
|
||||
|
@ -47,7 +47,7 @@ class DBMergeOperandTest : public DBTestBase {
|
||||
: DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
|
||||
};
|
||||
|
||||
TEST_F(DBMergeOperandTest, MergeOperandReadAfterFreeBug) {
|
||||
TEST_F(DBMergeOperandTest, CacheEvictedMergeOperandReadAfterFreeBug) {
|
||||
// There was a bug of reading merge operands after they are mistakely freed
|
||||
// in DB::GetMergeOperands, which is surfaced by cache full.
|
||||
// See PR#9507 for more.
|
||||
@ -86,6 +86,42 @@ TEST_F(DBMergeOperandTest, MergeOperandReadAfterFreeBug) {
|
||||
ASSERT_EQ(values[3].ToString(), "v4");
|
||||
}
|
||||
|
||||
TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
|
||||
// Repro for a bug where a memtable containing a merge operand could be
|
||||
// deleted before the merge operand was saved to the result.
|
||||
auto options = CurrentOptions();
|
||||
options.merge_operator = MergeOperators::CreateStringAppendOperator();
|
||||
Reopen(options);
|
||||
|
||||
ASSERT_OK(Merge("key", "value"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::GetImpl:PostMemTableGet:0",
|
||||
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
|
||||
{"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
|
||||
"DBImpl::GetImpl:PostMemTableGet:1"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
auto flush_thread = port::Thread([&]() {
|
||||
TEST_SYNC_POINT(
|
||||
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
|
||||
ASSERT_OK(Flush());
|
||||
TEST_SYNC_POINT(
|
||||
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
|
||||
});
|
||||
|
||||
PinnableSlice value;
|
||||
GetMergeOperandsOptions merge_operands_info;
|
||||
merge_operands_info.expected_max_number_of_operands = 1;
|
||||
int number_of_operands;
|
||||
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
|
||||
"key", &value, &merge_operands_info,
|
||||
&number_of_operands));
|
||||
ASSERT_EQ(1, number_of_operands);
|
||||
|
||||
flush_thread.join();
|
||||
}
|
||||
|
||||
TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
|
@ -1078,6 +1078,11 @@ void CheckColumnFamilyMeta(
|
||||
ASSERT_LE(file_meta_from_cf.file_creation_time, end_time);
|
||||
ASSERT_GE(file_meta_from_cf.oldest_ancester_time, start_time);
|
||||
ASSERT_LE(file_meta_from_cf.oldest_ancester_time, end_time);
|
||||
// More from FileStorageInfo
|
||||
ASSERT_EQ(file_meta_from_cf.file_type, kTableFile);
|
||||
ASSERT_EQ(file_meta_from_cf.name,
|
||||
"/" + file_meta_from_cf.relative_filename);
|
||||
ASSERT_EQ(file_meta_from_cf.directory, file_meta_from_cf.db_path);
|
||||
}
|
||||
|
||||
ASSERT_EQ(level_meta_from_cf.size, level_size);
|
||||
@ -1122,6 +1127,11 @@ void CheckLiveFilesMeta(
|
||||
ASSERT_EQ(meta.oldest_blob_file_number,
|
||||
expected_meta.oldest_blob_file_number);
|
||||
|
||||
// More from FileStorageInfo
|
||||
ASSERT_EQ(meta.file_type, kTableFile);
|
||||
ASSERT_EQ(meta.name, "/" + meta.relative_filename);
|
||||
ASSERT_EQ(meta.directory, meta.db_path);
|
||||
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
|
||||
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
|
||||
}
|
||||
|
||||
TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.track_and_verify_wals_in_manifest = true;
|
||||
// The following make sure there are two bg flush threads.
|
||||
options.max_background_jobs = 8;
|
||||
|
||||
const std::string cf1_name("cf1");
|
||||
CreateAndReopenWithCF({cf1_name}, options);
|
||||
assert(handles_.size() == 2);
|
||||
|
||||
{
|
||||
dbfull()->TEST_LockMutex();
|
||||
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
|
||||
dbfull()->TEST_UnlockMutex();
|
||||
}
|
||||
|
||||
ASSERT_OK(dbfull()->PauseBackgroundWork());
|
||||
|
||||
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
|
||||
|
||||
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1]));
|
||||
|
||||
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
|
||||
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0]));
|
||||
|
||||
bool called = false;
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
// This callback will be called when the first bg flush thread reaches the
|
||||
// point before entering the MANIFEST write queue after flushing the SST
|
||||
// file.
|
||||
// The purpose of the sync points here is to ensure both bg flush threads
|
||||
// finish computing `min_wal_number_to_keep` before any of them updates the
|
||||
// `log_number` for the column family that's being flushed.
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
|
||||
[&](void* /*arg*/) {
|
||||
dbfull()->mutex()->AssertHeld();
|
||||
if (!called) {
|
||||
// We are the first bg flush thread in the MANIFEST write queue.
|
||||
// We set up the dependency between sync points for two threads that
|
||||
// will be executing the same code.
|
||||
// For the interleaving of events, see
|
||||
// https://github.com/facebook/rocksdb/pull/9715.
|
||||
// bg flush thread1 will release the db mutex while in the MANIFEST
|
||||
// write queue. In the meantime, bg flush thread2 locks db mutex and
|
||||
// computes the min_wal_number_to_keep (before thread1 writes to
|
||||
// MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
|
||||
// the MANIFEST write queue afterwards and bg flush thread1 proceeds
|
||||
// with writing to MANIFEST.
|
||||
called = true;
|
||||
SyncPoint::GetInstance()->LoadDependency({
|
||||
{"VersionSet::LogAndApply:WriteManifestStart",
|
||||
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
|
||||
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
|
||||
"VersionSet::LogAndApply:WriteManifest"},
|
||||
});
|
||||
} else {
|
||||
// The other bg flush thread has already been in the MANIFEST write
|
||||
// queue, and we are after.
|
||||
TEST_SYNC_POINT(
|
||||
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(dbfull()->ContinueBackgroundWork());
|
||||
|
||||
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
|
||||
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
|
||||
|
||||
ASSERT_TRUE(called);
|
||||
|
||||
Close();
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
DB* db1 = nullptr;
|
||||
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
|
||||
ASSERT_OK(s);
|
||||
assert(db1);
|
||||
delete db1;
|
||||
}
|
||||
|
||||
// Test scope:
|
||||
// - We expect to open data store under all circumstances
|
||||
// - We expect only data upto the point where the first error was encountered
|
||||
|
@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
|
||||
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
|
||||
<< "table_file_creation"
|
||||
<< "file_number" << fd.GetNumber() << "file_size"
|
||||
<< fd.GetFileSize() << "file_checksum" << file_checksum
|
||||
<< "file_checksum_func_name" << file_checksum_func_name;
|
||||
<< fd.GetFileSize() << "file_checksum"
|
||||
<< Slice(file_checksum).ToString(true) << "file_checksum_func_name"
|
||||
<< file_checksum_func_name;
|
||||
|
||||
// table_properties
|
||||
{
|
||||
|
@ -171,6 +171,69 @@ TEST_F(FileNameTest, Construction) {
|
||||
ASSERT_EQ(kMetaDatabase, type);
|
||||
}
|
||||
|
||||
TEST_F(FileNameTest, NormalizePath) {
|
||||
// No leading slash
|
||||
const std::string sep = std::string(1, kFilePathSeparator);
|
||||
|
||||
std::string expected = "FOLDER" + sep + "filename.ext";
|
||||
std::string given = "FOLDER" + sep + "filename.ext";
|
||||
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// Two chars /a
|
||||
|
||||
expected = sep + "a";
|
||||
given = expected;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// Two chars a/
|
||||
expected = "a" + sep;
|
||||
given = expected;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// Server only
|
||||
expected = sep + sep + "a";
|
||||
given = expected;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// Two slashes after character
|
||||
expected = "a" + sep;
|
||||
given = "a" + sep + sep;
|
||||
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// slash only /
|
||||
expected = sep;
|
||||
given = expected;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// UNC only //
|
||||
expected = sep;
|
||||
given = sep + sep;
|
||||
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// 3 slashesy //
|
||||
expected = sep + sep;
|
||||
given = sep + sep + sep;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// 3 slashes //
|
||||
expected = sep + sep + "a" + sep;
|
||||
given = sep + sep + sep + "a" + sep;
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// 2 separators in the middle
|
||||
expected = "a" + sep + "b";
|
||||
given = "a" + sep + sep + "b";
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
|
||||
// UNC with duplicate slashes
|
||||
expected = sep + sep + "SERVER" + sep + "a" + sep + "b" + sep + "c";
|
||||
given = sep + sep + "SERVER" + sep + "a" + sep + sep + "b" + sep + "c";
|
||||
ASSERT_EQ(expected, NormalizePath(given));
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -489,8 +489,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
// TODO(myabandeh): Not sure how batch_count could be 0 here.
|
||||
if (batch_count > 0) {
|
||||
uint64_t min_wal_number_to_keep = 0;
|
||||
assert(edit_list.size() > 0);
|
||||
if (vset->db_options()->allow_2pc) {
|
||||
assert(edit_list.size() > 0);
|
||||
// Note that if mempurge is successful, the edit_list will
|
||||
// not be applicable (contains info of new min_log number to keep,
|
||||
// and level 0 file path of SST file created during normal flush,
|
||||
@ -499,24 +499,26 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
|
||||
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
|
||||
|
||||
// We piggyback the information of earliest log file to keep in the
|
||||
// We piggyback the information of earliest log file to keep in the
|
||||
// manifest entry for the last file flushed.
|
||||
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
|
||||
} else {
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
|
||||
}
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
VersionEdit wal_deletion;
|
||||
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest) {
|
||||
if (!vset->db_options()->allow_2pc) {
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
|
||||
}
|
||||
if (min_wal_number_to_keep >
|
||||
vset->GetWalSet().GetMinWalNumberToKeep()) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_list.push_back(wal_deletion.get());
|
||||
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"MemTableList::TryInstallMemtableFlushResults:"
|
||||
"AfterComputeMinWalToKeep",
|
||||
nullptr);
|
||||
}
|
||||
edit_list.push_back(&wal_deletion);
|
||||
|
||||
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
|
||||
to_delete, mu](const Status& status) {
|
||||
@ -798,22 +800,19 @@ Status InstallMemtableAtomicFlushResults(
|
||||
if (vset->db_options()->allow_2pc) {
|
||||
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
|
||||
vset, cfds, edit_lists, mems_list, prep_tracker);
|
||||
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
|
||||
} else {
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
|
||||
}
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest) {
|
||||
if (!vset->db_options()->allow_2pc) {
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
|
||||
}
|
||||
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
++num_entries;
|
||||
}
|
||||
VersionEdit wal_deletion;
|
||||
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest &&
|
||||
min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
|
||||
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
|
||||
}
|
||||
edit_lists.back().push_back(&wal_deletion);
|
||||
++num_entries;
|
||||
|
||||
// Mark the version edits as an atomic group if the number of version edits
|
||||
// exceeds 1.
|
||||
|
@ -120,6 +120,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
if (has_max_column_family_) {
|
||||
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
|
||||
}
|
||||
if (has_min_log_number_to_keep_) {
|
||||
PutVarint32Varint64(dst, kMinLogNumberToKeep, min_log_number_to_keep_);
|
||||
}
|
||||
if (has_last_sequence_) {
|
||||
PutVarint32Varint64(dst, kLastSequence, last_sequence_);
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
|
||||
if (s->ok()) {
|
||||
version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
|
||||
version_edit_params_.max_column_family_);
|
||||
version_set_->MarkMinLogNumberToKeep2PC(
|
||||
version_set_->MarkMinLogNumberToKeep(
|
||||
version_edit_params_.min_log_number_to_keep_);
|
||||
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
|
||||
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
|
||||
@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
|
||||
fprintf(stdout,
|
||||
"next_file_number %" PRIu64 " last_sequence %" PRIu64
|
||||
" prev_log_number %" PRIu64 " max_column_family %" PRIu32
|
||||
" min_log_number_to_keep "
|
||||
"%" PRIu64 "\n",
|
||||
" min_log_number_to_keep %" PRIu64 "\n",
|
||||
version_set_->current_next_file_number(),
|
||||
version_set_->LastSequence(), version_set_->prev_log_number(),
|
||||
version_set_->column_family_set_->GetMaxColumnFamily(),
|
||||
version_set_->min_log_number_to_keep_2pc());
|
||||
version_set_->min_log_number_to_keep());
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -1475,7 +1475,7 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
|
||||
const uint64_t file_number = file->fd.GetNumber();
|
||||
files.emplace_back(
|
||||
MakeTableFileName("", file_number), file_number, file_path,
|
||||
static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
|
||||
file->fd.GetFileSize(), file->fd.smallest_seqno,
|
||||
file->fd.largest_seqno, file->smallest.user_key().ToString(),
|
||||
file->largest.user_key().ToString(),
|
||||
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
|
||||
@ -4141,7 +4141,7 @@ void VersionSet::Reset() {
|
||||
}
|
||||
db_id_.clear();
|
||||
next_file_number_.store(2);
|
||||
min_log_number_to_keep_2pc_.store(0);
|
||||
min_log_number_to_keep_.store(0);
|
||||
manifest_file_number_ = 0;
|
||||
options_file_number_ = 0;
|
||||
pending_manifest_file_number_ = 0;
|
||||
@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites(
|
||||
}
|
||||
|
||||
if (last_min_log_number_to_keep != 0) {
|
||||
// Should only be set in 2PC mode.
|
||||
MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
|
||||
MarkMinLogNumberToKeep(last_min_log_number_to_keep);
|
||||
}
|
||||
|
||||
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
|
||||
@ -4965,7 +4964,7 @@ Status VersionSet::Recover(
|
||||
",min_log_number_to_keep is %" PRIu64 "\n",
|
||||
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
|
||||
last_sequence_.load(), log_number, prev_log_number_,
|
||||
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
|
||||
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep());
|
||||
|
||||
for (auto cfd : *column_family_set_) {
|
||||
if (cfd->IsDropped()) {
|
||||
@ -5378,9 +5377,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||
}
|
||||
// Called only either from ::LogAndApply which is protected by mutex or during
|
||||
// recovery which is single-threaded.
|
||||
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
|
||||
if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
|
||||
min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
|
||||
void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
|
||||
if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
|
||||
min_log_number_to_keep_.store(number, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5506,7 +5505,7 @@ Status VersionSet::WriteCurrentStateToManifest(
|
||||
// min_log_number_to_keep is for the whole db, not for specific column family.
|
||||
// So it does not need to be set for every column family, just need to be set once.
|
||||
// Since default CF can never be dropped, we set the min_log to the default CF here.
|
||||
uint64_t min_log = min_log_number_to_keep_2pc();
|
||||
uint64_t min_log = min_log_number_to_keep();
|
||||
if (min_log != 0) {
|
||||
edit.SetMinLogNumberToKeep(min_log);
|
||||
}
|
||||
@ -5894,11 +5893,13 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
assert(!cfd->ioptions()->cf_paths.empty());
|
||||
filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
|
||||
}
|
||||
filemetadata.directory = filemetadata.db_path;
|
||||
const uint64_t file_number = file->fd.GetNumber();
|
||||
filemetadata.name = MakeTableFileName("", file_number);
|
||||
filemetadata.relative_filename = filemetadata.name.substr(1);
|
||||
filemetadata.file_number = file_number;
|
||||
filemetadata.level = level;
|
||||
filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
|
||||
filemetadata.size = file->fd.GetFileSize();
|
||||
filemetadata.smallestkey = file->smallest.user_key().ToString();
|
||||
filemetadata.largestkey = file->largest.user_key().ToString();
|
||||
filemetadata.smallest_seqno = file->fd.smallest_seqno;
|
||||
|
@ -1128,8 +1128,8 @@ class VersionSet {
|
||||
|
||||
uint64_t current_next_file_number() const { return next_file_number_.load(); }
|
||||
|
||||
uint64_t min_log_number_to_keep_2pc() const {
|
||||
return min_log_number_to_keep_2pc_.load();
|
||||
uint64_t min_log_number_to_keep() const {
|
||||
return min_log_number_to_keep_.load();
|
||||
}
|
||||
|
||||
// Allocate and return a new file number
|
||||
@ -1187,7 +1187,7 @@ class VersionSet {
|
||||
// Mark the specified log number as deleted
|
||||
// REQUIRED: this is only called during single-threaded recovery or repair, or
|
||||
// from ::LogAndApply where the global mutex is held.
|
||||
void MarkMinLogNumberToKeep2PC(uint64_t number);
|
||||
void MarkMinLogNumberToKeep(uint64_t number);
|
||||
|
||||
// Return the log file number for the log file that is currently
|
||||
// being compacted, or zero if there is no such log file.
|
||||
@ -1196,10 +1196,12 @@ class VersionSet {
|
||||
// Returns the minimum log number which still has data not flushed to any SST
|
||||
// file.
|
||||
// In non-2PC mode, all the log numbers smaller than this number can be safely
|
||||
// deleted.
|
||||
// deleted, although we still use `min_log_number_to_keep_` to determine when
|
||||
// to delete a WAL file.
|
||||
uint64_t MinLogNumberWithUnflushedData() const {
|
||||
return PreComputeMinLogNumberWithUnflushedData(nullptr);
|
||||
}
|
||||
|
||||
// Returns the minimum log number which still has data not flushed to any SST
|
||||
// file.
|
||||
// Empty column families' log number is considered to be
|
||||
@ -1297,6 +1299,10 @@ class VersionSet {
|
||||
uint64_t min_pending_output);
|
||||
|
||||
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
|
||||
RefedColumnFamilySet GetRefedColumnFamilySet() {
|
||||
return RefedColumnFamilySet(GetColumnFamilySet());
|
||||
}
|
||||
|
||||
const FileOptions& file_options() { return file_options_; }
|
||||
void ChangeFileOptions(const MutableDBOptions& new_options) {
|
||||
file_options_.writable_file_max_buffer_size =
|
||||
@ -1399,9 +1405,8 @@ class VersionSet {
|
||||
const ImmutableDBOptions* const db_options_;
|
||||
std::atomic<uint64_t> next_file_number_;
|
||||
// Any WAL number smaller than this should be ignored during recovery,
|
||||
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this
|
||||
// number is ignored.
|
||||
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
|
||||
// and is qualified for being deleted.
|
||||
std::atomic<uint64_t> min_log_number_to_keep_ = {0};
|
||||
uint64_t manifest_file_number_;
|
||||
uint64_t options_file_number_;
|
||||
uint64_t options_file_size_;
|
||||
|
@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
|
||||
}
|
||||
|
||||
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
|
||||
db_options_.allow_2pc = true;
|
||||
NewDB();
|
||||
|
||||
SstInfo sst(100, kDefaultColumnFamilyName, "a");
|
||||
@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
|
||||
edit.AddFile(0, file_metas[0]);
|
||||
edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
|
||||
ASSERT_OK(LogAndApplyToDefaultCF(edit));
|
||||
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
|
||||
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
CreateNewManifest();
|
||||
ReopenDB();
|
||||
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
|
||||
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
|
||||
}
|
||||
}
|
||||
|
||||
|
7
env/env_test.cc
vendored
7
env/env_test.cc
vendored
@ -85,6 +85,8 @@ struct Deleter {
|
||||
void (*fn_)(void*);
|
||||
};
|
||||
|
||||
extern "C" bool RocksDbIOUringEnable() { return true; }
|
||||
|
||||
std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
|
||||
char* ptr = nullptr;
|
||||
#ifdef OS_WIN
|
||||
@ -1256,7 +1258,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
|
||||
// Random Read
|
||||
Random rnd(301 + attempt);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"UpdateResults:io_uring_result", [&](void* arg) {
|
||||
"UpdateResults::io_uring_result", [&](void* arg) {
|
||||
if (attempt > 0) {
|
||||
// No failure in the first attempt.
|
||||
size_t& bytes_read = *static_cast<size_t*>(arg);
|
||||
@ -1326,7 +1328,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
|
||||
const int num_reads = rnd.Uniform(512) + 1;
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"UpdateResults:io_uring_result", [&](void* arg) {
|
||||
"UpdateResults::io_uring_result", [&](void* arg) {
|
||||
if (attempt > 5) {
|
||||
// Improve partial result rates in second half of the run to
|
||||
// cover the case of repeated partial results.
|
||||
@ -3308,7 +3310,6 @@ TEST_F(TestAsyncRead, ReadAsync) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
9
env/fs_posix.cc
vendored
9
env/fs_posix.cc
vendored
@ -1045,9 +1045,12 @@ class PosixFileSystem : public FileSystem {
|
||||
|
||||
// EXPERIMENTAL
|
||||
//
|
||||
// TODO akankshamahajan: Update Poll API to take into account min_completions
|
||||
// TODO akankshamahajan:
|
||||
// 1. Update Poll API to take into account min_completions
|
||||
// and returns if number of handles in io_handles (any order) completed is
|
||||
// equal to atleast min_completions.
|
||||
// 2. Currently in case of direct_io, Read API is called because of which call
|
||||
// to Poll API fails as it expects IOHandle to be populated.
|
||||
virtual IOStatus Poll(std::vector<void*>& io_handles,
|
||||
size_t /*min_completions*/) override {
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
@ -1094,12 +1097,14 @@ class PosixFileSystem : public FileSystem {
|
||||
req.offset = posix_handle->offset;
|
||||
req.len = posix_handle->len;
|
||||
size_t finished_len = 0;
|
||||
size_t bytes_read = 0;
|
||||
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
|
||||
true /*async_read*/, finished_len, &req);
|
||||
true /*async_read*/, finished_len, &req, bytes_read);
|
||||
posix_handle->is_finished = true;
|
||||
io_uring_cqe_seen(iu, cqe);
|
||||
posix_handle->cb(req, posix_handle->cb_arg);
|
||||
(void)finished_len;
|
||||
(void)bytes_read;
|
||||
|
||||
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
|
||||
break;
|
||||
|
53
env/io_posix.cc
vendored
53
env/io_posix.cc
vendored
@ -744,31 +744,36 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
|
||||
wrap_cache.erase(wrap_check);
|
||||
|
||||
FSReadRequest* req = req_wrap->req;
|
||||
size_t bytes_read = 0;
|
||||
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
|
||||
false /*async_read*/, req_wrap->finished_len, req);
|
||||
false /*async_read*/, req_wrap->finished_len, req,
|
||||
bytes_read);
|
||||
int32_t res = cqe->res;
|
||||
if (res == 0) {
|
||||
/// cqe->res == 0 can means EOF, or can mean partial results. See
|
||||
// comment
|
||||
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
|
||||
// Fall back to pread in this case.
|
||||
if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len,
|
||||
GetRequiredBufferAlignment())) {
|
||||
// Bytes reads don't fill sectors. Should only happen at the end
|
||||
// of the file.
|
||||
req->result = Slice(req->scratch, req_wrap->finished_len);
|
||||
req->status = IOStatus::OK();
|
||||
} else {
|
||||
Slice tmp_slice;
|
||||
req->status =
|
||||
Read(req->offset + req_wrap->finished_len,
|
||||
req->len - req_wrap->finished_len, options, &tmp_slice,
|
||||
req->scratch + req_wrap->finished_len, dbg);
|
||||
req->result =
|
||||
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
|
||||
if (res >= 0) {
|
||||
if (bytes_read == 0) {
|
||||
/// cqe->res == 0 can means EOF, or can mean partial results. See
|
||||
// comment
|
||||
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
|
||||
// Fall back to pread in this case.
|
||||
if (use_direct_io() &&
|
||||
!IsSectorAligned(req_wrap->finished_len,
|
||||
GetRequiredBufferAlignment())) {
|
||||
// Bytes reads don't fill sectors. Should only happen at the end
|
||||
// of the file.
|
||||
req->result = Slice(req->scratch, req_wrap->finished_len);
|
||||
req->status = IOStatus::OK();
|
||||
} else {
|
||||
Slice tmp_slice;
|
||||
req->status =
|
||||
Read(req->offset + req_wrap->finished_len,
|
||||
req->len - req_wrap->finished_len, options, &tmp_slice,
|
||||
req->scratch + req_wrap->finished_len, dbg);
|
||||
req->result =
|
||||
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
|
||||
}
|
||||
} else if (bytes_read < req_wrap->iov.iov_len) {
|
||||
incomplete_rq_list.push_back(req_wrap);
|
||||
}
|
||||
} else if (res > 0 && res < static_cast<int32_t>(req_wrap->iov.iov_len)) {
|
||||
incomplete_rq_list.push_back(req_wrap);
|
||||
}
|
||||
io_uring_cqe_seen(iu, cqe);
|
||||
}
|
||||
@ -896,8 +901,8 @@ IOStatus PosixRandomAccessFile::ReadAsync(
|
||||
|
||||
// Initialize Posix_IOHandle.
|
||||
posix_handle->iu = iu;
|
||||
posix_handle->iov.iov_base = posix_handle->scratch;
|
||||
posix_handle->iov.iov_len = posix_handle->len;
|
||||
posix_handle->iov.iov_base = req.scratch;
|
||||
posix_handle->iov.iov_len = req.len;
|
||||
posix_handle->cb = cb;
|
||||
posix_handle->cb_arg = cb_arg;
|
||||
posix_handle->offset = req.offset;
|
||||
|
5
env/io_posix.h
vendored
5
env/io_posix.h
vendored
@ -66,12 +66,13 @@ struct Posix_IOHandle {
|
||||
|
||||
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
|
||||
size_t len, size_t iov_len, bool async_read,
|
||||
size_t& finished_len, FSReadRequest* req) {
|
||||
size_t& finished_len, FSReadRequest* req,
|
||||
size_t& bytes_read) {
|
||||
if (cqe->res < 0) {
|
||||
req->result = Slice(req->scratch, 0);
|
||||
req->status = IOError("Req failed", file_name, cqe->res);
|
||||
} else {
|
||||
size_t bytes_read = static_cast<size_t>(cqe->res);
|
||||
bytes_read = static_cast<size_t>(cqe->res);
|
||||
TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
|
||||
if (bytes_read == iov_len) {
|
||||
req->result = Slice(req->scratch, req->len);
|
||||
|
@ -66,6 +66,17 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
|
||||
// chunk_len is greater than 0.
|
||||
bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
|
||||
static_cast<size_t>(chunk_len));
|
||||
} else if (chunk_len > 0) {
|
||||
// For async prefetching, it doesn't call RefitTail with chunk_len > 0.
|
||||
// Allocate new buffer if needed because aligned buffer calculate remaining
|
||||
// buffer as capacity_ - cursize_ which might not be the case in this as we
|
||||
// are not refitting.
|
||||
// TODO akanksha: Update the condition when asynchronous prefetching is
|
||||
// stable.
|
||||
bufs_[index].buffer_.Alignment(alignment);
|
||||
bufs_[index].buffer_.AllocateNewBuffer(
|
||||
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
|
||||
chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,13 +111,6 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
|
||||
Env::IOPriority rate_limiter_priority,
|
||||
uint64_t read_len, uint64_t chunk_len,
|
||||
uint64_t rounddown_start, uint32_t index) {
|
||||
// Reset io_handle.
|
||||
if (io_handle_ != nullptr && del_fn_ != nullptr) {
|
||||
del_fn_(io_handle_);
|
||||
io_handle_ = nullptr;
|
||||
del_fn_ = nullptr;
|
||||
}
|
||||
|
||||
// callback for async read request.
|
||||
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
|
||||
std::placeholders::_1, std::placeholders::_2);
|
||||
@ -118,6 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
|
||||
req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
|
||||
Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
|
||||
&del_fn_, rate_limiter_priority);
|
||||
req.status.PermitUncheckedError();
|
||||
if (s.ok()) {
|
||||
async_read_in_progress_ = true;
|
||||
}
|
||||
@ -210,24 +215,34 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
|
||||
// the asynchronous prefetching in second buffer.
|
||||
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
RandomAccessFileReader* reader,
|
||||
FileSystem* fs, uint64_t offset,
|
||||
size_t length, size_t readahead_size,
|
||||
uint64_t offset, size_t length,
|
||||
size_t readahead_size,
|
||||
Env::IOPriority rate_limiter_priority,
|
||||
bool& copy_to_third_buffer) {
|
||||
if (!enable_) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (async_read_in_progress_ && fs != nullptr) {
|
||||
if (async_read_in_progress_ && fs_ != nullptr) {
|
||||
// Wait for prefetch data to complete.
|
||||
// No mutex is needed as PrefetchAsyncCallback updates the result in second
|
||||
// buffer and FilePrefetchBuffer should wait for Poll before accessing the
|
||||
// second buffer.
|
||||
std::vector<void*> handles;
|
||||
handles.emplace_back(io_handle_);
|
||||
fs->Poll(handles, 1).PermitUncheckedError();
|
||||
fs_->Poll(handles, 1).PermitUncheckedError();
|
||||
}
|
||||
|
||||
// TODO akanksha: Update TEST_SYNC_POINT after new tests are added.
|
||||
// Reset and Release io_handle_ after the Poll API as request has been
|
||||
// completed.
|
||||
async_read_in_progress_ = false;
|
||||
if (io_handle_ != nullptr && del_fn_ != nullptr) {
|
||||
del_fn_(io_handle_);
|
||||
io_handle_ = nullptr;
|
||||
del_fn_ = nullptr;
|
||||
}
|
||||
|
||||
// TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with
|
||||
// normal prefetching.
|
||||
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
|
||||
Status s;
|
||||
size_t prefetch_size = length + readahead_size;
|
||||
@ -236,34 +251,47 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
// Index of second buffer.
|
||||
uint32_t second = curr_ ^ 1;
|
||||
|
||||
// First clear the buffers if it contains outdated data. Outdated data can be
|
||||
// because previous sequential reads were read from the cache instead of these
|
||||
// buffer.
|
||||
{
|
||||
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
|
||||
offset >= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
|
||||
bufs_[curr_].buffer_.Clear();
|
||||
}
|
||||
if (bufs_[second].buffer_.CurrentSize() > 0 &&
|
||||
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
|
||||
bufs_[second].buffer_.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
// If data is in second buffer, make it curr_. Second buffer can be either
|
||||
// partial filled or full.
|
||||
if (bufs_[second].buffer_.CurrentSize() > 0 &&
|
||||
offset >= bufs_[second].offset_ &&
|
||||
offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
|
||||
offset < bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
|
||||
// Clear the curr_ as buffers have been swapped and curr_ contains the
|
||||
// outdated data.
|
||||
// outdated data and switch the buffers.
|
||||
bufs_[curr_].buffer_.Clear();
|
||||
// Switch the buffers.
|
||||
curr_ = curr_ ^ 1;
|
||||
second = curr_ ^ 1;
|
||||
}
|
||||
|
||||
// If second buffer contains outdated data, clear it for async prefetching.
|
||||
// Outdated can be because previous sequential reads were read from the cache
|
||||
// instead of this buffer.
|
||||
if (bufs_[second].buffer_.CurrentSize() > 0 &&
|
||||
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
|
||||
bufs_[second].buffer_.Clear();
|
||||
// After swap check if all the requested bytes are in curr_, it will go for
|
||||
// async prefetching only.
|
||||
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
|
||||
offset + length <=
|
||||
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
|
||||
offset += length;
|
||||
length = 0;
|
||||
prefetch_size -= length;
|
||||
}
|
||||
|
||||
// Data is overlapping i.e. some of the data is in curr_ buffer and remaining
|
||||
// in second buffer.
|
||||
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
|
||||
bufs_[second].buffer_.CurrentSize() > 0 &&
|
||||
offset >= bufs_[curr_].offset_ &&
|
||||
offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() &&
|
||||
offset + prefetch_size > bufs_[second].offset_) {
|
||||
offset + length > bufs_[second].offset_) {
|
||||
// Allocate new buffer to third buffer;
|
||||
bufs_[2].buffer_.Clear();
|
||||
bufs_[2].buffer_.Alignment(alignment);
|
||||
@ -273,12 +301,10 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
|
||||
// Move data from curr_ buffer to third.
|
||||
CopyDataToBuffer(curr_, offset, length);
|
||||
|
||||
if (length == 0) {
|
||||
// Requested data has been copied and curr_ still has unconsumed data.
|
||||
return s;
|
||||
}
|
||||
|
||||
CopyDataToBuffer(second, offset, length);
|
||||
// Length == 0: All the requested data has been copied to third buffer. It
|
||||
// should go for only async prefetching.
|
||||
@ -306,6 +332,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
if (length > 0) {
|
||||
CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_,
|
||||
false /*refit_tail*/, chunk_len1);
|
||||
assert(roundup_len1 >= chunk_len1);
|
||||
read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
|
||||
}
|
||||
{
|
||||
@ -316,7 +343,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
Roundup(rounddown_start2 + readahead_size, alignment);
|
||||
|
||||
// For length == 0, do the asynchronous prefetching in second instead of
|
||||
// synchronous prefetching of remaining prefetch_size.
|
||||
// synchronous prefetching in curr_.
|
||||
if (length == 0) {
|
||||
rounddown_start2 =
|
||||
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
|
||||
@ -330,8 +357,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
|
||||
// Update the buffer offset.
|
||||
bufs_[second].offset_ = rounddown_start2;
|
||||
assert(roundup_len2 >= chunk_len2);
|
||||
uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
|
||||
|
||||
ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
|
||||
rounddown_start2, second)
|
||||
.PermitUncheckedError();
|
||||
@ -344,7 +371,6 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
// Copy remaining requested bytes to third_buffer.
|
||||
if (copy_to_third_buffer && length > 0) {
|
||||
CopyDataToBuffer(curr_, offset, length);
|
||||
@ -416,8 +442,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
|
||||
bool FilePrefetchBuffer::TryReadFromCacheAsync(
|
||||
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
|
||||
size_t n, Slice* result, Status* status,
|
||||
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */,
|
||||
FileSystem* fs) {
|
||||
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */
|
||||
) {
|
||||
if (track_min_offset_ && offset < min_offset_read_) {
|
||||
min_offset_read_ = static_cast<size_t>(offset);
|
||||
}
|
||||
@ -452,7 +478,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
|
||||
if (implicit_auto_readahead_ && async_io_) {
|
||||
// Prefetch n + readahead_size_/2 synchronously as remaining
|
||||
// readahead_size_/2 will be prefetched asynchronously.
|
||||
s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2,
|
||||
s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2,
|
||||
rate_limiter_priority, copy_to_third_buffer);
|
||||
} else {
|
||||
s = Prefetch(opts, reader, offset, n + readahead_size_,
|
||||
@ -489,7 +515,6 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
|
||||
|
||||
void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
|
||||
void* /*cb_arg*/) {
|
||||
async_read_in_progress_ = false;
|
||||
uint32_t index = curr_ ^ 1;
|
||||
if (req.status.ok()) {
|
||||
if (req.offset + req.result.size() <=
|
||||
@ -505,12 +530,5 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
|
||||
size_t current_size = bufs_[index].buffer_.CurrentSize();
|
||||
bufs_[index].buffer_.Size(current_size + req.result.size());
|
||||
}
|
||||
|
||||
// Release io_handle_.
|
||||
if (io_handle_ != nullptr && del_fn_ != nullptr) {
|
||||
del_fn_(io_handle_);
|
||||
io_handle_ = nullptr;
|
||||
del_fn_ = nullptr;
|
||||
}
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -65,7 +65,7 @@ class FilePrefetchBuffer {
|
||||
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
|
||||
bool enable = true, bool track_min_offset = false,
|
||||
bool implicit_auto_readahead = false,
|
||||
bool async_io = false)
|
||||
bool async_io = false, FileSystem* fs = nullptr)
|
||||
: curr_(0),
|
||||
readahead_size_(readahead_size),
|
||||
max_readahead_size_(max_readahead_size),
|
||||
@ -79,13 +79,29 @@ class FilePrefetchBuffer {
|
||||
io_handle_(nullptr),
|
||||
del_fn_(nullptr),
|
||||
async_read_in_progress_(false),
|
||||
async_io_(async_io) {
|
||||
async_io_(async_io),
|
||||
fs_(fs) {
|
||||
// If async_io_ is enabled, data is asynchronously filled in second buffer
|
||||
// while curr_ is being consumed. If data is overlapping in two buffers,
|
||||
// data is copied to third buffer to return continuous buffer.
|
||||
bufs_.resize(3);
|
||||
}
|
||||
|
||||
~FilePrefetchBuffer() {
|
||||
// Wait for any pending async job before destroying the class object.
|
||||
if (async_read_in_progress_ && fs_ != nullptr) {
|
||||
std::vector<void*> handles;
|
||||
handles.emplace_back(io_handle_);
|
||||
fs_->Poll(handles, 1).PermitUncheckedError();
|
||||
}
|
||||
// Release io_handle_.
|
||||
if (io_handle_ != nullptr && del_fn_ != nullptr) {
|
||||
del_fn_(io_handle_);
|
||||
io_handle_ = nullptr;
|
||||
del_fn_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Load data into the buffer from a file.
|
||||
// reader : the file reader.
|
||||
// offset : the file offset to start reading from.
|
||||
@ -100,8 +116,7 @@ class FilePrefetchBuffer {
|
||||
Env::IOPriority rate_limiter_priority);
|
||||
|
||||
Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
|
||||
FileSystem* fs, uint64_t offset, size_t length,
|
||||
size_t readahead_size,
|
||||
uint64_t offset, size_t length, size_t readahead_size,
|
||||
Env::IOPriority rate_limiter_priority,
|
||||
bool& copy_to_third_buffer);
|
||||
|
||||
@ -129,7 +144,7 @@ class FilePrefetchBuffer {
|
||||
RandomAccessFileReader* reader, uint64_t offset,
|
||||
size_t n, Slice* result, Status* status,
|
||||
Env::IOPriority rate_limiter_priority,
|
||||
bool for_compaction /* = false */, FileSystem* fs);
|
||||
bool for_compaction /* = false */);
|
||||
|
||||
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
|
||||
// tracked if track_min_offset = true.
|
||||
@ -256,5 +271,6 @@ class FilePrefetchBuffer {
|
||||
IOHandleDeleter del_fn_;
|
||||
bool async_read_in_progress_;
|
||||
bool async_io_;
|
||||
FileSystem* fs_;
|
||||
};
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -491,6 +491,12 @@ Status GetInfoLogFiles(const std::shared_ptr<FileSystem>& fs,
|
||||
|
||||
std::string NormalizePath(const std::string& path) {
|
||||
std::string dst;
|
||||
|
||||
if (path.length() > 2 && path[0] == kFilePathSeparator &&
|
||||
path[1] == kFilePathSeparator) { // Handle UNC names
|
||||
dst.append(2, kFilePathSeparator);
|
||||
}
|
||||
|
||||
for (auto c : path) {
|
||||
if (!dst.empty() && (c == kFilePathSeparator || c == '/') &&
|
||||
(dst.back() == kFilePathSeparator || dst.back() == '/')) {
|
||||
|
@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
|
||||
options.write_buffer_size = 1024;
|
||||
options.create_if_missing = true;
|
||||
options.compression = kNoCompression;
|
||||
options.statistics = CreateDBStatistics();
|
||||
options.env = env.get();
|
||||
if (std::get<0>(GetParam())) {
|
||||
bool use_direct_io = std::get<0>(GetParam());
|
||||
if (use_direct_io) {
|
||||
options.use_direct_reads = true;
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
}
|
||||
@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
Status s = TryReopen(options);
|
||||
if (std::get<0>(GetParam()) &&
|
||||
(s.IsNotSupported() || s.IsInvalidArgument())) {
|
||||
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
|
||||
// If direct IO is not supported, skip the test
|
||||
return;
|
||||
} else {
|
||||
@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
|
||||
// TODO akanksha: Remove after adding new units.
|
||||
ro.async_io = true;
|
||||
}
|
||||
|
||||
ASSERT_OK(options.statistics->Reset());
|
||||
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
|
||||
int num_keys = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
|
||||
num_keys++;
|
||||
}
|
||||
ASSERT_EQ(num_keys, total_keys);
|
||||
|
||||
ASSERT_GT(buff_prefetch_count, 0);
|
||||
buff_prefetch_count = 0;
|
||||
// For index and data blocks.
|
||||
if (is_adaptive_readahead) {
|
||||
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
|
||||
} else {
|
||||
ASSERT_EQ(readahead_carry_over_count, 0);
|
||||
}
|
||||
|
||||
// Check stats to make sure async prefetch is done.
|
||||
{
|
||||
HistogramData async_read_bytes;
|
||||
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
|
||||
if (ro.async_io && !use_direct_io) {
|
||||
ASSERT_GT(async_read_bytes.count, 0);
|
||||
} else {
|
||||
ASSERT_EQ(async_read_bytes.count, 0);
|
||||
}
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
|
||||
options.use_direct_reads = true;
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
}
|
||||
|
||||
options.statistics = CreateDBStatistics();
|
||||
BlockBasedTableOptions table_options;
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
|
||||
table_options.block_cache = cache;
|
||||
@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ReadOptions ro;
|
||||
ro.adaptive_readahead = true;
|
||||
// TODO akanksha: Remove after adding new units.
|
||||
ro.async_io = true;
|
||||
{
|
||||
/*
|
||||
@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
|
||||
iter->Seek(BuildKey(1019));
|
||||
buff_prefetch_count = 0;
|
||||
}
|
||||
|
||||
{
|
||||
ASSERT_OK(options.statistics->Reset());
|
||||
// After caching, blocks will be read from cache (Sequential blocks)
|
||||
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
|
||||
iter->Seek(BuildKey(0));
|
||||
@ -1008,11 +1024,118 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
|
||||
ASSERT_EQ(buff_prefetch_count, 2);
|
||||
|
||||
// Check stats to make sure async prefetch is done.
|
||||
{
|
||||
HistogramData async_read_bytes;
|
||||
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
|
||||
if (GetParam()) {
|
||||
ASSERT_EQ(async_read_bytes.count, 0);
|
||||
} else {
|
||||
ASSERT_GT(async_read_bytes.count, 0);
|
||||
}
|
||||
}
|
||||
|
||||
buff_prefetch_count = 0;
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
extern "C" bool RocksDbIOUringEnable() { return true; }
|
||||
|
||||
// Tests the default implementation of ReadAsync API with PosixFileSystem.
|
||||
TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
|
||||
if (mem_env_ || encrypted_env_) {
|
||||
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
|
||||
return;
|
||||
}
|
||||
|
||||
const int kNumKeys = 1000;
|
||||
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
|
||||
FileSystem::Default(), /*support_prefetch=*/false);
|
||||
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
|
||||
|
||||
bool use_direct_io = false;
|
||||
Options options = CurrentOptions();
|
||||
options.write_buffer_size = 1024;
|
||||
options.create_if_missing = true;
|
||||
options.compression = kNoCompression;
|
||||
options.env = env.get();
|
||||
options.statistics = CreateDBStatistics();
|
||||
if (use_direct_io) {
|
||||
options.use_direct_reads = true;
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
}
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.no_block_cache = true;
|
||||
table_options.cache_index_and_filter_blocks = false;
|
||||
table_options.metadata_block_size = 1024;
|
||||
table_options.index_type =
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
Status s = TryReopen(options);
|
||||
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
|
||||
// If direct IO is not supported, skip the test
|
||||
return;
|
||||
} else {
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
|
||||
int total_keys = 0;
|
||||
// Write the keys.
|
||||
{
|
||||
WriteBatch batch;
|
||||
Random rnd(309);
|
||||
for (int j = 0; j < 5; j++) {
|
||||
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
|
||||
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
|
||||
total_keys++;
|
||||
}
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
MoveFilesToLevel(2);
|
||||
}
|
||||
|
||||
int buff_prefetch_count = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
|
||||
[&](void*) { buff_prefetch_count++; });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Read the keys.
|
||||
{
|
||||
ReadOptions ro;
|
||||
ro.adaptive_readahead = true;
|
||||
ro.async_io = true;
|
||||
|
||||
ASSERT_OK(options.statistics->Reset());
|
||||
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
|
||||
int num_keys = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ASSERT_OK(iter->status());
|
||||
num_keys++;
|
||||
}
|
||||
ASSERT_EQ(num_keys, total_keys);
|
||||
ASSERT_GT(buff_prefetch_count, 0);
|
||||
|
||||
// Check stats to make sure async prefetch is done.
|
||||
{
|
||||
HistogramData async_read_bytes;
|
||||
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
ASSERT_GT(async_read_bytes.count, 0);
|
||||
#else
|
||||
ASSERT_EQ(async_read_bytes.count, 0);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
Close();
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO akanksha: Add perf_times etc.
|
||||
// TODO akanksha:
|
||||
// 1. Handle use_direct_io case which currently calls Read API.
|
||||
IOStatus RandomAccessFileReader::ReadAsync(
|
||||
FSReadRequest& req, const IOOptions& opts,
|
||||
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
|
||||
void** io_handle, IOHandleDeleter* del_fn,
|
||||
Env::IOPriority rate_limiter_priority) {
|
||||
if (use_direct_io()) {
|
||||
// For direct_io, it calls Read API.
|
||||
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
|
||||
nullptr /*dbg*/, rate_limiter_priority);
|
||||
cb(req, cb_arg);
|
||||
return IOStatus::OK();
|
||||
}
|
||||
return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
|
||||
nullptr /*dbg*/);
|
||||
|
||||
// Create a callback and populate info.
|
||||
auto read_async_callback =
|
||||
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
|
||||
std::placeholders::_1, std::placeholders::_2);
|
||||
ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
|
||||
read_async_info->cb_ = cb;
|
||||
read_async_info->cb_arg_ = cb_arg;
|
||||
read_async_info->start_time_ = clock_->NowMicros();
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (ShouldNotifyListeners()) {
|
||||
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
|
||||
}
|
||||
#endif
|
||||
|
||||
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
|
||||
io_handle, del_fn, nullptr /*dbg*/);
|
||||
if (!s.ok()) {
|
||||
delete read_async_info;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
|
||||
void* cb_arg) {
|
||||
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
|
||||
assert(read_async_info);
|
||||
assert(read_async_info->cb_);
|
||||
|
||||
read_async_info->cb_(req, read_async_info->cb_arg_);
|
||||
|
||||
// Update stats and notify listeners.
|
||||
if (stats_ != nullptr && file_read_hist_ != nullptr) {
|
||||
// elapsed doesn't take into account delay and overwrite as StopWatch does
|
||||
// in Read.
|
||||
uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
|
||||
file_read_hist_->Add(elapsed);
|
||||
}
|
||||
if (req.status.ok()) {
|
||||
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
|
||||
}
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (ShouldNotifyListeners()) {
|
||||
auto finish_ts = FileOperationInfo::FinishNow();
|
||||
NotifyOnFileReadFinish(req.offset, req.result.size(),
|
||||
read_async_info->fs_start_ts_, finish_ts,
|
||||
req.status);
|
||||
}
|
||||
if (!req.status.ok()) {
|
||||
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
|
||||
req.result.size(), req.offset);
|
||||
}
|
||||
#endif
|
||||
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
|
||||
delete read_async_info;
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -92,6 +92,15 @@ class RandomAccessFileReader {
|
||||
const Temperature file_temperature_;
|
||||
const bool is_last_level_;
|
||||
|
||||
struct ReadAsyncInfo {
|
||||
#ifndef ROCKSDB_LITE
|
||||
FileOperationInfo::StartTimePoint fs_start_ts_;
|
||||
#endif
|
||||
uint64_t start_time_;
|
||||
std::function<void(const FSReadRequest&, void*)> cb_;
|
||||
void* cb_arg_;
|
||||
};
|
||||
|
||||
public:
|
||||
explicit RandomAccessFileReader(
|
||||
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
|
||||
@ -179,5 +188,7 @@ class RandomAccessFileReader {
|
||||
std::function<void(const FSReadRequest&, void*)> cb,
|
||||
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
|
||||
Env::IOPriority rate_limiter_priority);
|
||||
|
||||
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
|
||||
};
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -160,7 +160,9 @@ class WritableFileWriter {
|
||||
bool perform_data_verification_;
|
||||
uint32_t buffered_data_crc32c_checksum_;
|
||||
bool buffered_data_with_checksum_;
|
||||
#ifndef ROCKSDB_LITE
|
||||
Temperature temperature_;
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
public:
|
||||
WritableFileWriter(
|
||||
@ -191,8 +193,10 @@ class WritableFileWriter {
|
||||
checksum_finalized_(false),
|
||||
perform_data_verification_(perform_data_verification),
|
||||
buffered_data_crc32c_checksum_(0),
|
||||
buffered_data_with_checksum_(buffered_data_with_checksum),
|
||||
temperature_(options.temperature) {
|
||||
buffered_data_with_checksum_(buffered_data_with_checksum) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
temperature_ = options.temperature;
|
||||
#endif // ROCKSDB_LITE
|
||||
assert(!use_direct_io() || max_buffer_size_ > 0);
|
||||
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
|
||||
reinterpret_cast<void*>(max_buffer_size_));
|
||||
|
@ -90,6 +90,19 @@ class FilterPolicy : public Customizable {
|
||||
virtual ~FilterPolicy();
|
||||
static const char* Type() { return "FilterPolicy"; }
|
||||
|
||||
// The name used for identifying whether a filter on disk is readable
|
||||
// by this FilterPolicy. If this FilterPolicy is part of a family that
|
||||
// can read each others filters, such as built-in BloomFilterPolcy and
|
||||
// RibbonFilterPolicy, the CompatibilityName is a shared family name,
|
||||
// while kinds of filters in the family can have distinct Customizable
|
||||
// Names. This function is pure virtual so that wrappers around built-in
|
||||
// policies are prompted to defer to CompatibilityName() of the wrapped
|
||||
// policy, which is important for compatibility.
|
||||
//
|
||||
// For custom filter policies that are not part of a read-compatible
|
||||
// family (rare), implementations may return Name().
|
||||
virtual const char* CompatibilityName() const = 0;
|
||||
|
||||
// Creates a new FilterPolicy based on the input value string and returns the
|
||||
// result The value might be an ID, and ID with properties, or an old-style
|
||||
// policy string.
|
||||
|
@ -72,10 +72,10 @@ struct LiveFileStorageInfo : public FileStorageInfo {
|
||||
// The metadata that describes an SST file. (Does not need to extend
|
||||
// LiveFileStorageInfo because SST files are always immutable.)
|
||||
struct SstFileMetaData : public FileStorageInfo {
|
||||
SstFileMetaData() {}
|
||||
SstFileMetaData() { file_type = kTableFile; }
|
||||
|
||||
SstFileMetaData(const std::string& _file_name, uint64_t _file_number,
|
||||
const std::string& _directory, size_t _size,
|
||||
const std::string& _directory, uint64_t _size,
|
||||
SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno,
|
||||
const std::string& _smallestkey,
|
||||
const std::string& _largestkey, uint64_t _num_reads_sampled,
|
||||
|
@ -534,6 +534,8 @@ enum Histograms : uint32_t {
|
||||
// Error handler statistics
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
|
||||
|
||||
ASYNC_READ_BYTES,
|
||||
|
||||
HISTOGRAM_ENUM_MAX,
|
||||
};
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 7
|
||||
#define ROCKSDB_MINOR 1
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 2
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -5551,7 +5551,9 @@ class HistogramTypeJni {
|
||||
case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
|
||||
return 0x31;
|
||||
case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
|
||||
return 0x31;
|
||||
return 0x32;
|
||||
case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
|
||||
return 0x33;
|
||||
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
|
||||
// 0x1F for backwards compatibility on current minor version.
|
||||
return 0x1F;
|
||||
@ -5669,6 +5671,8 @@ class HistogramTypeJni {
|
||||
case 0x32:
|
||||
return ROCKSDB_NAMESPACE::Histograms::
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
|
||||
case 0x33:
|
||||
return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
|
||||
case 0x1F:
|
||||
// 0x1F for backwards compatibility on current minor version.
|
||||
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;
|
||||
|
@ -180,6 +180,8 @@ public enum HistogramType {
|
||||
*/
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
|
||||
|
||||
ASYNC_READ_BYTES((byte) 0x33),
|
||||
|
||||
// 0x1F for backwards compatibility on current minor version.
|
||||
HISTOGRAM_ENUM_MAX((byte) 0x1F);
|
||||
|
||||
|
@ -54,10 +54,9 @@ class InstrumentedMutex {
|
||||
class ALIGN_AS(CACHE_LINE_SIZE) CacheAlignedInstrumentedMutex
|
||||
: public InstrumentedMutex {
|
||||
using InstrumentedMutex::InstrumentedMutex;
|
||||
char padding[(CACHE_LINE_SIZE - sizeof(InstrumentedMutex) % CACHE_LINE_SIZE) %
|
||||
CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
|
||||
};
|
||||
static_assert(sizeof(CacheAlignedInstrumentedMutex) % CACHE_LINE_SIZE == 0);
|
||||
static_assert(alignof(CacheAlignedInstrumentedMutex) != CACHE_LINE_SIZE ||
|
||||
sizeof(CacheAlignedInstrumentedMutex) % CACHE_LINE_SIZE == 0);
|
||||
|
||||
// RAII wrapper for InstrumentedMutex
|
||||
class InstrumentedMutexLock {
|
||||
|
@ -283,6 +283,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
|
||||
{NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
|
||||
{ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
|
||||
"rocksdb.error.handler.autoresume.retry.count"},
|
||||
{ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
|
||||
};
|
||||
|
||||
std::shared_ptr<Statistics> CreateDBStatistics() {
|
||||
|
@ -1487,6 +1487,7 @@ class MockFilterPolicy : public FilterPolicy {
|
||||
public:
|
||||
static const char* kClassName() { return "MockFilterPolicy"; }
|
||||
const char* Name() const override { return kClassName(); }
|
||||
const char* CompatibilityName() const override { return Name(); }
|
||||
FilterBitsBuilder* GetBuilderWithContext(
|
||||
const FilterBuildingContext&) const override {
|
||||
return nullptr;
|
||||
|
@ -1605,7 +1605,7 @@ void BlockBasedTableBuilder::WriteFilterBlock(
|
||||
? BlockBasedTable::kPartitionedFilterBlockPrefix
|
||||
: BlockBasedTable::kFullFilterBlockPrefix;
|
||||
}
|
||||
key.append(rep_->table_options.filter_policy->Name());
|
||||
key.append(rep_->table_options.filter_policy->CompatibilityName());
|
||||
meta_index_builder->Add(key, filter_block_handle);
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <array>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@ -50,6 +51,7 @@
|
||||
#include "table/block_based/block_prefix_index.h"
|
||||
#include "table/block_based/block_type.h"
|
||||
#include "table/block_based/filter_block.h"
|
||||
#include "table/block_based/filter_policy_internal.h"
|
||||
#include "table/block_based/full_filter_block.h"
|
||||
#include "table/block_based/hash_index_reader.h"
|
||||
#include "table/block_based/partitioned_filter_block.h"
|
||||
@ -897,33 +899,59 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
|
||||
const BlockBasedTableOptions& table_options, const int level,
|
||||
size_t file_size, size_t max_file_size_for_l0_meta_pin,
|
||||
BlockCacheLookupContext* lookup_context) {
|
||||
Status s;
|
||||
|
||||
// Find filter handle and filter type
|
||||
if (rep_->filter_policy) {
|
||||
for (auto filter_type :
|
||||
{Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
|
||||
Rep::FilterType::kBlockFilter}) {
|
||||
std::string prefix;
|
||||
switch (filter_type) {
|
||||
case Rep::FilterType::kFullFilter:
|
||||
prefix = kFullFilterBlockPrefix;
|
||||
auto name = rep_->filter_policy->CompatibilityName();
|
||||
bool builtin_compatible =
|
||||
strcmp(name, BuiltinFilterPolicy::kCompatibilityName()) == 0;
|
||||
|
||||
for (const auto& [filter_type, prefix] :
|
||||
{std::make_pair(Rep::FilterType::kFullFilter, kFullFilterBlockPrefix),
|
||||
std::make_pair(Rep::FilterType::kPartitionedFilter,
|
||||
kPartitionedFilterBlockPrefix),
|
||||
std::make_pair(Rep::FilterType::kBlockFilter, kFilterBlockPrefix)}) {
|
||||
if (builtin_compatible) {
|
||||
// This code is only here to deal with a hiccup in early 7.0.x where
|
||||
// there was an unintentional name change in the SST files metadata.
|
||||
// It should be OK to remove this in the future (late 2022) and just
|
||||
// have the 'else' code.
|
||||
// NOTE: the test:: names below are likely not needed but included
|
||||
// out of caution
|
||||
static const std::unordered_set<std::string> kBuiltinNameAndAliases = {
|
||||
BuiltinFilterPolicy::kCompatibilityName(),
|
||||
test::LegacyBloomFilterPolicy::kClassName(),
|
||||
test::FastLocalBloomFilterPolicy::kClassName(),
|
||||
test::Standard128RibbonFilterPolicy::kClassName(),
|
||||
DeprecatedBlockBasedBloomFilterPolicy::kClassName(),
|
||||
BloomFilterPolicy::kClassName(),
|
||||
RibbonFilterPolicy::kClassName(),
|
||||
};
|
||||
|
||||
// For efficiency, do a prefix seek and see if the first match is
|
||||
// good.
|
||||
meta_iter->Seek(prefix);
|
||||
if (meta_iter->status().ok() && meta_iter->Valid()) {
|
||||
Slice key = meta_iter->key();
|
||||
if (key.starts_with(prefix)) {
|
||||
key.remove_prefix(prefix.size());
|
||||
if (kBuiltinNameAndAliases.find(key.ToString()) !=
|
||||
kBuiltinNameAndAliases.end()) {
|
||||
Slice v = meta_iter->value();
|
||||
Status s = rep_->filter_handle.DecodeFrom(&v);
|
||||
if (s.ok()) {
|
||||
rep_->filter_type = filter_type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
std::string filter_block_key = prefix + name;
|
||||
if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
|
||||
.ok()) {
|
||||
rep_->filter_type = filter_type;
|
||||
break;
|
||||
case Rep::FilterType::kPartitionedFilter:
|
||||
prefix = kPartitionedFilterBlockPrefix;
|
||||
break;
|
||||
case Rep::FilterType::kBlockFilter:
|
||||
prefix = kFilterBlockPrefix;
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
std::string filter_block_key = prefix;
|
||||
filter_block_key.append(rep_->filter_policy->Name());
|
||||
if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
|
||||
.ok()) {
|
||||
rep_->filter_type = filter_type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -932,8 +960,8 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
|
||||
rep_->index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
|
||||
|
||||
// Find compression dictionary handle
|
||||
s = FindOptionalMetaBlock(meta_iter, kCompressionDictBlockName,
|
||||
&rep_->compression_dict_handle);
|
||||
Status s = FindOptionalMetaBlock(meta_iter, kCompressionDictBlockName,
|
||||
&rep_->compression_dict_handle);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
@ -1437,9 +1465,10 @@ template <typename TBlocklike>
|
||||
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
|
||||
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
const bool wait, CachableEntry<TBlocklike>* block_entry,
|
||||
BlockType block_type, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
|
||||
const bool wait, const bool for_compaction,
|
||||
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
BlockContents* contents) const {
|
||||
assert(block_entry != nullptr);
|
||||
const bool no_io = (ro.read_tier == kBlockCacheTier);
|
||||
Cache* block_cache = rep_->table_options.block_cache.get();
|
||||
@ -1492,7 +1521,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
|
||||
CompressionType raw_block_comp_type;
|
||||
BlockContents raw_block_contents;
|
||||
if (!contents) {
|
||||
StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
|
||||
Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS
|
||||
: READ_BLOCK_GET_MICROS;
|
||||
StopWatch sw(rep_->ioptions.clock, statistics, histogram);
|
||||
BlockFetcher block_fetcher(
|
||||
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
|
||||
&raw_block_contents, rep_->ioptions, do_uncompress,
|
||||
@ -1850,8 +1881,9 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
// avoid looking up the block cache
|
||||
s = MaybeReadBlockAndLoadToCache(
|
||||
nullptr, options, handle, uncompression_dict, /*wait=*/true,
|
||||
block_entry, BlockType::kData, mget_iter->get_context,
|
||||
&lookup_data_block_context, &raw_block_contents);
|
||||
/*for_compaction=*/false, block_entry, BlockType::kData,
|
||||
mget_iter->get_context, &lookup_data_block_context,
|
||||
&raw_block_contents);
|
||||
|
||||
// block_entry value could be null if no block cache is present, i.e
|
||||
// BlockBasedTableOptions::no_block_cache is true and no compressed
|
||||
@ -1905,7 +1937,7 @@ Status BlockBasedTable::RetrieveBlock(
|
||||
if (use_cache) {
|
||||
s = MaybeReadBlockAndLoadToCache(
|
||||
prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
|
||||
block_entry, block_type, get_context, lookup_context,
|
||||
for_compaction, block_entry, block_type, get_context, lookup_context,
|
||||
/*contents=*/nullptr);
|
||||
|
||||
if (!s.ok()) {
|
||||
@ -1934,8 +1966,9 @@ Status BlockBasedTable::RetrieveBlock(
|
||||
std::unique_ptr<TBlocklike> block;
|
||||
|
||||
{
|
||||
StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats,
|
||||
READ_BLOCK_GET_MICROS);
|
||||
Histograms histogram =
|
||||
for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
|
||||
StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats, histogram);
|
||||
s = ReadBlockFromFile(
|
||||
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
|
||||
rep_->ioptions, do_uncompress, maybe_compressed, block_type,
|
||||
|
@ -343,9 +343,10 @@ class BlockBasedTable : public TableReader {
|
||||
Status MaybeReadBlockAndLoadToCache(
|
||||
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
const bool wait, CachableEntry<TBlocklike>* block_entry,
|
||||
BlockType block_type, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context, BlockContents* contents) const;
|
||||
const bool wait, const bool for_compaction,
|
||||
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
BlockContents* contents) const;
|
||||
|
||||
// Similar to the above, with one crucial difference: it will retrieve the
|
||||
// block from the file even if there are no caches configured (assuming the
|
||||
@ -654,10 +655,10 @@ struct BlockBasedTable::Rep {
|
||||
std::unique_ptr<FilePrefetchBuffer>* fpb,
|
||||
bool implicit_auto_readahead,
|
||||
bool async_io) const {
|
||||
fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size,
|
||||
!ioptions.allow_mmap_reads /* enable */,
|
||||
false /* track_min_offset */,
|
||||
implicit_auto_readahead, async_io));
|
||||
fpb->reset(new FilePrefetchBuffer(
|
||||
readahead_size, max_readahead_size,
|
||||
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
|
||||
implicit_auto_readahead, async_io, ioptions.fs.get()));
|
||||
}
|
||||
|
||||
void CreateFilePrefetchBufferIfNotExists(
|
||||
|
@ -1325,6 +1325,16 @@ bool BuiltinFilterPolicy::IsInstanceOf(const std::string& name) const {
|
||||
}
|
||||
}
|
||||
|
||||
static const char* kBuiltinFilterMetadataName = "rocksdb.BuiltinBloomFilter";
|
||||
|
||||
const char* BuiltinFilterPolicy::kCompatibilityName() {
|
||||
return kBuiltinFilterMetadataName;
|
||||
}
|
||||
|
||||
const char* BuiltinFilterPolicy::CompatibilityName() const {
|
||||
return kBuiltinFilterMetadataName;
|
||||
}
|
||||
|
||||
BloomLikeFilterPolicy::BloomLikeFilterPolicy(double bits_per_key)
|
||||
: warned_(false), aggregate_rounding_balance_(0) {
|
||||
// Sanitize bits_per_key
|
||||
@ -1372,7 +1382,7 @@ bool BloomLikeFilterPolicy::IsInstanceOf(const std::string& name) const {
|
||||
}
|
||||
|
||||
const char* ReadOnlyBuiltinFilterPolicy::kClassName() {
|
||||
return "rocksdb.BuiltinBloomFilter";
|
||||
return kBuiltinFilterMetadataName;
|
||||
}
|
||||
|
||||
const char* DeprecatedBlockBasedBloomFilterPolicy::kClassName() {
|
||||
|
@ -135,6 +135,9 @@ class BuiltinFilterPolicy : public FilterPolicy {
|
||||
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
|
||||
static const char* kClassName();
|
||||
bool IsInstanceOf(const std::string& id) const override;
|
||||
// All variants of BuiltinFilterPolicy can read each others filters.
|
||||
const char* CompatibilityName() const override;
|
||||
static const char* kCompatibilityName();
|
||||
|
||||
public: // new
|
||||
// An internal function for the implementation of
|
||||
|
@ -84,6 +84,7 @@ class TestFilterBitsReader : public FilterBitsReader {
|
||||
class TestHashFilter : public FilterPolicy {
|
||||
public:
|
||||
const char* Name() const override { return "TestHashFilter"; }
|
||||
const char* CompatibilityName() const override { return Name(); }
|
||||
|
||||
FilterBitsBuilder* GetBuilderWithContext(
|
||||
const FilterBuildingContext&) const override {
|
||||
|
@ -520,8 +520,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
|
||||
// filter blocks
|
||||
s = table()->MaybeReadBlockAndLoadToCache(
|
||||
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
|
||||
/* wait */ true, &block, BlockType::kFilter, nullptr /* get_context */,
|
||||
&lookup_context, nullptr /* contents */);
|
||||
/* wait */ true, /* for_compaction */ false, &block, BlockType::kFilter,
|
||||
nullptr /* get_context */, &lookup_context, nullptr /* contents */);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -182,8 +182,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
|
||||
// filter blocks
|
||||
Status s = table()->MaybeReadBlockAndLoadToCache(
|
||||
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
|
||||
/*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr,
|
||||
&lookup_context, /*contents=*/nullptr);
|
||||
/*wait=*/true, /*for_compaction=*/false, &block, BlockType::kIndex,
|
||||
/*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr);
|
||||
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
|
@ -74,8 +74,7 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
|
||||
if (read_options_.async_io) {
|
||||
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
|
||||
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
|
||||
&io_s, read_options_.rate_limiter_priority, for_compaction_,
|
||||
ioptions_.fs.get());
|
||||
&io_s, read_options_.rate_limiter_priority, for_compaction_);
|
||||
} else {
|
||||
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
|
||||
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
|
||||
|
@ -93,7 +93,7 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.size = static_cast<size_t>(blob_file->GetFileSize());
|
||||
filemetadata.size = blob_file->GetFileSize();
|
||||
const uint64_t file_number = blob_file->BlobFileNumber();
|
||||
// Path should be relative to db_name, but begin with slash.
|
||||
filemetadata.name = BlobFileName("", bdb_options_.blob_dir, file_number);
|
||||
|
Loading…
Reference in New Issue
Block a user