alignment is on in ReadaheadRandomAccessFile::Read()
Summary: Closes https://github.com/facebook/rocksdb/pull/1857 Differential Revision: D4534518 Pulled By: wat-ze-hex fbshipit-source-id: b456946
This commit is contained in:
parent
dcafd40aed
commit
1f055723b7
@ -468,13 +468,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
|||||||
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
|
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
|
||||||
size_t readahead_size)
|
size_t readahead_size)
|
||||||
: file_(std::move(file)),
|
: file_(std::move(file)),
|
||||||
readahead_size_(readahead_size),
|
alignment_(file_->GetRequiredBufferAlignment()),
|
||||||
|
readahead_size_(Roundup(readahead_size, alignment_)),
|
||||||
forward_calls_(file_->ShouldForwardRawRequest()),
|
forward_calls_(file_->ShouldForwardRawRequest()),
|
||||||
buffer_(),
|
buffer_(),
|
||||||
buffer_offset_(0),
|
buffer_offset_(0),
|
||||||
buffer_len_(0) {
|
buffer_len_(0) {
|
||||||
if (!forward_calls_) {
|
if (!forward_calls_) {
|
||||||
buffer_.reset(new char[readahead_size_]);
|
buffer_.Alignment(alignment_);
|
||||||
|
buffer_.AllocateNewBuffer(readahead_size_ + alignment_);
|
||||||
} else if (readahead_size_ > 0) {
|
} else if (readahead_size_ > 0) {
|
||||||
file_->EnableReadAhead();
|
file_->EnableReadAhead();
|
||||||
}
|
}
|
||||||
@ -500,31 +502,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
|||||||
|
|
||||||
std::unique_lock<std::mutex> lk(lock_);
|
std::unique_lock<std::mutex> lk(lock_);
|
||||||
|
|
||||||
size_t copied = 0;
|
size_t cached_len = 0;
|
||||||
// if offset between [buffer_offset_, buffer_offset_ + buffer_len>
|
// Check if there is a cache hit, means that [offset, offset + n) is either
|
||||||
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
|
// complitely or partially in the buffer
|
||||||
uint64_t offset_in_buffer = offset - buffer_offset_;
|
// If it's completely cached, including end of file case when offset + n is
|
||||||
copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
|
// greater than EOF, return
|
||||||
memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
|
if (TryReadFromCache_(offset, n, &cached_len, scratch) &&
|
||||||
if (copied == n) {
|
(cached_len == n ||
|
||||||
// fully cached
|
// End of file
|
||||||
*result = Slice(scratch, n);
|
buffer_len_ < readahead_size_ + alignment_)) {
|
||||||
return Status::OK();
|
*result = Slice(scratch, cached_len);
|
||||||
}
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
size_t advanced_offset = offset + cached_len;
|
||||||
|
// In the case of cache hit advanced_offset is already aligned, means that
|
||||||
|
// chunk_offset equals to advanced_offset
|
||||||
|
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
|
||||||
Slice readahead_result;
|
Slice readahead_result;
|
||||||
Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
|
Status s = file_->Read(chunk_offset, readahead_size_ + alignment_,
|
||||||
buffer_.get());
|
&readahead_result, buffer_.BufferStart());
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
// In the case of cache miss, i.e. when cached_len equals 0, an offset can
|
||||||
|
// exceed the file end position, so the following check is required
|
||||||
|
if (advanced_offset < chunk_offset + readahead_result.size()) {
|
||||||
|
// In the case of cache miss, the first chunk_padding bytes in buffer_ are
|
||||||
|
// stored for alignment only and must be skipped
|
||||||
|
size_t chunk_padding = advanced_offset - chunk_offset;
|
||||||
|
auto remaining_len =
|
||||||
|
std::min(readahead_result.size() - chunk_padding, n - cached_len);
|
||||||
|
memcpy(scratch + cached_len, readahead_result.data() + chunk_padding,
|
||||||
|
remaining_len);
|
||||||
|
*result = Slice(scratch, cached_len + remaining_len);
|
||||||
|
} else {
|
||||||
|
*result = Slice(scratch, cached_len);
|
||||||
|
}
|
||||||
|
|
||||||
auto left_to_copy = std::min(readahead_result.size(), n - copied);
|
if (readahead_result.data() == buffer_.BufferStart()) {
|
||||||
memcpy(scratch + copied, readahead_result.data(), left_to_copy);
|
buffer_offset_ = chunk_offset;
|
||||||
*result = Slice(scratch, copied + left_to_copy);
|
|
||||||
|
|
||||||
if (readahead_result.data() == buffer_.get()) {
|
|
||||||
buffer_offset_ = offset + copied;
|
|
||||||
buffer_len_ = readahead_result.size();
|
buffer_len_ = readahead_result.size();
|
||||||
} else {
|
} else {
|
||||||
buffer_len_ = 0;
|
buffer_len_ = 0;
|
||||||
@ -548,12 +564,26 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len,
|
||||||
|
char* scratch) const {
|
||||||
|
if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) {
|
||||||
|
*cached_len = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
uint64_t offset_in_buffer = offset - buffer_offset_;
|
||||||
|
*cached_len =
|
||||||
|
std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
|
||||||
|
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<RandomAccessFile> file_;
|
std::unique_ptr<RandomAccessFile> file_;
|
||||||
|
const size_t alignment_;
|
||||||
size_t readahead_size_;
|
size_t readahead_size_;
|
||||||
const bool forward_calls_;
|
const bool forward_calls_;
|
||||||
|
|
||||||
mutable std::mutex lock_;
|
mutable std::mutex lock_;
|
||||||
mutable std::unique_ptr<char[]> buffer_;
|
mutable AlignedBuffer buffer_;
|
||||||
mutable uint64_t buffer_offset_;
|
mutable uint64_t buffer_offset_;
|
||||||
mutable size_t buffer_len_;
|
mutable size_t buffer_len_;
|
||||||
};
|
};
|
||||||
|
@ -3,10 +3,12 @@
|
|||||||
// LICENSE file in the root directory of this source tree. An additional grant
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
// of patent rights can be found in the PATENTS file in the same directory.
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
//
|
//
|
||||||
#include <vector>
|
|
||||||
#include "util/file_reader_writer.h"
|
#include "util/file_reader_writer.h"
|
||||||
|
#include <algorithm>
|
||||||
|
#include <vector>
|
||||||
#include "util/random.h"
|
#include "util/random.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
|
#include "util/testutil.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -125,6 +127,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
|
|||||||
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
|
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ReadaheadRandomAccessFileTest
|
||||||
|
: public testing::Test,
|
||||||
|
public testing::WithParamInterface<size_t> {
|
||||||
|
public:
|
||||||
|
static std::vector<size_t> GetReadaheadSizeList() {
|
||||||
|
return {1lu << 12, 1lu << 16};
|
||||||
|
}
|
||||||
|
virtual void SetUp() override {
|
||||||
|
readahead_size_ = GetParam();
|
||||||
|
scratch_.reset(new char[2 * readahead_size_]);
|
||||||
|
ResetSourceStr();
|
||||||
|
}
|
||||||
|
ReadaheadRandomAccessFileTest() : control_contents_() {}
|
||||||
|
std::string Read(uint64_t offset, size_t n) {
|
||||||
|
Slice result;
|
||||||
|
test_read_holder_->Read(offset, n, &result, scratch_.get());
|
||||||
|
return std::string(result.data(), result.size());
|
||||||
|
}
|
||||||
|
void ResetSourceStr(const std::string& str = "") {
|
||||||
|
auto write_holder = std::unique_ptr<WritableFileWriter>(
|
||||||
|
test::GetWritableFileWriter(new test::StringSink(&control_contents_)));
|
||||||
|
write_holder->Append(Slice(str));
|
||||||
|
write_holder->Flush();
|
||||||
|
auto read_holder = std::unique_ptr<RandomAccessFile>(
|
||||||
|
new test::StringSource(control_contents_));
|
||||||
|
test_read_holder_ =
|
||||||
|
NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
|
||||||
|
}
|
||||||
|
size_t GetReadaheadSize() const { return readahead_size_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
size_t readahead_size_;
|
||||||
|
Slice control_contents_;
|
||||||
|
std::unique_ptr<RandomAccessFile> test_read_holder_;
|
||||||
|
std::unique_ptr<char[]> scratch_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) {
|
||||||
|
ASSERT_EQ("", Read(0, 1));
|
||||||
|
ASSERT_EQ("", Read(0, 0));
|
||||||
|
ASSERT_EQ("", Read(13, 13));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
|
||||||
|
std::string str = "abcdefghijklmnopqrs";
|
||||||
|
ResetSourceStr(str);
|
||||||
|
ASSERT_EQ(str.substr(3, 4), Read(3, 4));
|
||||||
|
ASSERT_EQ(str.substr(0, 3), Read(0, 3));
|
||||||
|
ASSERT_EQ(str, Read(0, str.size()));
|
||||||
|
ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
|
||||||
|
Read(7, 30));
|
||||||
|
ASSERT_EQ("", Read(100, 100));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(ReadaheadRandomAccessFileTest,
|
||||||
|
SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
|
||||||
|
Random rng(42);
|
||||||
|
for (int k = 0; k < 100; ++k) {
|
||||||
|
size_t strLen = k * GetReadaheadSize() +
|
||||||
|
rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||||
|
std::string str =
|
||||||
|
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
|
||||||
|
ResetSourceStr(str);
|
||||||
|
for (int test = 1; test <= 100; ++test) {
|
||||||
|
size_t offset = rng.Uniform(static_cast<int>(strLen));
|
||||||
|
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||||
|
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
|
||||||
|
Read(offset, n));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
|
||||||
|
Random rng(7);
|
||||||
|
size_t strLen = 4 * GetReadaheadSize() +
|
||||||
|
rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||||
|
std::string str =
|
||||||
|
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
|
||||||
|
ResetSourceStr(str);
|
||||||
|
for (int test = 1; test <= 100; ++test) {
|
||||||
|
size_t offset = rng.Uniform(static_cast<int>(strLen));
|
||||||
|
size_t n =
|
||||||
|
GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||||
|
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
|
||||||
|
Read(offset, n));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
EmptySourceStrTest, ReadaheadRandomAccessFileTest,
|
||||||
|
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
|
||||||
|
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
SourceStrLenCanBeGreaterThanReadaheadSizeTest,
|
||||||
|
ReadaheadRandomAccessFileTest,
|
||||||
|
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
|
||||||
|
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user