Merge pull request #968 from yuslepukhin/one_shot_buffer
Enable per-request buffer allocation in RandomAccessFile
This commit is contained in:
commit
b5750790e0
@ -1121,6 +1121,9 @@ struct DBOptions {
|
|||||||
// This option is currently honored only on Windows
|
// This option is currently honored only on Windows
|
||||||
//
|
//
|
||||||
// Default: 1 Mb
|
// Default: 1 Mb
|
||||||
|
//
|
||||||
|
// Special value: 0 - means do not maintain per instance buffer. Allocate
|
||||||
|
// per request buffer and avoid locking.
|
||||||
size_t random_access_max_buffer_size;
|
size_t random_access_max_buffer_size;
|
||||||
|
|
||||||
// This is the maximum buffer size that is used by WritableFileWriter.
|
// This is the maximum buffer size that is used by WritableFileWriter.
|
||||||
|
@ -766,6 +766,18 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
return read;
|
return read;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CalculateReadParameters(uint64_t offset, size_t bytes_requested,
|
||||||
|
size_t& actual_bytes_toread,
|
||||||
|
uint64_t& first_page_start) const {
|
||||||
|
|
||||||
|
const size_t alignment = buffer_.Alignment();
|
||||||
|
|
||||||
|
first_page_start = TruncateToPageBoundary(alignment, offset);
|
||||||
|
const uint64_t last_page_start =
|
||||||
|
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
|
||||||
|
actual_bytes_toread = (last_page_start - first_page_start) + alignment;
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
|
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
|
||||||
const EnvOptions& options)
|
const EnvOptions& options)
|
||||||
@ -797,66 +809,87 @@ class WinRandomAccessFile : public RandomAccessFile {
|
|||||||
|
|
||||||
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
||||||
char* scratch) const override {
|
char* scratch) const override {
|
||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
SSIZE_T r = -1;
|
SSIZE_T r = -1;
|
||||||
size_t left = n;
|
size_t left = n;
|
||||||
char* dest = scratch;
|
char* dest = scratch;
|
||||||
|
|
||||||
|
if (n == 0) {
|
||||||
|
*result = Slice(scratch, 0);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
// When in unbuffered mode we need to do the following changes:
|
// When in unbuffered mode we need to do the following changes:
|
||||||
// - use our own aligned buffer
|
// - use our own aligned buffer
|
||||||
// - always read at the offset of that is a multiple of alignment
|
// - always read at the offset of that is a multiple of alignment
|
||||||
if (!use_os_buffer_) {
|
if (!use_os_buffer_) {
|
||||||
std::unique_lock<std::mutex> lock(buffer_mut_);
|
|
||||||
|
|
||||||
// Let's see if at least some of the requested data is already
|
uint64_t first_page_start = 0;
|
||||||
// in the buffer
|
size_t actual_bytes_toread = 0;
|
||||||
if (offset >= buffered_start_ &&
|
size_t bytes_requested = left;
|
||||||
|
|
||||||
|
if (!read_ahead_ && random_access_max_buffer_size_ == 0) {
|
||||||
|
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
|
||||||
|
first_page_start);
|
||||||
|
|
||||||
|
assert(actual_bytes_toread > 0);
|
||||||
|
|
||||||
|
r = ReadIntoOneShotBuffer(offset, first_page_start,
|
||||||
|
actual_bytes_toread, left, dest);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lock(buffer_mut_);
|
||||||
|
|
||||||
|
// Let's see if at least some of the requested data is already
|
||||||
|
// in the buffer
|
||||||
|
if (offset >= buffered_start_ &&
|
||||||
offset < (buffered_start_ + buffer_.CurrentSize())) {
|
offset < (buffered_start_ + buffer_.CurrentSize())) {
|
||||||
size_t buffer_offset = offset - buffered_start_;
|
size_t buffer_offset = offset - buffered_start_;
|
||||||
r = buffer_.Read(dest, buffer_offset, left);
|
r = buffer_.Read(dest, buffer_offset, left);
|
||||||
assert(r >= 0);
|
assert(r >= 0);
|
||||||
|
|
||||||
left -= size_t(r);
|
left -= size_t(r);
|
||||||
offset += r;
|
offset += r;
|
||||||
dest += r;
|
dest += r;
|
||||||
}
|
|
||||||
|
|
||||||
// Still some left or none was buffered
|
|
||||||
if (left > 0) {
|
|
||||||
// Figure out the start/end offset for reading and amount to read
|
|
||||||
const size_t alignment = buffer_.Alignment();
|
|
||||||
const size_t first_page_start =
|
|
||||||
TruncateToPageBoundary(alignment, offset);
|
|
||||||
|
|
||||||
size_t bytes_requested = left;
|
|
||||||
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
|
|
||||||
bytes_requested = compaction_readahead_size_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const size_t last_page_start =
|
// Still some left or none was buffered
|
||||||
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
|
if (left > 0) {
|
||||||
const size_t actual_bytes_toread =
|
// Figure out the start/end offset for reading and amount to read
|
||||||
(last_page_start - first_page_start) + alignment;
|
bytes_requested = left;
|
||||||
|
|
||||||
if (buffer_.Capacity() < actual_bytes_toread) {
|
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
|
||||||
// If we are in read-ahead mode or the requested size
|
bytes_requested = compaction_readahead_size_;
|
||||||
// exceeds max buffer size then use one-shot
|
}
|
||||||
// big buffer otherwise reallocate main buffer
|
|
||||||
if (read_ahead_ ||
|
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
|
||||||
(actual_bytes_toread > random_access_max_buffer_size_)) {
|
first_page_start);
|
||||||
// Unlock the mutex since we are not using instance buffer
|
|
||||||
lock.unlock();
|
assert(actual_bytes_toread > 0);
|
||||||
r = ReadIntoOneShotBuffer(offset, first_page_start,
|
|
||||||
actual_bytes_toread, left, dest);
|
if (buffer_.Capacity() < actual_bytes_toread) {
|
||||||
} else {
|
// If we are in read-ahead mode or the requested size
|
||||||
buffer_.AllocateNewBuffer(actual_bytes_toread);
|
// exceeds max buffer size then use one-shot
|
||||||
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
// big buffer otherwise reallocate main buffer
|
||||||
actual_bytes_toread, left, dest);
|
if (read_ahead_ ||
|
||||||
|
(actual_bytes_toread > random_access_max_buffer_size_)) {
|
||||||
|
// Unlock the mutex since we are not using instance buffer
|
||||||
|
lock.unlock();
|
||||||
|
r = ReadIntoOneShotBuffer(offset, first_page_start,
|
||||||
|
actual_bytes_toread, left, dest);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
buffer_.AllocateNewBuffer(actual_bytes_toread);
|
||||||
|
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
||||||
|
actual_bytes_toread, left, dest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
buffer_.Clear();
|
||||||
|
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
||||||
|
actual_bytes_toread, left, dest);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
buffer_.Clear();
|
|
||||||
r = ReadIntoInstanceBuffer(offset, first_page_start,
|
|
||||||
actual_bytes_toread, left, dest);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user