diff --git a/CMakeLists.txt b/CMakeLists.txt index adbbaa22f..f2d60fa42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1022,6 +1022,7 @@ if(WITH_TESTS) env/io_posix_test.cc env/mock_env_test.cc file/delete_scheduler_test.cc + file/random_access_file_reader_test.cc logging/auto_roll_logger_test.cc logging/env_logger_test.cc logging/event_logger_test.cc diff --git a/Makefile b/Makefile index d3711702c..1710843e8 100644 --- a/Makefile +++ b/Makefile @@ -513,6 +513,7 @@ TESTS = \ fault_injection_test \ filelock_test \ filename_test \ + random_access_file_reader_test \ file_reader_writer_test \ block_based_filter_block_test \ full_filter_block_test \ @@ -1505,6 +1506,9 @@ delete_scheduler_test: file/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS) filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +random_access_file_reader_test: file/random_access_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(TESTUTIL) + $(AM_LINK) + file_reader_writer_test: util/file_reader_writer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 17cb66804..d4ea69b00 100644 --- a/TARGETS +++ b/TARGETS @@ -1300,6 +1300,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "random_access_file_reader_test", + "file/random_access_file_reader_test.cc", + "serial", + [], + [], + ], [ "random_test", "util/random_test.cc", diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index ba1fa9050..f596aa019 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -20,11 +20,11 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { + Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, - char* scratch, - std::unique_ptr* internal_buf, + char* scratch, AlignedBuf* aligned_buf, bool for_compaction) const { - (void) internal_buf; + (void)aligned_buf; Status s; uint64_t elapsed = 0; { @@ -81,11 +81,11 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, 
Slice* result, size_t res_len = 0; if (s.ok() && offset_advance < buf.CurrentSize()) { res_len = std::min(buf.CurrentSize() - offset_advance, n); - if (internal_buf == nullptr) { + if (aligned_buf == nullptr) { buf.Read(scratch, offset_advance, res_len); } else { scratch = buf.BufferStart(); - internal_buf->reset(buf.Release()); + aligned_buf->reset(buf.Release()); } } *result = Slice(scratch, res_len); @@ -154,11 +154,44 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, return s; } +size_t End(const FSReadRequest& r) { + return static_cast(r.offset) + r.len; +} + +FSReadRequest Align(const FSReadRequest& r, size_t alignment) { + FSReadRequest req; + req.offset = static_cast( + TruncateToPageBoundary(alignment, static_cast(r.offset))); + req.len = Roundup(End(r), alignment) - req.offset; + return req; +} + +// Try to merge src to dest if they have overlap. +// +// Each request represents an inclusive interval [offset, offset + len]. +// If the intervals have overlap, update offset and len to represent the +// merged interval, and return true. +// Otherwise, do nothing and return false. +bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { + size_t dest_offset = static_cast(dest->offset); + size_t src_offset = static_cast(src.offset); + size_t dest_end = End(*dest); + size_t src_end = End(src); + if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) { + return false; + } + dest->offset = static_cast(std::min(dest_offset, src_offset)); + dest->len = std::max(dest_end, src_end) - dest->offset; + return true; +} + Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, - size_t num_reqs) const { + size_t num_reqs, + AlignedBuf* aligned_buf) const { + (void)aligned_buf; // suppress warning of unused variable in LITE mode + assert(num_reqs > 0); Status s; uint64_t elapsed = 0; - assert(!use_direct_io()); { StopWatch sw(env_, stats_, hist_type_, (stats_ != nullptr) ? 
&elapsed : nullptr, true /*overwrite*/, @@ -166,6 +199,44 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, auto prev_perf_level = GetPerfLevel(); IOSTATS_TIMER_GUARD(read_nanos); + FSReadRequest* fs_reqs = read_reqs; + size_t num_fs_reqs = num_reqs; +#ifndef ROCKSDB_LITE + std::vector aligned_reqs; + if (use_direct_io()) { + // num_reqs is the max possible size, + // this can reduce std::vector's internal resize operations. + aligned_reqs.reserve(num_reqs); + // Align and merge the read requests. + size_t alignment = file_->GetRequiredBufferAlignment(); + aligned_reqs.push_back(Align(read_reqs[0], alignment)); + for (size_t i = 1; i < num_reqs; i++) { + const auto& r = Align(read_reqs[i], alignment); + if (!TryMerge(&aligned_reqs.back(), r)) { + aligned_reqs.push_back(r); + } + } + + // Allocate aligned buffer and let scratch buffers point to it. + size_t total_len = 0; + for (const auto& r : aligned_reqs) { + total_len += r.len; + } + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(total_len); + char* scratch = buf.BufferStart(); + for (auto& r : aligned_reqs) { + r.scratch = scratch; + scratch += r.len; + } + + aligned_buf->reset(buf.Release()); + fs_reqs = aligned_reqs.data(); + num_fs_reqs = aligned_reqs.size(); + } +#endif // ROCKSDB_LITE + #ifndef ROCKSDB_LITE FileOperationInfo::TimePoint start_ts; if (ShouldNotifyListeners()) { @@ -174,8 +245,31 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, #endif // ROCKSDB_LITE { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr); + s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr); } + +#ifndef ROCKSDB_LITE + if (use_direct_io()) { + // Populate results in the unaligned read requests. 
+ size_t aligned_i = 0; + for (size_t i = 0; i < num_reqs; i++) { + auto& r = read_reqs[i]; + if (static_cast(r.offset) > End(aligned_reqs[aligned_i])) { + aligned_i++; + } + const auto& fs_r = fs_reqs[aligned_i]; + r.status = fs_r.status; + if (r.status.ok()) { + uint64_t offset = r.offset - fs_r.offset; + size_t len = std::min(r.len, static_cast(fs_r.len - offset)); + r.result = Slice(fs_r.scratch + offset, len); + } else { + r.result = Slice(); + } + } + } +#endif // ROCKSDB_LITE + for (size_t i = 0; i < num_reqs; ++i) { #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -194,4 +288,5 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, return s; } + } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index c0ca371c2..e97cd8c2a 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -22,6 +22,8 @@ namespace ROCKSDB_NAMESPACE { class Statistics; class HistogramImpl; +using AlignedBuf = std::unique_ptr; + // RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is // responsible for: // - Handling Buffered and Direct reads appropriately. @@ -59,7 +61,7 @@ class RandomAccessFileReader { public: explicit RandomAccessFileReader( - std::unique_ptr&& raf, std::string _file_name, + std::unique_ptr&& raf, const std::string& _file_name, Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, HistogramImpl* file_read_hist = nullptr, RateLimiter* rate_limiter = nullptr, @@ -106,17 +108,22 @@ class RandomAccessFileReader { // 1. if using mmap, result is stored in a buffer other than scratch; // 2. if not using mmap, result is stored in the buffer starting from scratch. // - // In direct IO mode, an internal aligned buffer is allocated. - // 1. If internal_buf is null, then results are copied to the buffer + // In direct IO mode, an aligned buffer is allocated internally. + // 1. 
If aligned_buf is null, then results are copied to the buffer // starting from scratch; - // 2. Otherwise, scratch is not used and can be null, the internal_buf owns + // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns // the internally allocated buffer on return, and the result refers to a - // region in internal_buf. + // region in aligned_buf. Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, - std::unique_ptr* internal_buf, - bool for_compaction = false) const; + AlignedBuf* aligned_buf, bool for_compaction = false) const; - Status MultiRead(FSReadRequest* reqs, size_t num_reqs) const; + // REQUIRES: + // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing. + // In non-direct IO mode, aligned_buf should be null; + // In direct IO mode, aligned_buf stores the aligned buffer allocated inside + // MultiRead, the result Slices in reqs refer to aligned_buf. + Status MultiRead(FSReadRequest* reqs, size_t num_reqs, + AlignedBuf* aligned_buf) const; Status Prefetch(uint64_t offset, size_t n) const { return file_->Prefetch(offset, n, IOOptions(), nullptr); diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc new file mode 100644 index 000000000..b74c21dba --- /dev/null +++ b/file/random_access_file_reader_test.cc @@ -0,0 +1,255 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/file_system.h" +#include "file/random_access_file_reader.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +class RandomAccessFileReaderTest : public testing::Test { + public: + void SetUp() override { +#ifdef OS_LINUX + // TEST_TMPDIR may be set to /dev/shm in Makefile, + // but /dev/shm does not support direct IO. + // The default TEST_TMPDIR is under /tmp, but /tmp might also be a tmpfs + // which does not support direct IO either. + unsetenv("TEST_TMPDIR"); + char* tmpdir = getenv("DISK_TEMP_DIR"); + if (tmpdir == nullptr) { + tmpdir = getenv("HOME"); + } + if (tmpdir != nullptr) { + setenv("TEST_TMPDIR", tmpdir, 1); + } +#endif + env_ = Env::Default(); + fs_ = FileSystem::Default(); + test_dir_ = test::PerThreadDBPath("random_access_file_reader_test"); + ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + alignment_ = GetAlignment(); + } + + void TearDown() override { + EXPECT_OK(test::DestroyDir(env_, test_dir_)); + } + + bool IsDirectIOSupported() { + Write(".direct", ""); + FileOptions opt; + opt.use_direct_reads = true; + std::unique_ptr f; + auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr); + return s.ok(); + } + + void Write(const std::string& fname, const std::string& content) { + std::unique_ptr f; + ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); + ASSERT_OK(f->Append(content, IOOptions(), nullptr)); + ASSERT_OK(f->Close(IOOptions(), nullptr)); + } + + void Read(const std::string& fname, const FileOptions& opts, + std::unique_ptr* reader) { + std::string fpath = Path(fname); + std::unique_ptr f; + ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); + (*reader).reset(new RandomAccessFileReader(std::move(f), fpath, env_)); + } + + void AssertResult(const std::string& content, + const std::vector& reqs) { + for (const auto& r : reqs) { + 
ASSERT_OK(r.status); + ASSERT_EQ(r.len, r.result.size()); + ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString()); + } + } + + size_t alignment() const { return alignment_; } + + private: + Env* env_; + std::shared_ptr fs_; + std::string test_dir_; + size_t alignment_; + + std::string Path(const std::string& fname) { + return test_dir_ + "/" + fname; + } + + size_t GetAlignment() { + std::string f = "get_alignment"; + Write(f, ""); + std::unique_ptr r; + Read(f, FileOptions(), &r); + size_t alignment = r->file()->GetRequiredBufferAlignment(); + EXPECT_OK(fs_->DeleteFile(Path(f), IOOptions(), nullptr)); + return alignment; + } +}; + +TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { + if (!IsDirectIOSupported()) { + printf("Direct IO is not supported, skip this test\n"); + return; + } + + // Creates a file with 3 pages. + std::string fname = "multi-read-direct-io"; + Random rand(0); + std::string content; + test::RandomString(&rand, 3 * static_cast(alignment()), &content); + Write(fname, content); + + FileOptions opts; + opts.use_direct_reads = true; + std::unique_ptr r; + Read(fname, opts, &r); + ASSERT_TRUE(r->use_direct_io()); + + { + // Reads 2 blocks in the 1st page. + // The results should be SharedSlices of the same underlying buffer. + // + // Illustration (each x is a 1/4 page) + // First page: xxxx + // 1st block: x + // 2nd block: xx + FSReadRequest r0; + r0.offset = 0; + r0.len = alignment() / 4; + r0.scratch = nullptr; + + FSReadRequest r1; + r1.offset = alignment() / 2; + r1.len = alignment() / 2; + r1.scratch = nullptr; + + std::vector reqs; + reqs.push_back(std::move(r0)); + reqs.push_back(std::move(r1)); + AlignedBuf aligned_buf; + ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + + AssertResult(content, reqs); + } + + { + // Reads 3 blocks: + // 1st block in the 1st page; + // 2nd block from the middle of the 1st page to the middle of the 2nd page; + // 3rd block in the 2nd page. 
+ // The results should be SharedSlices of the same underlying buffer. + // + // Illustration (each x is a 1/4 page) + // 2 pages: xxxxxxxx + // 1st block: x + // 2nd block: xxxx + // 3rd block: x + FSReadRequest r0; + r0.offset = 0; + r0.len = alignment() / 4; + r0.scratch = nullptr; + + FSReadRequest r1; + r1.offset = alignment() / 2; + r1.len = alignment(); + r1.scratch = nullptr; + + FSReadRequest r2; + r2.offset = 2 * alignment() - alignment() / 4; + r2.len = alignment() / 4; + r2.scratch = nullptr; + + std::vector reqs; + reqs.push_back(std::move(r0)); + reqs.push_back(std::move(r1)); + reqs.push_back(std::move(r2)); + AlignedBuf aligned_buf; + ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + + AssertResult(content, reqs); + } + + { + // Reads 3 blocks: + // 1st block in the middle of the 1st page; + // 2nd block in the middle of the 2nd page; + // 3rd block in the middle of the 3rd page. + // The results should be SharedSlices of the same underlying buffer. + // + // Illustration (each x is a 1/4 page) + // 3 pages: xxxxxxxxxxxx + // 1st block: xx + // 2nd block: xx + // 3rd block: xx + FSReadRequest r0; + r0.offset = alignment() / 4; + r0.len = alignment() / 2; + r0.scratch = nullptr; + + FSReadRequest r1; + r1.offset = alignment() + alignment() / 4; + r1.len = alignment() / 2; + r1.scratch = nullptr; + + FSReadRequest r2; + r2.offset = 2 * alignment() + alignment() / 4; + r2.len = alignment() / 2; + r2.scratch = nullptr; + + std::vector reqs; + reqs.push_back(std::move(r0)); + reqs.push_back(std::move(r1)); + reqs.push_back(std::move(r2)); + AlignedBuf aligned_buf; + ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + + AssertResult(content, reqs); + } + + { + // Reads 2 blocks: + // 1st block in the middle of the 1st page; + // 2nd block in the middle of the 3rd page. + // The results are two different buffers. 
+ // + // Illustration (each x is a 1/4 page) + // 3 pages: xxxxxxxxxxxx + // 1st block: xx + // 2nd block: xx + FSReadRequest r0; + r0.offset = alignment() / 4; + r0.len = alignment() / 2; + r0.scratch = nullptr; + + FSReadRequest r1; + r1.offset = 2 * alignment() + alignment() / 4; + r1.len = alignment() / 2; + r1.scratch = nullptr; + + std::vector reqs; + reqs.push_back(std::move(r0)); + reqs.push_back(std::move(r1)); + AlignedBuf aligned_buf; + ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + + AssertResult(content, reqs); + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 295bb2b7e..426aa965a 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1424,7 +1424,7 @@ Status WinEnv::UnlockFile(FileLock* lock) { return winenv_io_.UnlockFile(lock); } -Status WinEnv::GetTestDirectory(std::string* result) { +Status WinEnv::GetTestDirectory(std::string* result) { return winenv_io_.GetTestDirectory(result); } diff --git a/src.mk b/src.mk index 628848e73..89558cf96 100644 --- a/src.mk +++ b/src.mk @@ -395,6 +395,7 @@ MAIN_SOURCES = \ env/env_test.cc \ env/io_posix_test.cc \ env/mock_env_test.cc \ + file/random_access_file_reader_test.cc \ logging/auto_roll_logger_test.cc \ logging/env_logger_test.cc \ logging/event_logger_test.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 3206b258c..8e85af170 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1607,7 +1607,6 @@ void BlockBasedTable::RetrieveMultipleBlocks( req.scratch = scratch + buf_offset; buf_offset += req.len; } - req.status = IOStatus::OK(); read_reqs.emplace_back(req); } @@ -1628,11 +1627,10 @@ void BlockBasedTable::RetrieveMultipleBlocks( } 
else { req.scratch = scratch + buf_offset; } - req.status = IOStatus::OK(); read_reqs.emplace_back(req); } - file->MultiRead(&read_reqs[0], read_reqs.size()); + file->MultiRead(&read_reqs[0], read_reqs.size(), nullptr); idx_in_batch = 0; size_t valid_batch_idx = 0; @@ -1699,7 +1697,7 @@ void BlockBasedTable::RetrieveMultipleBlocks( // in each read request. Checksum is stored in the block trailer, // which is handle.size() + 1. s = ROCKSDB_NAMESPACE::VerifyChecksum(footer.checksum(), - req.result.data() + req_offset, + data + req_offset, handle.size() + 1, expected); TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s); } diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 61ced1bff..ea8710727 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1483,7 +1483,7 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, // Allocate the buffer. This is safe in C++11 std::string buf; - std::unique_ptr internal_buf; + AlignedBuf aligned_buf; // A partial blob record contain checksum, key and value. 
Slice blob_record; @@ -1492,11 +1492,11 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); if (reader->use_direct_io()) { s = reader->Read(record_offset, static_cast(record_size), - &blob_record, nullptr, &internal_buf); + &blob_record, nullptr, &aligned_buf); } else { buf.reserve(static_cast(record_size)); s = reader->Read(record_offset, static_cast(record_size), - &blob_record, &buf[0], nullptr); + &blob_record, &buf[0], nullptr); } RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index f9707e1f7..3db6cfc24 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -139,11 +139,11 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) { Slice result; std::string buf; - std::unique_ptr internal_buf; + AlignedBuf aligned_buf; Status s; if (ra_file_reader_->use_direct_io()) { s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, - nullptr, &internal_buf); + nullptr, &aligned_buf); } else { buf.reserve(BlobLogFooter::kSize + 10); s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, @@ -263,11 +263,11 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { // Read file header. 
std::string header_buf; - std::unique_ptr internal_buf; + AlignedBuf aligned_buf; Slice header_slice; if (file_reader->use_direct_io()) { s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr, - &internal_buf); + &aligned_buf); } else { header_buf.reserve(BlobLogHeader::kSize); s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, @@ -306,12 +306,14 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { std::string footer_buf; Slice footer_slice; if (file_reader->use_direct_io()) { - s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, - &footer_slice, nullptr, &internal_buf); + s = file_reader->Read(file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, nullptr, + &aligned_buf); } else { footer_buf.reserve(BlobLogFooter::kSize); - s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, - &footer_slice, &footer_buf[0], nullptr); + s = file_reader->Read(file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, &footer_buf[0], + nullptr); } if (!s.ok()) { ROCKS_LOG_ERROR(info_log_,