A few more tests, a fix, and a perf improvement
This commit is contained in:
parent
2a2b36afcc
commit
7c6d3fddec
@ -12,6 +12,7 @@
|
||||
#include "db/db_test_util.h"
|
||||
#include "options/options_helper.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/filter_policy.h"
|
||||
#include "rocksdb/flush_block_policy.h"
|
||||
#include "rocksdb/merge_operator.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
@ -2066,6 +2067,220 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
|
||||
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
|
||||
testing::Combine(testing::Bool(), testing::Bool()));
|
||||
|
||||
#if USE_COROUTINES
|
||||
class DBMultiGetAsyncIOTest : public DBBasicTest {
|
||||
public:
|
||||
DBMultiGetAsyncIOTest()
|
||||
: DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(10));
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = true;
|
||||
options.statistics = statistics_;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
Reopen(options);
|
||||
int num_keys = 0;
|
||||
|
||||
// Put all keys in the bottommost level, and overwrite some keys
|
||||
// in L0 and L1
|
||||
for (int i = 0; i < 128; ++i) {
|
||||
EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
MoveFilesToLevel(2);
|
||||
|
||||
for (int i = 0; i < 128; i += 3) {
|
||||
EXPECT_OK(Put(Key(i), "val_l1_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
MoveFilesToLevel(1);
|
||||
|
||||
for (int i = 0; i < 128; i += 5) {
|
||||
EXPECT_OK(Put(Key(i), "val_l0_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
EXPECT_OK(Flush());
|
||||
num_keys = 0;
|
||||
}
|
||||
EXPECT_EQ(0, num_keys);
|
||||
}
|
||||
|
||||
const std::shared_ptr<Statistics>& statistics() { return statistics_; }
|
||||
|
||||
private:
|
||||
std::shared_ptr<Statistics> statistics_;
|
||||
};
|
||||
|
||||
TEST_F(DBMultiGetAsyncIOTest, GetFromL0) {
|
||||
std::vector<std::string> key_strs;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values;
|
||||
std::vector<Status> statuses;
|
||||
|
||||
// All 3 keys in L0. The L0 files should be read serially.
|
||||
key_strs.push_back(Key(0));
|
||||
key_strs.push_back(Key(40));
|
||||
key_strs.push_back(Key(80));
|
||||
keys.push_back(key_strs[0]);
|
||||
keys.push_back(key_strs[1]);
|
||||
keys.push_back(key_strs[2]);
|
||||
values.resize(keys.size());
|
||||
statuses.resize(keys.size());
|
||||
|
||||
ReadOptions ro;
|
||||
ro.async_io = true;
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data());
|
||||
ASSERT_EQ(values.size(), 3);
|
||||
ASSERT_EQ(values[0], "val_l0_" + std::to_string(0));
|
||||
ASSERT_EQ(values[1], "val_l0_" + std::to_string(40));
|
||||
ASSERT_EQ(values[2], "val_l0_" + std::to_string(80));
|
||||
|
||||
HistogramData multiget_io_batch_size;
|
||||
|
||||
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
|
||||
|
||||
// No async IO in this case since we don't do parallel lookup in L0
|
||||
ASSERT_EQ(multiget_io_batch_size.count, 0);
|
||||
ASSERT_EQ(multiget_io_batch_size.max, 0);
|
||||
}
|
||||
|
||||
TEST_F(DBMultiGetAsyncIOTest, GetFromL1) {
|
||||
std::vector<std::string> key_strs;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values;
|
||||
std::vector<Status> statuses;
|
||||
|
||||
key_strs.push_back(Key(33));
|
||||
key_strs.push_back(Key(54));
|
||||
key_strs.push_back(Key(102));
|
||||
keys.push_back(key_strs[0]);
|
||||
keys.push_back(key_strs[1]);
|
||||
keys.push_back(key_strs[2]);
|
||||
values.resize(keys.size());
|
||||
statuses.resize(keys.size());
|
||||
|
||||
ReadOptions ro;
|
||||
ro.async_io = true;
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data());
|
||||
ASSERT_EQ(values.size(), 3);
|
||||
ASSERT_EQ(statuses[0], Status::OK());
|
||||
ASSERT_EQ(statuses[1], Status::OK());
|
||||
ASSERT_EQ(statuses[2], Status::OK());
|
||||
ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
|
||||
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
|
||||
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
|
||||
|
||||
HistogramData multiget_io_batch_size;
|
||||
|
||||
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
|
||||
|
||||
// A batch of 3 async IOs is expected, one for each overlapping file in L1
|
||||
ASSERT_EQ(multiget_io_batch_size.count, 1);
|
||||
ASSERT_EQ(multiget_io_batch_size.max, 3);
|
||||
}
|
||||
|
||||
TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) {
|
||||
std::vector<std::string> key_strs;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values;
|
||||
std::vector<Status> statuses;
|
||||
|
||||
// 24 is the last key in the first L1 file
|
||||
key_strs.push_back(Key(21));
|
||||
key_strs.push_back(Key(54));
|
||||
key_strs.push_back(Key(102));
|
||||
keys.push_back(key_strs[0]);
|
||||
keys.push_back(key_strs[1]);
|
||||
keys.push_back(key_strs[2]);
|
||||
values.resize(keys.size());
|
||||
statuses.resize(keys.size());
|
||||
|
||||
ReadOptions ro;
|
||||
ro.async_io = true;
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data());
|
||||
ASSERT_EQ(values.size(), 3);
|
||||
ASSERT_EQ(statuses[0], Status::OK());
|
||||
ASSERT_EQ(statuses[1], Status::OK());
|
||||
ASSERT_EQ(statuses[2], Status::OK());
|
||||
ASSERT_EQ(values[0], "val_l1_" + std::to_string(21));
|
||||
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
|
||||
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
|
||||
|
||||
HistogramData multiget_io_batch_size;
|
||||
|
||||
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
|
||||
|
||||
// Since the first MultiGet key is the last key in a file, the MultiGet is
|
||||
// expected to lookup in that file first, before moving on to other files.
|
||||
// So the first file lookup will issue one async read, and the next lookup
|
||||
// will lookup 2 files in parallel and issue 2 async reads
|
||||
ASSERT_EQ(multiget_io_batch_size.count, 2);
|
||||
ASSERT_EQ(multiget_io_batch_size.max, 2);
|
||||
}
|
||||
|
||||
TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
|
||||
std::vector<std::string> key_strs;
|
||||
std::vector<Slice> keys;
|
||||
std::vector<PinnableSlice> values;
|
||||
std::vector<Status> statuses;
|
||||
|
||||
// 33 and 102 are in L1, and 56 is in L2
|
||||
key_strs.push_back(Key(33));
|
||||
key_strs.push_back(Key(56));
|
||||
key_strs.push_back(Key(102));
|
||||
keys.push_back(key_strs[0]);
|
||||
keys.push_back(key_strs[1]);
|
||||
keys.push_back(key_strs[2]);
|
||||
values.resize(keys.size());
|
||||
statuses.resize(keys.size());
|
||||
|
||||
ReadOptions ro;
|
||||
ro.async_io = true;
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data());
|
||||
ASSERT_EQ(values.size(), 3);
|
||||
ASSERT_EQ(statuses[0], Status::OK());
|
||||
ASSERT_EQ(statuses[1], Status::OK());
|
||||
ASSERT_EQ(statuses[2], Status::OK());
|
||||
ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
|
||||
ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
|
||||
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
|
||||
|
||||
HistogramData multiget_io_batch_size;
|
||||
|
||||
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
|
||||
|
||||
// There is only one MultiGet key in the bottommost level - 56. Thus
|
||||
// the bottommost level will not use async IO.
|
||||
ASSERT_EQ(multiget_io_batch_size.count, 1);
|
||||
ASSERT_EQ(multiget_io_batch_size.max, 2);
|
||||
}
|
||||
#endif // USE_COROUTINES
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetStats) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
@ -3435,6 +3650,11 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
|
||||
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
||||
const IOOptions& options, IODebugContext* dbg) override;
|
||||
|
||||
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
|
||||
std::function<void(const FSReadRequest&, void*)> cb,
|
||||
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
|
||||
IODebugContext* dbg) override;
|
||||
|
||||
private:
|
||||
DeadlineFS& fs_;
|
||||
std::unique_ptr<FSRandomAccessFile> file_;
|
||||
@ -3575,6 +3795,26 @@ IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
|
||||
return s;
|
||||
}
|
||||
|
||||
IOStatus DeadlineRandomAccessFile::ReadAsync(
|
||||
FSReadRequest& req, const IOOptions& opts,
|
||||
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
|
||||
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
|
||||
const std::chrono::microseconds deadline = fs_.GetDeadline();
|
||||
const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
|
||||
IOStatus s;
|
||||
if (deadline.count() || io_timeout.count()) {
|
||||
fs_.AssertDeadline(deadline, io_timeout, opts);
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = FSRandomAccessFileWrapper::ReadAsync(req, opts, cb, cb_arg, io_handle,
|
||||
del_fn, dbg);
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = fs_.ShouldDelay(opts);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
|
||||
size_t num_reqs,
|
||||
const IOOptions& options,
|
||||
@ -3596,7 +3836,8 @@ IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
|
||||
|
||||
// A test class for intercepting random reads and injecting artificial
|
||||
// delays. Used for testing the MultiGet deadline feature
|
||||
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
|
||||
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet,
|
||||
public testing::WithParamInterface<bool> {
|
||||
public:
|
||||
DBBasicTestMultiGetDeadline()
|
||||
: DBBasicTestMultiGet(
|
||||
@ -3619,7 +3860,13 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
|
||||
TEST_P(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
|
||||
#ifndef USE_COROUTINES
|
||||
if (GetParam()) {
|
||||
ROCKSDB_GTEST_SKIP("This test requires coroutine support");
|
||||
return;
|
||||
}
|
||||
#endif // USE_COROUTINES
|
||||
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
|
||||
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
|
||||
Options options = CurrentOptions();
|
||||
@ -3650,6 +3897,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
|
||||
|
||||
ReadOptions ro;
|
||||
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
|
||||
ro.async_io = GetParam();
|
||||
// Delay the first IO
|
||||
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
|
||||
|
||||
@ -3752,6 +4000,9 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
|
||||
Close();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DeadlineIO, DBBasicTestMultiGetDeadline,
|
||||
::testing::Bool());
|
||||
|
||||
TEST_F(DBBasicTest, ManifestWriteFailure) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.create_if_missing = true;
|
||||
|
@ -592,6 +592,10 @@ class FilePickerMultiGet {
|
||||
|
||||
const MultiGetRange& CurrentFileRange() { return current_file_range_; }
|
||||
|
||||
bool RemainingOverlapInLevel() {
|
||||
return !current_level_range_.Suffix(current_file_range_).empty();
|
||||
}
|
||||
|
||||
private:
|
||||
unsigned int num_levels_;
|
||||
unsigned int curr_level_;
|
||||
@ -2231,7 +2235,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
// L0 files won't be parallelized anyway. The regular synchronous version
|
||||
// is faster.
|
||||
if (!read_options.async_io || !using_coroutines() ||
|
||||
fp.GetHitFileLevel() == 0) {
|
||||
fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
|
||||
if (f) {
|
||||
// Call MultiGetFromSST for looking up a single file
|
||||
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
|
||||
|
@ -290,6 +290,18 @@ class MultiGetContext {
|
||||
|
||||
MultiGetContext* context() const { return ctx_; }
|
||||
|
||||
Range Suffix(const Range& other) const {
|
||||
size_t other_last = other.FindLastRemaining();
|
||||
size_t my_last = FindLastRemaining();
|
||||
|
||||
if (my_last > other_last) {
|
||||
return Range(*this, Iterator(this, other_last),
|
||||
Iterator(this, my_last));
|
||||
} else {
|
||||
return Range(*this, begin(), begin());
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
friend MultiGetContext;
|
||||
MultiGetContext* ctx_;
|
||||
@ -306,6 +318,15 @@ class MultiGetContext {
|
||||
return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) &
|
||||
~(ctx_->value_mask_ | skip_mask_));
|
||||
}
|
||||
|
||||
size_t FindLastRemaining() const {
|
||||
Mask mask = RemainingMask();
|
||||
size_t index = (mask >>= start_) ? start_ : 0;
|
||||
while (mask >>= 1) {
|
||||
index++;
|
||||
}
|
||||
return index;
|
||||
}
|
||||
};
|
||||
|
||||
// Return the initial range that encompasses all the keys in the batch
|
||||
|
@ -19,6 +19,8 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
|
||||
awaiter->io_handle_.resize(awaiter->num_reqs_);
|
||||
awaiter->del_fn_.resize(awaiter->num_reqs_);
|
||||
for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
|
||||
awaiter->io_handle_.push_back(nullptr);
|
||||
awaiter->del_fn_.push_back(nullptr);
|
||||
awaiter->file_
|
||||
->ReadAsync(
|
||||
awaiter->read_reqs_[i], awaiter->opts_,
|
||||
@ -44,10 +46,12 @@ void AsyncFileReader::Poll() {
|
||||
waiter = head_;
|
||||
do {
|
||||
for (size_t i = 0; i < waiter->num_reqs_; ++i) {
|
||||
io_handles.push_back(waiter->io_handle_[i]);
|
||||
if (waiter->io_handle_[i]) {
|
||||
io_handles.push_back(waiter->io_handle_[i]);
|
||||
}
|
||||
}
|
||||
} while (waiter != tail_ && (waiter = waiter->next_));
|
||||
{
|
||||
if (io_handles.size() > 0) {
|
||||
StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
|
||||
fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError();
|
||||
}
|
||||
@ -56,7 +60,9 @@ void AsyncFileReader::Poll() {
|
||||
head_ = waiter->next_;
|
||||
|
||||
for (size_t i = 0; i < waiter->num_reqs_; ++i) {
|
||||
waiter->del_fn_[i](waiter->io_handle_[i]);
|
||||
if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
|
||||
waiter->del_fn_[i](waiter->io_handle_[i]);
|
||||
}
|
||||
}
|
||||
waiter->awaiting_coro_.resume();
|
||||
} while (waiter != tail_);
|
||||
|
Loading…
Reference in New Issue
Block a user