c2485f2d81
Summary: A new file interface `SupportPrefetch()` is added. When the user overrides it to `false`, an internal prefetch buffer will be used for readahead. Useful for non-directIO but FS doesn't have readahead support. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7312 Reviewed By: anand1976 Differential Revision: D23329847 Pulled By: jay-zhuang fbshipit-source-id: 71cd4ce6f4a820840294e4e6aec111ab76175527
187 lines
5.8 KiB
C++
187 lines
5.8 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "test_util/sync_point.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class MockFS;
|
|
|
|
class MockRandomAccessFile : public FSRandomAccessFileWrapper {
|
|
public:
|
|
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
|
|
bool support_prefetch, std::atomic_int& prefetch_count)
|
|
: FSRandomAccessFileWrapper(file.get()),
|
|
file_(std::move(file)),
|
|
support_prefetch_(support_prefetch),
|
|
prefetch_count_(prefetch_count) {}
|
|
|
|
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
|
|
IODebugContext* dbg) override {
|
|
if (support_prefetch_) {
|
|
prefetch_count_.fetch_add(1);
|
|
return target()->Prefetch(offset, n, options, dbg);
|
|
} else {
|
|
return IOStatus::NotSupported();
|
|
}
|
|
}
|
|
|
|
private:
|
|
std::unique_ptr<FSRandomAccessFile> file_;
|
|
const bool support_prefetch_;
|
|
std::atomic_int& prefetch_count_;
|
|
};
|
|
|
|
class MockFS : public FileSystemWrapper {
|
|
public:
|
|
explicit MockFS(bool support_prefetch)
|
|
: FileSystemWrapper(FileSystem::Default()),
|
|
support_prefetch_(support_prefetch) {}
|
|
|
|
IOStatus NewRandomAccessFile(const std::string& fname,
|
|
const FileOptions& opts,
|
|
std::unique_ptr<FSRandomAccessFile>* result,
|
|
IODebugContext* dbg) override {
|
|
std::unique_ptr<FSRandomAccessFile> file;
|
|
IOStatus s;
|
|
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
|
|
result->reset(
|
|
new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
|
|
return s;
|
|
}
|
|
|
|
void ClearPrefetchCount() { prefetch_count_ = 0; }
|
|
|
|
bool IsPrefetchCalled() { return prefetch_count_ > 0; }
|
|
|
|
private:
|
|
const bool support_prefetch_;
|
|
std::atomic_int prefetch_count_{0};
|
|
};
|
|
|
|
class PrefetchTest
|
|
: public DBTestBase,
|
|
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
|
|
public:
|
|
PrefetchTest() : DBTestBase("/prefetch_test", true) {}
|
|
};
|
|
|
|
std::string BuildKey(int num, std::string postfix = "") {
|
|
return "my_key_" + std::to_string(num) + postfix;
|
|
}
|
|
|
|
TEST_P(PrefetchTest, Basic) {
|
|
// First param is if the mockFS support_prefetch or not
|
|
bool support_prefetch = std::get<0>(GetParam());
|
|
|
|
// Second param is if directIO is enabled or not
|
|
bool use_direct_io = std::get<1>(GetParam());
|
|
|
|
const int kNumKeys = 1100;
|
|
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(support_prefetch);
|
|
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
|
|
Options options = CurrentOptions();
|
|
options.write_buffer_size = 1024;
|
|
options.create_if_missing = true;
|
|
options.compression = kNoCompression;
|
|
options.env = env.get();
|
|
if (use_direct_io) {
|
|
options.use_direct_reads = true;
|
|
options.use_direct_io_for_flush_and_compaction = true;
|
|
}
|
|
|
|
int buff_prefetch_count = 0;
|
|
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
|
|
[&](void*) { buff_prefetch_count++; });
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
Status s = TryReopen(options);
|
|
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
|
|
// If direct IO is not supported, skip the test
|
|
return;
|
|
} else {
|
|
ASSERT_OK(s);
|
|
}
|
|
|
|
// create first key range
|
|
WriteBatch batch;
|
|
for (int i = 0; i < kNumKeys; i++) {
|
|
batch.Put(BuildKey(i), "value for range 1 key");
|
|
}
|
|
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
|
|
|
// create second key range
|
|
batch.Clear();
|
|
for (int i = 0; i < kNumKeys; i++) {
|
|
batch.Put(BuildKey(i, "key2"), "value for range 2 key");
|
|
}
|
|
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
|
|
|
// delete second key range
|
|
batch.Clear();
|
|
for (int i = 0; i < kNumKeys; i++) {
|
|
batch.Delete(BuildKey(i, "key2"));
|
|
}
|
|
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
|
|
|
// compact database
|
|
std::string start_key = BuildKey(0);
|
|
std::string end_key = BuildKey(kNumKeys - 1);
|
|
Slice least(start_key.data(), start_key.size());
|
|
Slice greatest(end_key.data(), end_key.size());
|
|
|
|
// commenting out the line below causes the example to work correctly
|
|
db_->CompactRange(CompactRangeOptions(), &least, &greatest);
|
|
|
|
if (support_prefetch && !use_direct_io) {
|
|
// If underline file system supports prefetch, and directIO is not enabled
|
|
// make sure prefetch() is called and FilePrefetchBuffer is not used.
|
|
ASSERT_TRUE(fs->IsPrefetchCalled());
|
|
fs->ClearPrefetchCount();
|
|
ASSERT_EQ(0, buff_prefetch_count);
|
|
} else {
|
|
// If underline file system doesn't support prefetch, or directIO is
|
|
// enabled, make sure prefetch() is not called and FilePrefetchBuffer is
|
|
// used.
|
|
ASSERT_FALSE(fs->IsPrefetchCalled());
|
|
ASSERT_GT(buff_prefetch_count, 0);
|
|
buff_prefetch_count = 0;
|
|
}
|
|
|
|
// count the keys
|
|
{
|
|
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
|
|
int num_keys = 0;
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
num_keys++;
|
|
}
|
|
}
|
|
|
|
// Make sure prefetch is called only if file system support prefetch.
|
|
if (support_prefetch && !use_direct_io) {
|
|
ASSERT_TRUE(fs->IsPrefetchCalled());
|
|
fs->ClearPrefetchCount();
|
|
ASSERT_EQ(0, buff_prefetch_count);
|
|
} else {
|
|
ASSERT_FALSE(fs->IsPrefetchCalled());
|
|
ASSERT_GT(buff_prefetch_count, 0);
|
|
buff_prefetch_count = 0;
|
|
}
|
|
Close();
|
|
}
|
|
|
|
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
|
|
::testing::Combine(::testing::Bool(),
|
|
::testing::Bool()));
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
return RUN_ALL_TESTS();
|
|
}
|