Roundup read bytes in ReadaheadRandomAccessFile

Summary:
Fix alignment in ReadaheadRandomAccessFile
Closes https://github.com/facebook/rocksdb/pull/2253

Differential Revision: D5012336

Pulled By: lightmark

fbshipit-source-id: 10d2c829520cb787227ef653ef63d5d701725778
This commit is contained in:
Aaron Gao 2017-05-05 11:58:10 -07:00
parent 49412d93e2
commit 459e00b365
3 changed files with 25 additions and 2 deletions

View File

@ -2601,6 +2601,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
options.use_direct_io_for_flush_and_compaction = GetParam(); options.use_direct_io_for_flush_and_compaction = GetParam();
options.env = new MockEnv(Env::Default()); options.env = new MockEnv(Env::Default());
Reopen(options); Reopen(options);
bool readahead = false;
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"TableCache::NewIterator:for_compaction", [&](void* arg) { "TableCache::NewIterator:for_compaction", [&](void* arg) {
bool* use_direct_reads = static_cast<bool*>(arg); bool* use_direct_reads = static_cast<bool*>(arg);
@ -2613,11 +2614,18 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
ASSERT_EQ(*use_direct_writes, ASSERT_EQ(*use_direct_writes,
options.use_direct_io_for_flush_and_compaction); options.use_direct_io_for_flush_and_compaction);
}); });
if (options.use_direct_io_for_flush_and_compaction) {
SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions:direct_io", [&](void* arg) {
readahead = true;
});
}
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
MakeTables(3, "p", "q", 1); MakeTables(3, "p", "q", 1);
ASSERT_EQ("1,1,1", FilesPerLevel(1)); ASSERT_EQ("1,1,1", FilesPerLevel(1));
Compact(1, "p1", "p9"); Compact(1, "p1", "p9");
ASSERT_FALSE(readahead ^ options.use_direct_io_for_flush_and_compaction);
ASSERT_EQ("0,0,1", FilesPerLevel(1)); ASSERT_EQ("0,0,1", FilesPerLevel(1));
Destroy(options); Destroy(options);
delete options.env; delete options.env;

View File

@ -97,7 +97,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max()); result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
} }
if (result.use_direct_reads && result.compaction_readahead_size == 0) { if (result.use_direct_io_for_flush_and_compaction &&
result.compaction_readahead_size == 0) {
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
result.compaction_readahead_size = 1024 * 1024 * 2; result.compaction_readahead_size = 1024 * 1024 * 2;
} }

View File

@ -21,6 +21,16 @@
namespace rocksdb { namespace rocksdb {
#ifndef NDEBUG
namespace {
bool IsSectorAligned(const size_t off, size_t sector_size) {
return off % sector_size == 0;
}
}
#endif
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s; Status s;
if (use_direct_io()) { if (use_direct_io()) {
@ -511,7 +521,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
if (prefetch_offset == buffer_offset_) { if (prefetch_offset == buffer_offset_) {
return Status::OK(); return Status::OK();
} }
return ReadIntoBuffer(prefetch_offset, offset - prefetch_offset + n); return ReadIntoBuffer(prefetch_offset,
Roundup(offset + n, alignment_) - prefetch_offset);
} }
virtual size_t GetUniqueId(char* id, size_t max_size) const override { virtual size_t GetUniqueId(char* id, size_t max_size) const override {
@ -546,6 +557,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
if (n > buffer_.Capacity()) { if (n > buffer_.Capacity()) {
n = buffer_.Capacity(); n = buffer_.Capacity();
} }
assert(IsSectorAligned(offset, alignment_));
assert(IsSectorAligned(n, alignment_));
Slice result; Slice result;
Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
if (s.ok()) { if (s.ok()) {