diff --git a/test_util/testutil.h b/test_util/testutil.h index bb732ff3a..716ae7d26 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -492,13 +492,11 @@ inline std::string EncodeInt(uint64_t x) { return result; } -class StringEnv : public EnvWrapper { - public: class SeqStringSource : public SequentialFile { public: explicit SeqStringSource(const std::string& data) : data_(data), offset_(0) {} - ~SeqStringSource() {} + ~SeqStringSource() override {} Status Read(size_t n, Slice* result, char* scratch) override { std::string output; if (offset_ < data_.size()) { @@ -527,129 +525,136 @@ class StringEnv : public EnvWrapper { size_t offset_; }; - class StringSink : public WritableFile { + class StringEnv : public EnvWrapper { public: - explicit StringSink(std::string* contents) - : WritableFile(), contents_(contents) {} - virtual Status Truncate(uint64_t size) override { - contents_->resize(static_cast(size)); - return Status::OK(); - } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { return Status::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { - contents_->append(slice.data(), slice.size()); + class StringSink : public WritableFile { + public: + explicit StringSink(std::string* contents) + : WritableFile(), contents_(contents) {} + virtual Status Truncate(uint64_t size) override { + contents_->resize(static_cast(size)); + return Status::OK(); + } + virtual Status Close() override { return Status::OK(); } + virtual Status Flush() override { return Status::OK(); } + virtual Status Sync() override { return Status::OK(); } + virtual Status Append(const Slice& slice) override { + contents_->append(slice.data(), slice.size()); + return Status::OK(); + } + + private: + std::string* contents_; + }; + + explicit StringEnv(Env* t) : EnvWrapper(t) {} + ~StringEnv() override {} + + const std::string& GetContent(const std::string& f) { return files_[f]; } + + const Status WriteToNewFile(const std::string& file_name, + const std::string& content) { + std::unique_ptr r; + auto s = NewWritableFile(file_name, &r, EnvOptions()); + if (!s.ok()) { + return s; + } + r->Append(content); + r->Flush(); + r->Close(); + assert(files_[file_name] == content); return Status::OK(); } - private: - std::string* contents_; - }; - - explicit StringEnv(Env* t) : EnvWrapper(t) {} - virtual ~StringEnv() {} - - const std::string& GetContent(const std::string& f) { return files_[f]; } - - const Status WriteToNewFile(const std::string& file_name, - const std::string& content) { - std::unique_ptr r; - auto s = NewWritableFile(file_name, &r, EnvOptions()); - if (!s.ok()) { - return s; - } - r->Append(content); - r->Flush(); - r->Close(); - assert(files_[file_name] == content); - return Status::OK(); - } - - // The following text is boilerplate that forwards all methods to target() - Status NewSequentialFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist", f); - } - r->reset(new SeqStringSource(iter->second)); - return Status::OK(); - } - Status NewRandomAccessFile(const std::string& /*f*/, - std::unique_ptr* /*r*/, + // The following text is boilerplate that forwards all methods to target() + Status NewSequentialFile(const std::string& f, + std::unique_ptr* r, const EnvOptions& /*options*/) override { - return Status::NotSupported(); - } - Status NewWritableFile(const std::string& f, std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter != files_.end()) { - return Status::IOError("The specified file already exists", f); + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist", f); + } + r->reset(new SeqStringSource(iter->second)); + return Status::OK(); } - r->reset(new StringSink(&files_[f])); - return Status::OK(); - } - virtual Status NewDirectory(const std::string& /*name*/, - std::unique_ptr* /*result*/) override { - return Status::NotSupported(); - } - Status FileExists(const std::string& f) override { - if (files_.find(f) == files_.end()) { - return Status::NotFound(); + Status NewRandomAccessFile(const std::string& /*f*/, + std::unique_ptr* /*r*/, + const EnvOptions& /*options*/) override { + return Status::NotSupported(); } - return Status::OK(); - } - Status GetChildren(const std::string& /*dir*/, - std::vector* /*r*/) override { - return Status::NotSupported(); - } - Status DeleteFile(const std::string& f) override { - files_.erase(f); - return Status::OK(); - } - Status CreateDir(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status CreateDirIfMissing(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status DeleteDir(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status GetFileSize(const std::string& f, uint64_t* s) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist:", f); + Status NewWritableFile(const std::string& f, + std::unique_ptr* r, + const EnvOptions& /*options*/) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return Status::IOError("The specified file already exists", f); + } + r->reset(new StringSink(&files_[f])); + return Status::OK(); + } + virtual Status NewDirectory( + const std::string& /*name*/, + std::unique_ptr* /*result*/) override { + return Status::NotSupported(); + } + Status FileExists(const std::string& f) override { + if (files_.find(f) == files_.end()) { + return Status::NotFound(); + } + return Status::OK(); + } + Status GetChildren(const std::string& /*dir*/, + std::vector* /*r*/) override { + return Status::NotSupported(); + } + Status DeleteFile(const std::string& f) override { + files_.erase(f); + return Status::OK(); + } + Status CreateDir(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status CreateDirIfMissing(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status DeleteDir(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status GetFileSize(const std::string& f, uint64_t* s) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist:", f); + } + *s = iter->second.size(); + return Status::OK(); } - *s = iter->second.size(); - return Status::OK(); - } - Status GetFileModificationTime(const std::string& /*fname*/, - uint64_t* /*file_mtime*/) override { - return Status::NotSupported(); - } + Status GetFileModificationTime(const std::string& /*fname*/, + uint64_t* /*file_mtime*/) override { + return Status::NotSupported(); + } - Status RenameFile(const std::string& /*s*/, + Status RenameFile(const std::string& /*s*/, + const std::string& /*t*/) override { + return Status::NotSupported(); + } + + Status LinkFile(const std::string& /*s*/, const std::string& /*t*/) override { - return Status::NotSupported(); - } + return Status::NotSupported(); + } - Status LinkFile(const std::string& /*s*/, const std::string& /*t*/) override { - return Status::NotSupported(); - } + Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override { + return Status::NotSupported(); + } - Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override { - return Status::NotSupported(); - } + Status UnlockFile(FileLock* /*l*/) override { + return Status::NotSupported(); + } - Status UnlockFile(FileLock* /*l*/) override { return Status::NotSupported(); } - - protected: - std::unordered_map files_; -}; + protected: + std::unordered_map files_; + }; // Randomly initialize the given DBOptions void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 15f41bf3a..9175fa502 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -639,6 +639,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { + // Read-ahead only make sense if we have some slack left after reading if (n + alignment_ >= readahead_size_) { return file_->Read(offset, n, result, scratch); } @@ -646,14 +647,13 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { std::unique_lock lk(lock_); size_t cached_len = 0; - // Check if there is a cache hit, means that [offset, offset + n) is either - // completely or partially in the buffer + // Check if there is a cache hit, meaning that [offset, offset + n) is either + // completely or partially in the buffer. // If it's completely cached, including end of file case when offset + n is - // greater than EOF, return + // greater than EOF, then return. if (TryReadFromCache(offset, n, &cached_len, scratch) && - (cached_len == n || - // End of file - buffer_.CurrentSize() < readahead_size_)) { + (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { + // We read exactly what we needed, or we hit end of file - return. *result = Slice(scratch, cached_len); return Status::OK(); } @@ -661,25 +661,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // In the case of cache hit advanced_offset is already aligned, means that // chunk_offset equals to advanced_offset size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); - Slice readahead_result; Status s = ReadIntoBuffer(chunk_offset, readahead_size_); if (s.ok()) { - // In the case of cache miss, i.e. when cached_len equals 0, an offset can - // exceed the file end position, so the following check is required - if (advanced_offset < chunk_offset + buffer_.CurrentSize()) { - // In the case of cache miss, the first chunk_padding bytes in buffer_ - // are - // stored for alignment only and must be skipped - size_t chunk_padding = advanced_offset - chunk_offset; - auto remaining_len = - std::min(buffer_.CurrentSize() - chunk_padding, n - cached_len); - memcpy(scratch + cached_len, buffer_.BufferStart() + chunk_padding, - remaining_len); - *result = Slice(scratch, cached_len + remaining_len); - } else { - *result = Slice(scratch, cached_len); - } + // The data we need is now in cache, so we can safely read it + size_t remaining_len; + TryReadFromCache(advanced_offset, n - cached_len, &remaining_len, + scratch + cached_len); + *result = Slice(scratch, cached_len + remaining_len); } return s; } @@ -690,6 +679,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. return Status::OK(); } + + std::unique_lock lk(lock_); + size_t offset_ = static_cast(offset); size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); if (prefetch_offset == buffer_offset_) { @@ -706,12 +698,18 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { void Hint(AccessPattern pattern) override { file_->Hint(pattern); } Status InvalidateCache(size_t offset, size_t length) override { + std::unique_lock lk(lock_); + buffer_.Clear(); return file_->InvalidateCache(offset, length); } bool use_direct_io() const override { return file_->use_direct_io(); } private: + // Tries to read from buffer_ n bytes starting at offset. If anything was read + // from the cache, it sets cached_len to the number of bytes actually read, + // copies these number of bytes to scratch and returns true. + // If nothing was read sets cached_len to 0 and returns false. bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len, char* scratch) const { if (offset < buffer_offset_ || @@ -726,6 +724,9 @@ private: return true; } + // Reads into buffer_ the next n bytes from file_ starting at offset. + // Can actually read less if EOF was reached. + // Returns the status of the read operastion on the file. Status ReadIntoBuffer(uint64_t offset, size_t n) const { if (n > buffer_.Capacity()) { n = buffer_.Capacity(); @@ -742,14 +743,171 @@ private: return s; } - std::unique_ptr file_; + const std::unique_ptr file_; const size_t alignment_; - size_t readahead_size_; + const size_t readahead_size_; mutable std::mutex lock_; + // The buffer storing the prefetched data mutable AlignedBuffer buffer_; + // The offset in file_, corresponding to data stored in buffer_ mutable uint64_t buffer_offset_; }; + +// This class wraps a SequentialFile, exposing same API, with the differenece +// of being able to prefetch up to readahead_size bytes and then serve them +// from memory, avoiding the entire round-trip if, for example, the data for the +// file is actually remote. +class ReadaheadSequentialFile : public SequentialFile { + public: + ReadaheadSequentialFile(std::unique_ptr&& file, + size_t readahead_size) + : file_(std::move(file)), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), + buffer_(), + buffer_offset_(0), + read_offset_(0) { + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_); + } + + ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; + + ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; + + Status Read(size_t n, Slice* result, char* scratch) override { + std::unique_lock lk(lock_); + + size_t cached_len = 0; + // Check if there is a cache hit, meaning that [offset, offset + n) is + // either completely or partially in the buffer. If it's completely cached, + // including end of file case when offset + n is greater than EOF, then + // return. + if (TryReadFromCache(n, &cached_len, scratch) && + (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { + // We read exactly what we needed, or we hit end of file - return. + *result = Slice(scratch, cached_len); + return Status::OK(); + } + n -= cached_len; + + Status s; + // Read-ahead only make sense if we have some slack left after reading + if (n + alignment_ >= readahead_size_) { + s = file_->Read(n, result, scratch + cached_len); + if (s.ok()) { + read_offset_ += result->size(); + *result = Slice(scratch, cached_len + result->size()); + } + buffer_.Clear(); + return s; + } + + s = ReadIntoBuffer(readahead_size_); + if (s.ok()) { + // The data we need is now in cache, so we can safely read it + size_t remaining_len; + TryReadFromCache(n, &remaining_len, scratch + cached_len); + *result = Slice(scratch, cached_len + remaining_len); + } + return s; + } + + Status Skip(uint64_t n) override { + std::unique_lock lk(lock_); + Status s = Status::OK(); + // First check if we need to skip already cached data + if (buffer_.CurrentSize() > 0) { + // Do we need to skip beyond cached data? + if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { + // Yes. Skip whaterver is in memory and adjust offset accordingly + n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; + read_offset_ = buffer_offset_ + buffer_.CurrentSize(); + } else { + // No. The entire section to be skipped is entirely i cache. + read_offset_ += n; + n = 0; + } + } + if (n > 0) { + // We still need to skip more, so call the file API for skipping + s = file_->Skip(n); + if (s.ok()) { + read_offset_ += n; + } + buffer_.Clear(); + } + return s; + } + + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + return file_->PositionedRead(offset, n, result, scratch); + } + + Status InvalidateCache(size_t offset, size_t length) override { + std::unique_lock lk(lock_); + buffer_.Clear(); + return file_->InvalidateCache(offset, length); + } + + bool use_direct_io() const override { return file_->use_direct_io(); } + + private: + // Tries to read from buffer_ n bytes. If anything was read from the cache, it + // sets cached_len to the number of bytes actually read, copies these number + // of bytes to scratch and returns true. + // If nothing was read sets cached_len to 0 and returns false. + bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { + if (read_offset_ < buffer_offset_ || + read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = read_offset_ - buffer_offset_; + *cached_len = std::min( + buffer_.CurrentSize() - static_cast(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + read_offset_ += *cached_len; + return true; + } + + // Reads into buffer_ the next n bytes from file_. + // Can actually read less if EOF was reached. + // Returns the status of the read operastion on the file. + Status ReadIntoBuffer(size_t n) { + if (n > buffer_.Capacity()) { + n = buffer_.Capacity(); + } + assert(IsFileSectorAligned(n, alignment_)); + Slice result; + Status s = file_->Read(n, &result, buffer_.BufferStart()); + if (s.ok()) { + buffer_offset_ = read_offset_; + buffer_.Size(result.size()); + assert(buffer_.BufferStart() == result.data()); + } + return s; + } + + const std::unique_ptr file_; + const size_t alignment_; + const size_t readahead_size_; + + std::mutex lock_; + // The buffer storing the prefetched data + AlignedBuffer buffer_; + // The offset in file_, corresponding to data stored in buffer_ + uint64_t buffer_offset_; + // The offset up to which data was read from file_. In fact, it can be larger + // than the actual file size, since the file_->Skip(n) call doesn't return the + // actual number of bytes that were skipped, which can be less than n. + // This is not a problemm since read_offset_ is monotonically increasing and + // its only use is to figure out if next piece of data should be read from + // buffer_ or file_ directly. + uint64_t read_offset_; +}; } // namespace Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, @@ -866,6 +1024,14 @@ std::unique_ptr NewReadaheadRandomAccessFile( return result; } +std::unique_ptr +SequentialFileReader::NewReadaheadSequentialFile( + std::unique_ptr&& file, size_t readahead_size) { + std::unique_ptr result( + new ReadaheadSequentialFile(std::move(file), readahead_size)); + return result; +} + Status NewWritableFile(Env* env, const std::string& fname, std::unique_ptr* result, const EnvOptions& options) { diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 3052ca8f4..a93274644 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -43,12 +43,18 @@ class SequentialFileReader { private: std::unique_ptr file_; std::string file_name_; - std::atomic offset_; // read offset + std::atomic offset_{0}; // read offset public: explicit SequentialFileReader(std::unique_ptr&& _file, const std::string& _file_name) - : file_(std::move(_file)), file_name_(_file_name), offset_(0) {} + : file_(std::move(_file)), file_name_(_file_name) {} + + explicit SequentialFileReader(std::unique_ptr&& _file, + const std::string& _file_name, + size_t _readahead_size) + : file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)), + file_name_(_file_name) {} SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); @@ -66,13 +72,17 @@ class SequentialFileReader { Status Skip(uint64_t n); - void Rewind(); - SequentialFile* file() { return file_.get(); } std::string file_name() { return file_name_; } bool use_direct_io() const { return file_->use_direct_io(); } + + private: + // NewReadaheadSequentialFile provides a wrapper over SequentialFile to + // always prefetch additional data with every read. + static std::unique_ptr NewReadaheadSequentialFile( + std::unique_ptr&& file, size_t readahead_size); }; // RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index a4a9458d6..aa74303b8 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -275,7 +275,7 @@ TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) { } TEST_P(ReadaheadRandomAccessFileTest, - SourceStrLenCanBeGreaterThanReadaheadSizeTest) { + SourceStrLenGreaterThanReadaheadSizeTest) { Random rng(42); for (int k = 0; k < 100; ++k) { size_t strLen = k * GetReadaheadSize() + @@ -286,13 +286,13 @@ TEST_P(ReadaheadRandomAccessFileTest, for (int test = 1; test <= 100; ++test) { size_t offset = rng.Uniform(static_cast(strLen)); size_t n = rng.Uniform(static_cast(GetReadaheadSize())); - ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)), + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(offset, n)); } } } -TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) { +TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSizeTest) { Random rng(7); size_t strLen = 4 * GetReadaheadSize() + rng.Uniform(static_cast(GetReadaheadSize())); @@ -303,7 +303,7 @@ TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) { size_t offset = rng.Uniform(static_cast(strLen)); size_t n = GetReadaheadSize() + rng.Uniform(static_cast(GetReadaheadSize())); - ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)), + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(offset, n)); } } @@ -315,13 +315,118 @@ INSTANTIATE_TEST_CASE_P( SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest, ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); INSTANTIATE_TEST_CASE_P( - SourceStrLenCanBeGreaterThanReadaheadSizeTest, - ReadaheadRandomAccessFileTest, + SourceStrLenGreaterThanReadaheadSizeTest, ReadaheadRandomAccessFileTest, ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); INSTANTIATE_TEST_CASE_P( - NExceedReadaheadTest, ReadaheadRandomAccessFileTest, + ReadExceedsReadaheadSizeTest, ReadaheadRandomAccessFileTest, ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +class ReadaheadSequentialFileTest : public testing::Test, + public testing::WithParamInterface { + public: + static std::vector GetReadaheadSizeList() { + return {1lu << 12, 1lu << 16}; + } + void SetUp() override { + readahead_size_ = GetParam(); + scratch_.reset(new char[2 * readahead_size_]); + ResetSourceStr(); + } + ReadaheadSequentialFileTest() {} + std::string Read(size_t n) { + Slice result; + test_read_holder_->Read(n, &result, scratch_.get()); + return std::string(result.data(), result.size()); + } + void Skip(size_t n) { test_read_holder_->Skip(n); } + void ResetSourceStr(const std::string& str = "") { + auto read_holder = + std::unique_ptr(new test::SeqStringSource(str)); + test_read_holder_.reset(new SequentialFileReader(std::move(read_holder), + "test", readahead_size_)); + } + size_t GetReadaheadSize() const { return readahead_size_; } + + private: + size_t readahead_size_; + std::unique_ptr test_read_holder_; + std::unique_ptr scratch_; +}; + +TEST_P(ReadaheadSequentialFileTest, EmptySourceStrTest) { + ASSERT_EQ("", Read(0)); + ASSERT_EQ("", Read(1)); + ASSERT_EQ("", Read(13)); +} + +TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSizeTest) { + std::string str = "abcdefghijklmnopqrs"; + ResetSourceStr(str); + ASSERT_EQ(str.substr(0, 3), Read(3)); + ASSERT_EQ(str.substr(3, 1), Read(1)); + ASSERT_EQ(str.substr(4), Read(str.size())); + ASSERT_EQ("", Read(100)); +} + +TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSizeTest) { + Random rng(42); + for (int s = 0; s < 1; ++s) { + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast(GetReadaheadSize())); + std::string str = + test::RandomHumanReadableString(&rng, static_cast(strLen)); + ResetSourceStr(str); + size_t offset = 0; + for (int test = 1; test <= 100; ++test) { + size_t n = rng.Uniform(static_cast(GetReadaheadSize())); + if (s && test % 2) { + Skip(n); + } else { + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n)); + } + offset = std::min(offset + n, strLen); + } + } + } +} + +TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSizeTest) { + Random rng(42); + for (int s = 0; s < 1; ++s) { + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast(GetReadaheadSize())); + std::string str = + test::RandomHumanReadableString(&rng, static_cast(strLen)); + ResetSourceStr(str); + size_t offset = 0; + for (int test = 1; test <= 100; ++test) { + size_t n = GetReadaheadSize() + + rng.Uniform(static_cast(GetReadaheadSize())); + if (s && test % 2) { + Skip(n); + } else { + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n)); + } + offset = std::min(offset + n, strLen); + } + } + } +} + +INSTANTIATE_TEST_CASE_P( + EmptySourceStrTest, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenLessThanReadaheadSizeTest, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenGreaterThanReadaheadSizeTest, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + ReadExceedsReadaheadSizeTest, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); } // namespace rocksdb int main(int argc, char** argv) {