Enable IO Uring in MultiGet in direct IO mode (#6815)

Summary:
Currently, in direct IO mode, `MultiGet` retrieves the data blocks one by one instead of in parallel; see `BlockBasedTable::RetrieveMultipleBlocks`.

Since direct IO is supported in `RandomAccessFileReader::MultiRead` in https://github.com/facebook/rocksdb/pull/6446, this PR applies `MultiRead` to `MultiGet` so that the data blocks can be retrieved in parallel.
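
As a rough illustration of the batching idea, here is a minimal standalone sketch (not the RocksDB API; `ReadRequest` and `MultiRead` below are simplified stand-ins invented for this sketch): build one read request per data block and hand the whole batch to a single MultiRead-style call, rather than issuing the reads one at a time.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

struct ReadRequest {
  size_t offset;       // file offset of the block
  size_t len;          // number of bytes to read (block + trailer)
  std::string result;  // filled by the mock "file system"
};

// Stand-in for a parallel multi-read: a real implementation would submit all
// requests at once (e.g. via io_uring) and wait for their completions.
void MultiRead(std::vector<ReadRequest>* reqs) {
  for (ReadRequest& req : *reqs) {
    req.result.assign(req.len, 'x');  // pretend req.len bytes were read
  }
}

int main() {
  // One request per block handle, as a MultiGet batch would build them.
  std::vector<ReadRequest> reqs = {{0, 4096}, {4096, 4096}, {8192, 4096}};
  MultiRead(&reqs);  // a single call services every block in the batch
  for (const ReadRequest& req : reqs) {
    std::printf("block at offset %zu: %zu bytes\n", req.offset,
                req.result.size());
  }
  return 0;
}
```

In the actual change, the requests are `FSReadRequest`s issued through `RandomAccessFileReader::MultiRead`, which in direct IO mode realigns and merges them as necessary.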

Also, in direct IO mode, when the data blocks are compressed and need to be uncompressed, this PR allocates only one contiguous aligned buffer to hold the data blocks, then uncompresses the blocks directly from that buffer and inserts them into the block cache; there are no longer intermediate copies to scratch buffers.
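
To make that buffer handling concrete, here is another minimal standalone sketch (again not the RocksDB code; `BlockHandle` and `UncompressToHeap` are simplified stand-ins): every block lives in one contiguous shared buffer, a compressed block is uncompressed straight from its slice of that buffer into a heap allocation, and an uncompressed block is copied to the heap before it could go into a block cache.

```cpp
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

struct BlockHandle {
  size_t offset;    // offset of the block inside the shared buffer
  size_t size;      // size of the block payload
  bool compressed;  // whether the on-disk block is compressed
};

// Placeholder: a real implementation would run the block's compression codec.
std::unique_ptr<char[]> UncompressToHeap(const char* data, size_t size) {
  std::unique_ptr<char[]> out(new char[size]);
  std::memcpy(out.get(), data, size);
  return out;
}

int main() {
  // One contiguous (in the real code, aligned) buffer holding all the blocks.
  std::string shared_buf(3 * 4096, 'x');
  std::vector<BlockHandle> handles = {
      {0, 4096, true}, {4096, 4096, false}, {8192, 4096, true}};
  for (const BlockHandle& h : handles) {
    const char* block = shared_buf.data() + h.offset;
    std::unique_ptr<char[]> heap_block;
    if (h.compressed) {
      // Uncompress directly from the shared buffer into the heap; no
      // intermediate copy into a per-block scratch buffer is needed.
      heap_block = UncompressToHeap(block, h.size);
    } else {
      // An uncompressed block still needs its own heap copy before it can be
      // handed to a block cache, because the shared buffer is temporary.
      heap_block.reset(new char[h.size]);
      std::memcpy(heap_block.get(), block, h.size);
    }
    // heap_block would now be inserted into the block cache.
  }
  return 0;
}
```

This mirrors the logic in `BlockBasedTable::RetrieveMultipleBlocks`, where uncompressed blocks that were read into a shared buffer are copied to the heap (via `CopyBufferToHeap`) before being inserted into the block cache.
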
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6815

Test Plan:
1. Added a new unit test `BlockBasedTableReaderTest::MultiGet`.
2. Existing unit tests and stress tests already exercise `MultiGet` in direct IO mode.

Reviewed By: anand1976

Differential Revision: D21426347

Pulled By: cheng-chang

fbshipit-source-id: b8446ae0e74152444ef9111e97f8e402ac31b24f
Cheng Chang 2020-05-14 23:23:32 -07:00 committed by Facebook GitHub Bot
parent b11a8b1b9a
commit 91b7553293
9 changed files with 293 additions and 87 deletions


@ -1069,6 +1069,7 @@ if(WITH_TESTS)
options/options_settable_test.cc
options/options_test.cc
table/block_based/block_based_filter_block_test.cc
table/block_based/block_based_table_reader_test.cc
table/block_based/block_test.cc
table/block_based/data_block_hash_index_test.cc
table/block_based/full_filter_block_test.cc


@ -539,6 +539,7 @@ TESTS = \
random_access_file_reader_test \
file_reader_writer_test \
block_based_filter_block_test \
block_based_table_reader_test \
full_filter_block_test \
partitioned_filter_block_test \
hash_table_test \
@ -1568,6 +1569,9 @@ file_reader_writer_test: util/file_reader_writer_test.o $(LIBOBJECTS) $(TESTHARN
block_based_filter_block_test: table/block_based/block_based_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
block_based_table_reader_test: table/block_based/block_based_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
full_filter_block_test: table/block_based/full_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)


@ -515,6 +515,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"block_based_table_reader_test",
"table/block_based/block_based_table_reader_test.cc",
"serial",
[],
[],
],
[
"block_cache_trace_analyzer_test",
"tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc",


@ -15,7 +15,7 @@ namespace ROCKSDB_NAMESPACE {
class RandomAccessFileReaderTest : public testing::Test {
public:
void SetUp() override {
test::ResetTmpDirForDirectIO();
test::SetupSyncPointsToMockDirectIO();
env_ = Env::Default();
fs_ = FileSystem::Default();
test_dir_ = test::PerThreadDBPath("random_access_file_reader_test");
@ -27,15 +27,6 @@ class RandomAccessFileReaderTest : public testing::Test {
EXPECT_OK(test::DestroyDir(env_, test_dir_));
}
bool IsDirectIOSupported() {
Write(".direct", "");
FileOptions opt;
opt.use_direct_reads = true;
std::unique_ptr<FSRandomAccessFile> f;
auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr);
return s.ok();
}
void Write(const std::string& fname, const std::string& content) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
@ -84,11 +75,6 @@ class RandomAccessFileReaderTest : public testing::Test {
};
TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
if (!IsDirectIOSupported()) {
printf("Direct IO is not supported, skip this test\n");
return;
}
std::string fname = "read-direct-io";
Random rand(0);
std::string content;
@ -113,11 +99,6 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
}
TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
if (!IsDirectIOSupported()) {
printf("Direct IO is not supported, skip this test\n");
return;
}
// Creates a file with 3 pages.
std::string fname = "multi-read-direct-io";
Random rand(0);

src.mk

@ -415,6 +415,7 @@ MAIN_SOURCES = \
monitoring/stats_history_test.cc \
options/options_test.cc \
table/block_based/block_based_filter_block_test.cc \
table/block_based/block_based_table_reader_test.cc \
table/block_based/block_test.cc \
table/block_based/data_block_hash_index_test.cc \
table/block_based/full_filter_block_test.cc \


@ -1594,7 +1594,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
if (file->use_direct_io() || ioptions.allow_mmap_reads) {
if (ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
@ -1614,6 +1614,10 @@ void BlockBasedTable::RetrieveMultipleBlocks(
return;
}
// In direct IO mode, blocks share the direct io buffer.
// Otherwise, blocks share the scratch buffer.
const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
@ -1634,7 +1638,11 @@ void BlockBasedTable::RetrieveMultipleBlocks(
// If current block is adjacent to the previous one, at the same time,
// compression is enabled and there is no compressed cache, we combine
// the two block read as one.
if (scratch != nullptr && prev_end == handle.offset()) {
// We don't combine block reads here in direct IO mode, because when doing
// direct IO read, the block requests will be realigned and merged when
// necessary.
if (use_shared_buffer && !file->use_direct_io() &&
prev_end == handle.offset()) {
req_offset_for_block.emplace_back(prev_len);
prev_len += block_size(handle);
} else {
@ -1644,11 +1652,13 @@ void BlockBasedTable::RetrieveMultipleBlocks(
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}
@ -1665,14 +1675,17 @@ void BlockBasedTable::RetrieveMultipleBlocks(
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}
AlignedBuf direct_io_buf;
{
IOOptions opts;
IOStatus s = PrepareIOFromReadOptions(options, file->env(), opts);
@ -1681,7 +1694,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
req.status = s;
}
} else {
file->MultiRead(opts, &read_reqs[0], read_reqs.size(), nullptr);
file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf);
}
}
@ -1721,19 +1734,21 @@ void BlockBasedTable::RetrieveMultipleBlocks(
" bytes, got " + ToString(req.result.size()));
}
bool blocks_share_read_buffer = (req.result.size() != block_size(handle));
if (s.ok()) {
if (scratch == nullptr && !blocks_share_read_buffer) {
if (!use_shared_buffer) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
std::unique_ptr<char[]> raw_block(req.scratch + req_offset);
assert(req.result.size() == block_size(handle));
assert(req_offset == 0);
std::unique_ptr<char[]> raw_block(req.scratch);
raw_block_contents = BlockContents(std::move(raw_block), handle.size());
} else {
// We used the scratch buffer which are shared by the blocks.
// We used the scratch buffer or direct io buffer
// which are shared by the blocks.
// raw_block_contents does not have the ownership.
raw_block_contents =
BlockContents(Slice(req.scratch + req_offset, handle.size()));
BlockContents(Slice(req.result.data() + req_offset, handle.size()));
}
#ifndef NDEBUG
@ -1757,16 +1772,15 @@ void BlockBasedTable::RetrieveMultipleBlocks(
}
if (s.ok()) {
// It handles a rare case: compression is set and these is no compressed
// cache (enable combined read). In this case, the scratch != nullptr.
// At the same time, some blocks are actually not compressed,
// since its compression space saving is smaller than the threshold. In
// this case, if the block shares the scratch memory, we need to copy it
// to the heap such that it can be added to the regular block cache.
// When the blocks share the same underlying buffer (scratch or direct io
// buffer), if the block is compressed, the shared buffer will be
// uncompressed into heap during uncompressing; otherwise, we need to
// manually copy the block into heap before inserting the block to block
// cache.
CompressionType compression_type =
raw_block_contents.get_compression_type();
if (scratch != nullptr && compression_type == kNoCompression) {
Slice raw = Slice(req.scratch + req_offset, block_size(handle));
if (use_shared_buffer && compression_type == kNoCompression) {
Slice raw = Slice(req.result.data() + req_offset, block_size(handle));
raw_block_contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
handle.size());
@ -1808,8 +1822,10 @@ void BlockBasedTable::RetrieveMultipleBlocks(
handle.size(), &contents, footer.version(),
rep_->ioptions, memory_allocator);
} else {
// There are two cases here: 1) caller uses the scratch buffer; 2) we
// use the requst buffer. If scratch buffer is used, we ensure that
// There are two cases here:
// 1) caller uses the shared buffer (scratch or direct io buffer);
// 2) we use the request buffer.
// If scratch buffer or direct io buffer is used, we ensure that
// all raw blocks are copyed to the heap as single blocks. If scratch
// buffer is not used, we also have no combined read, so the raw
// block can be used directly.
@ -2512,6 +2528,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
if (total_len) {
char* scratch = nullptr;
// If using direct IO, then scratch is not used, so keep it nullptr.
// If the blocks need to be uncompressed and we don't need the
// compressed blocks, then we can use a contiguous block of
// memory to read in all the blocks as it will be temporary
@ -2521,7 +2538,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
// 2. If blocks are uncompressed, alloc heap bufs
// 3. If blocks are compressed and no compressed block cache, use
// stack buf
if (rep_->table_options.block_cache_compressed == nullptr &&
if (!rep_->file->use_direct_io() &&
rep_->table_options.block_cache_compressed == nullptr &&
rep_->blocks_maybe_compressed) {
if (total_len <= kMultiGetReadStackBufSize) {
scratch = stack_buf;


@ -0,0 +1,235 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/block_based/block_based_table_reader.h"
#include "db/table_properties_collector.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/format.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
class BlockBasedTableReaderTest
: public testing::Test,
public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
protected:
CompressionType compression_type_;
bool use_direct_reads_;
void SetUp() override {
std::tie(compression_type_, use_direct_reads_) = GetParam();
test::SetupSyncPointsToMockDirectIO();
test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
env_ = Env::Default();
fs_ = FileSystem::Default();
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
}
void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
// Creates a table with the specified key value pairs (kv).
void CreateTable(const std::string& table_name,
const CompressionType& compression_type,
const std::map<std::string, std::string>& kv) {
std::unique_ptr<WritableFileWriter> writer;
NewFileWriter(table_name, &writer);
// Create table builder.
Options options;
ImmutableCFOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
ColumnFamilyOptions cf_options;
MutableCFOptions moptions(cf_options);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
TableBuilderOptions(ioptions, moptions, comparator, &factories,
compression_type, 0 /* sample_for_compression */,
CompressionOptions(), false /* skip_filters */,
kDefaultColumnFamilyName, -1 /* level */),
0 /* column_family_id */, writer.get()));
// Build table.
for (auto it = kv.begin(); it != kv.end(); it++) {
std::string k = ToInternalKey(it->first);
std::string v = it->second;
table_builder->Add(k, v);
}
ASSERT_OK(table_builder->Finish());
}
void NewBlockBasedTableReader(const FileOptions& foptions,
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& comparator,
const std::string& table_name,
std::unique_ptr<BlockBasedTable>* table) {
std::unique_ptr<RandomAccessFileReader> file;
NewFileReader(table_name, foptions, &file);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
std::unique_ptr<TableReader> table_reader;
ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(),
table_factory_.table_options(), comparator,
std::move(file), file_size, &table_reader));
table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
}
private:
std::string test_dir_;
Env* env_;
std::shared_ptr<FileSystem> fs_;
BlockBasedTableFactory table_factory_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
void WriteToFile(const std::string& content, const std::string& filename) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
void NewFileWriter(const std::string& filename,
std::unique_ptr<WritableFileWriter>* writer) {
std::string path = Path(filename);
EnvOptions env_options;
FileOptions foptions;
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(fs_->NewWritableFile(path, foptions, &file, nullptr));
writer->reset(new WritableFileWriter(std::move(file), path, env_options));
}
void NewFileReader(const std::string& filename, const FileOptions& opt,
std::unique_ptr<RandomAccessFileReader>* reader) {
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
}
std::string ToInternalKey(const std::string& key) {
InternalKey internal_key(key, 0, ValueType::kTypeValue);
return internal_key.Encode().ToString();
}
};
// Tests MultiGet in both direct IO and non-direct IO mode.
// The keys should be in cache after MultiGet.
TEST_P(BlockBasedTableReaderTest, MultiGet) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
std::map<std::string, std::string> kv;
{
Random rnd(101);
uint32_t key = 0;
for (int block = 0; block < 100; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
if (block % 2) {
v = test::RandomHumanReadableString(&rnd, 256);
} else {
test::RandomString(&rnd, 256, &v);
}
kv[std::string(k)] = v;
key++;
}
}
}
// Prepare keys, values, and statuses for MultiGet.
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
{
const int step =
static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
auto it = kv.begin();
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
keys.emplace_back(it->first);
values.emplace_back();
statuses.emplace_back();
std::advance(it, step);
}
}
std::string table_name =
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
CreateTable(table_name, compression_type_, kv);
std::unique_ptr<BlockBasedTable> table;
Options options;
ImmutableCFOptions ioptions(options);
FileOptions foptions;
foptions.use_direct_reads = use_direct_reads_;
InternalKeyComparator comparator(options.comparator);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
// Ensure that keys are not in cache before MultiGet.
for (auto& key : keys) {
ASSERT_FALSE(table->TEST_KeyInCache(ReadOptions(), key));
}
// Prepare MultiGetContext.
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
for (size_t i = 0; i < keys.size(); ++i) {
get_context.emplace_back(
BytewiseComparator(), nullptr, nullptr, nullptr, GetContext::kNotFound,
keys[i], &values[i], nullptr, nullptr, nullptr, true /* do_merge */,
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr);
key_context.emplace_back(nullptr, keys[i], &values[i], nullptr,
&statuses.back());
key_context.back().get_context = &get_context.back();
}
for (auto& key_ctx : key_context) {
sorted_keys.emplace_back(&key_ctx);
}
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions());
// Execute MultiGet.
MultiGetContext::Range range = ctx.GetMultiGetRange();
table->MultiGet(ReadOptions(), &range, nullptr);
for (const Status& status : statuses) {
ASSERT_OK(status);
}
// Check that keys are in cache after MultiGet.
for (size_t i = 0; i < keys.size(); i++) {
ASSERT_TRUE(table->TEST_KeyInCache(ReadOptions(), keys[i]));
ASSERT_EQ(values[i].ToString(), kv[keys[i].ToString()]);
}
}
// Param 1: compression type
// Param 2: whether to use direct reads
INSTANTIATE_TEST_CASE_P(
MultiGet, BlockBasedTableReaderTest,
::testing::Combine(::testing::ValuesIn(GetSupportedCompressions()),
::testing::Bool()));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}


@ -60,18 +60,15 @@ struct TestStats {
class BlockFetcherTest : public testing::Test {
protected:
void SetUp() override {
test::ResetTmpDirForDirectIO();
test::SetupSyncPointsToMockDirectIO();
test_dir_ = test::PerThreadDBPath("block_fetcher_test");
env_ = Env::Default();
fs_ = FileSystem::Default();
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
is_direct_io_supported_ = DetectDirectIOSupport();
}
void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
bool IsDirectIOSupported() const { return is_direct_io_supported_; }
void AssertSameBlock(const BlockContents& block1,
const BlockContents& block2) {
ASSERT_EQ(block1.data.ToString(), block2.data.ToString());
@ -141,11 +138,6 @@ class BlockFetcherTest : public testing::Test {
bool do_uncompress,
const TestStats& expected_non_direct_io_stats,
const TestStats& expected_direct_io_stats) {
if (!IsDirectIOSupported()) {
printf("Skip this test since direct IO is not supported\n");
return;
}
for (CompressionType compression_type : GetSupportedCompressions()) {
bool do_compress = compression_type != kNoCompression;
if (compressed != do_compress) continue;
@ -212,7 +204,6 @@ class BlockFetcherTest : public testing::Test {
Env* env_;
std::shared_ptr<FileSystem> fs_;
BlockBasedTableFactory table_factory_;
bool is_direct_io_supported_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
@ -223,15 +214,6 @@ class BlockFetcherTest : public testing::Test {
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
bool DetectDirectIOSupport() {
WriteToFile("", ".direct");
FileOptions opt;
opt.use_direct_reads = true;
std::unique_ptr<FSRandomAccessFile> f;
auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr);
return s.ok();
}
void NewFileWriter(const std::string& filename,
std::unique_ptr<WritableFileWriter>* writer) {
std::string path = Path(filename);
@ -321,11 +303,6 @@ class BlockFetcherTest : public testing::Test {
MemoryAllocator* heap_buf_allocator,
MemoryAllocator* compressed_buf_allocator,
BlockContents* block, MemcpyStats* memcpy_stats) {
if (use_direct_io && !IsDirectIOSupported()) {
printf("Skip this test since direct IO is not supported\n");
return;
}
Options options;
ImmutableCFOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
@ -366,11 +343,6 @@ class BlockFetcherTest : public testing::Test {
// Expects:
// the index block contents are the same for both read modes.
TEST_F(BlockFetcherTest, FetchIndexBlock) {
if (!IsDirectIOSupported()) {
printf("Skip this test since direct IO is not supported\n");
return;
}
for (CompressionType compression : GetSupportedCompressions()) {
std::string table_name =
"FetchIndexBlock" + CompressionTypeToString(compression);


@ -510,19 +510,6 @@ size_t GetLinesCount(const std::string& fname, const std::string& pattern) {
return count;
}
void ResetTmpDirForDirectIO() {
#ifdef OS_LINUX
unsetenv("TEST_TMPDIR");
char* tmpdir = getenv("DISK_TEMP_DIR");
if (tmpdir == nullptr) {
tmpdir = getenv("HOME");
}
if (tmpdir != nullptr) {
setenv("TEST_TMPDIR", tmpdir, 1);
}
#endif
}
void SetupSyncPointsToMockDirectIO() {
#if !defined(NDEBUG) && !defined(OS_MACOSX) && !defined(OS_WIN) && \
!defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD)