diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 28bffe735..911b69ea4 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -68,6 +68,12 @@ struct EnvOptions { // If true, then use mmap to write data bool use_mmap_writes = true; + // If true, then use O_DIRECT for reading data + bool use_direct_reads = false; + + // If true, then use O_DIRECT for writing data + bool use_direct_writes = false; + // If false, fallocate() calls are bypassed bool allow_fallocate = true; diff --git a/util/env_posix.cc b/util/env_posix.cc index ff4ab9f21..dc4bc8ae6 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -152,6 +152,17 @@ class PosixEnv : public Env { if (f == nullptr) { *result = nullptr; return IOError(fname, errno); + } else if (options.use_direct_reads && !options.use_mmap_writes) { + // The direct-IO reader works on its own descriptor, so the buffered + // stream opened above is released here instead of being leaked. + fclose(f); + int flags = O_RDONLY | O_DIRECT; + TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags); + int fd = open(fname.c_str(), flags, 0644); + if (fd < 0) { + // Mirror the fopen() failure path: never hand back a stale pointer. + *result = nullptr; + return IOError(fname, errno); + } + result->reset(new PosixDirectIOSequentialFile(fname, fd)); + return Status::OK(); } else { int fd = fileno(f); SetFD_CLOEXEC(fd, &options); @@ -189,6 +200,18 @@ class PosixEnv : public Env { } } close(fd); + } else if (options.use_direct_reads) { + // fd is already open at this point (the other branches use it); close it + // before reopening with O_DIRECT so the first descriptor is not leaked. + close(fd); + int flags = O_RDONLY | O_DIRECT; + TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); + fd = open(fname.c_str(), flags, 0644); + if (fd < 0) { + s = IOError(fname, errno); + } else { + result->reset(new PosixDirectIORandomAccessFile(fname, fd)); + s = Status::OK(); + } } else { result->reset(new PosixRandomAccessFile(fname, fd, options)); } @@ -221,6 +244,18 @@ class PosixEnv : public Env { } if (options.use_mmap_writes && !forceMmapOff) { result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes) { + // fd is already open here (the mmap branch above uses it); release it + // before it is overwritten by the O_DIRECT reopen below. + close(fd); + int flags = O_WRONLY | O_APPEND | O_TRUNC | O_CREAT | O_DIRECT; + 
TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + fd = open(fname.c_str(), flags, 0644); + if (fd < 0) { + s = IOError(fname, errno); + } else { + std::unique_ptr file( + new PosixDirectIOWritableFile(fname, fd)); + *result = std::move(file); + s = Status::OK(); + } } else { // disable mmap writes EnvOptions no_mmap_writes_options = options; @@ -763,6 +798,9 @@ std::string Env::GenerateUniqueId() { return uuid2; } +// +// Default Posix Env +// Env* Env::Default() { // The following function call initializes the singletons of ThreadLocalPtr // right before the static default_env. This guarantees default_env will diff --git a/util/env_test.cc b/util/env_test.cc index 8fe868c6e..a5e703077 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -36,6 +36,7 @@ #include "util/log_buffer.h" #include "util/mutexlock.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" @@ -43,6 +44,17 @@ namespace rocksdb { static const int kDelayMicros = 100000; +std::unique_ptr NewAligned(const size_t size, + const char ch) { + char* ptr = nullptr; + if (posix_memalign(reinterpret_cast(&ptr), 4 * 1024, size) != 0) { + return std::unique_ptr(nullptr, free); + } + std::unique_ptr uptr(ptr, free); + memset(uptr.get(), ch, size); + return uptr; +} + class EnvPosixTest : public testing::Test { private: port::Mutex mu_; @@ -553,108 +565,119 @@ class IoctlFriendlyTmpdir { // Only works in linux platforms TEST_F(EnvPosixTest, RandomAccessUniqueID) { - // Create file. - const EnvOptions soptions; - IoctlFriendlyTmpdir ift; - std::string fname = ift.name() + "/testfile"; - unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + for (bool directio : {true, false}) { + // Create file. 
+ EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/testfile"; + unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); - unique_ptr file; + unique_ptr file; - // Get Unique ID - ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); - ASSERT_TRUE(id_size > 0); - std::string unique_id1(temp_id, id_size); - ASSERT_TRUE(IsUniqueIDValid(unique_id1)); + // Get Unique ID + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id1(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id1)); - // Get Unique ID again - ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); - ASSERT_TRUE(id_size > 0); - std::string unique_id2(temp_id, id_size); - ASSERT_TRUE(IsUniqueIDValid(unique_id2)); + // Get Unique ID again + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id2(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id2)); - // Get Unique ID again after waiting some time. - env_->SleepForMicroseconds(1000000); - ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); - ASSERT_TRUE(id_size > 0); - std::string unique_id3(temp_id, id_size); - ASSERT_TRUE(IsUniqueIDValid(unique_id3)); + // Get Unique ID again after waiting some time. + env_->SleepForMicroseconds(1000000); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id3(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id3)); - // Check IDs are the same. 
- ASSERT_EQ(unique_id1, unique_id2); - ASSERT_EQ(unique_id2, unique_id3); + // Check IDs are the same. + ASSERT_EQ(unique_id1, unique_id2); + ASSERT_EQ(unique_id2, unique_id3); - // Delete the file - env_->DeleteFile(fname); + // Delete the file + env_->DeleteFile(fname); + } } // only works in linux platforms #ifdef ROCKSDB_FALLOCATE_PRESENT TEST_F(EnvPosixTest, AllocateTest) { - IoctlFriendlyTmpdir ift; - std::string fname = ift.name() + "/preallocate_testfile"; + for (bool directio : {true, false}) { + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/preallocate_testfile"; - // Try fallocate in a file to see whether the target file system supports it. - // Skip the test if fallocate is not supported. - std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2"; - int fd = -1; - do { - fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); - } while (fd < 0 && errno == EINTR); - ASSERT_GT(fd, 0); + // Try fallocate in a file to see whether the target file system supports + // it. + // Skip the test if fallocate is not supported. 
+ std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2"; + int fd = -1; + do { + fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + } while (fd < 0 && errno == EINTR); + ASSERT_GT(fd, 0); - int alloc_status = fallocate(fd, 0, 0, 1); + int alloc_status = fallocate(fd, 0, 0, 1); - int err_number = 0; - if (alloc_status != 0) { - err_number = errno; - fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number)); + int err_number = 0; + if (alloc_status != 0) { + err_number = errno; + fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number)); + } + close(fd); + ASSERT_OK(env_->DeleteFile(fname_test_fallocate)); + if (alloc_status != 0 && err_number == EOPNOTSUPP) { + // The filesystem containing the file does not support fallocate + return; + } + + EnvOptions soptions; + soptions.use_mmap_writes = false; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + // allocate 100 MB + size_t kPreallocateSize = 100 * 1024 * 1024; + size_t kBlockSize = 512; + size_t kPageSize = 4096; + size_t kDataSize = 1024 * 1024; + auto data_ptr = NewAligned(kDataSize, 'A'); + Slice data(data_ptr.get(), kDataSize); + wfile->SetPreallocationBlockSize(kPreallocateSize); + wfile->PrepareWrite(wfile->GetFileSize(), kDataSize); + ASSERT_OK(wfile->Append(data)); + ASSERT_OK(wfile->Flush()); + + struct stat f_stat; + ASSERT_EQ(stat(fname.c_str(), &f_stat), 0); + ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size); + // verify that blocks are preallocated + // Note here that we don't check the exact number of blocks preallocated -- + // we only require that number of allocated blocks is at least what we + // expect. + // It looks like some FS give us more blocks that we asked for. That's fine. + // It might be worth investigating further. 
+ ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks); + + // close the file, should deallocate the blocks + wfile.reset(); + + stat(fname.c_str(), &f_stat); + ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size); + // verify that preallocated blocks were deallocated on file close + // Because the FS might give us more blocks, we add a full page to the size + // and expect the number of blocks to be less or equal to that. + ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize, + (unsigned int)f_stat.st_blocks); } - close(fd); - ASSERT_OK(env_->DeleteFile(fname_test_fallocate)); - if (alloc_status != 0 && err_number == EOPNOTSUPP) { - // The filesystem containing the file does not support fallocate - return; - } - - EnvOptions soptions; - soptions.use_mmap_writes = false; - unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); - - // allocate 100 MB - size_t kPreallocateSize = 100 * 1024 * 1024; - size_t kBlockSize = 512; - size_t kPageSize = 4096; - std::string data(1024 * 1024, 'a'); - wfile->SetPreallocationBlockSize(kPreallocateSize); - wfile->PrepareWrite(wfile->GetFileSize(), data.size()); - ASSERT_OK(wfile->Append(Slice(data))); - ASSERT_OK(wfile->Flush()); - - struct stat f_stat; - stat(fname.c_str(), &f_stat); - ASSERT_EQ((unsigned int)data.size(), f_stat.st_size); - // verify that blocks are preallocated - // Note here that we don't check the exact number of blocks preallocated -- - // we only require that number of allocated blocks is at least what we expect. - // It looks like some FS give us more blocks that we asked for. That's fine. - // It might be worth investigating further. 
- ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks); - - // close the file, should deallocate the blocks - wfile.reset(); - - stat(fname.c_str(), &f_stat); - ASSERT_EQ((unsigned int)data.size(), f_stat.st_size); - // verify that preallocated blocks were deallocated on file close - // Because the FS might give us more blocks, we add a full page to the size - // and expect the number of blocks to be less or equal to that. - ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize, (unsigned int)f_stat.st_blocks); } #endif // ROCKSDB_FALLOCATE_PRESENT @@ -675,119 +698,159 @@ bool HasPrefix(const std::unordered_set& ss) { // Only works in linux platforms TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) { - // Check whether a bunch of concurrently existing files have unique IDs. - const EnvOptions soptions; + for (bool directio : {true, false}) { + // Check whether a bunch of concurrently existing files have unique IDs. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; - // Create the files - IoctlFriendlyTmpdir ift; - std::vector fnames; - for (int i = 0; i < 1000; ++i) { - fnames.push_back(ift.name() + "/" + "testfile" + ToString(i)); + // Create the files + IoctlFriendlyTmpdir ift; + std::vector fnames; + for (int i = 0; i < 1000; ++i) { + fnames.push_back(ift.name() + "/" + "testfile" + ToString(i)); - // Create file. - unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions)); - } - - // Collect and check whether the IDs are unique. 
- std::unordered_set ids; - for (const std::string fname: fnames) { - unique_ptr file; - std::string unique_id; - ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); - ASSERT_TRUE(id_size > 0); - unique_id = std::string(temp_id, id_size); - ASSERT_TRUE(IsUniqueIDValid(unique_id)); - - ASSERT_TRUE(ids.count(unique_id) == 0); - ids.insert(unique_id); - } - - // Delete the files - for (const std::string fname: fnames) { - ASSERT_OK(env_->DeleteFile(fname)); - } - - ASSERT_TRUE(!HasPrefix(ids)); -} - -// Only works in linux platforms -TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) { - const EnvOptions soptions; - - IoctlFriendlyTmpdir ift; - std::string fname = ift.name() + "/" + "testfile"; - - // Check that after file is deleted we don't get same ID again in a new file. - std::unordered_set ids; - for (int i = 0; i < 1000; ++i) { - // Create file. - { + // Create file. unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions)); } - // Get Unique ID - std::string unique_id; - { + // Collect and check whether the IDs are unique. 
+ std::unordered_set ids; + for (const std::string fname : fnames) { unique_ptr file; + std::string unique_id; ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); unique_id = std::string(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id)); + + ASSERT_TRUE(ids.count(unique_id) == 0); + ids.insert(unique_id); } - ASSERT_TRUE(IsUniqueIDValid(unique_id)); - ASSERT_TRUE(ids.count(unique_id) == 0); - ids.insert(unique_id); + // Delete the files + for (const std::string fname : fnames) { + ASSERT_OK(env_->DeleteFile(fname)); + } - // Delete the file - ASSERT_OK(env_->DeleteFile(fname)); + ASSERT_TRUE(!HasPrefix(ids)); } +} - ASSERT_TRUE(!HasPrefix(ids)); +// Only works in linux platforms +TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) { + for (bool directio : {true, false}) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/" + "testfile"; + + // Check that after file is deleted we don't get same ID again in a new + // file. + std::unordered_set ids; + for (int i = 0; i < 1000; ++i) { + // Create file. 
+ { + unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + } + + // Get Unique ID + std::string unique_id; + { + unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + unique_id = std::string(temp_id, id_size); + } + + ASSERT_TRUE(IsUniqueIDValid(unique_id)); + ASSERT_TRUE(ids.count(unique_id) == 0); + ids.insert(unique_id); + + // Delete the file + ASSERT_OK(env_->DeleteFile(fname)); + } + + ASSERT_TRUE(!HasPrefix(ids)); + } } // Only works in linux platforms TEST_P(EnvPosixTestWithParam, InvalidateCache) { - const EnvOptions soptions; - std::string fname = test::TmpDir(env_) + "/" + "testfile"; + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (bool directio : {true, false}) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + std::string fname = test::TmpDir(env_) + "/" + "testfile"; - // Create file. - { - unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); - ASSERT_OK(wfile.get()->Append(Slice("Hello world"))); - ASSERT_OK(wfile.get()->InvalidateCache(0, 0)); - ASSERT_OK(wfile.get()->Close()); - } + const size_t kSectorSize = 512; + auto data = NewAligned(kSectorSize, 'A'); + Slice slice(data.get(), kSectorSize); - // Random Read - { - unique_ptr file; - char scratch[100]; - Slice result; - ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - ASSERT_OK(file.get()->Read(0, 11, &result, scratch)); - ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0); - ASSERT_OK(file.get()->InvalidateCache(0, 11)); - ASSERT_OK(file.get()->InvalidateCache(0, 0)); - } + // Create file. 
+ { + unique_ptr wfile; + if (soptions.use_direct_writes) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } - // Sequential Read - { - unique_ptr file; - char scratch[100]; - Slice result; - ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions)); - ASSERT_OK(file.get()->Read(11, &result, scratch)); - ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0); - ASSERT_OK(file.get()->InvalidateCache(0, 11)); - ASSERT_OK(file.get()->InvalidateCache(0, 0)); + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + ASSERT_OK(wfile.get()->Append(slice)); + ASSERT_OK(wfile.get()->InvalidateCache(0, 0)); + ASSERT_OK(wfile.get()->Close()); + } + + // Random Read + { + unique_ptr file; + char scratch[kSectorSize]; + Slice result; + if (soptions.use_direct_reads) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } + + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + ASSERT_OK(file.get()->Read(0, kSectorSize, &result, scratch)); + ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); + ASSERT_OK(file.get()->InvalidateCache(0, 11)); + ASSERT_OK(file.get()->InvalidateCache(0, 0)); + } + + // Sequential Read + { + unique_ptr file; + char scratch[kSectorSize]; + Slice result; + if (soptions.use_direct_reads) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewSequentialFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } + + ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions)); + ASSERT_OK(file.get()->Read(kSectorSize, &result, scratch)); + ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); + ASSERT_OK(file.get()->InvalidateCache(0, 11)); + ASSERT_OK(file.get()->InvalidateCache(0, 0)); + } + // Delete the file + ASSERT_OK(env_->DeleteFile(fname)); } - // Delete the file - 
ASSERT_OK(env_->DeleteFile(fname)); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); } #endif // not TRAVIS #endif // OS_LINUX @@ -909,73 +972,109 @@ TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) { } TEST_P(EnvPosixTestWithParam, Preallocation) { - const std::string src = test::TmpDir(env_) + "/" + "testfile"; - unique_ptr srcfile; - const EnvOptions soptions; - ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions)); - srcfile->SetPreallocationBlockSize(1024 * 1024); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (bool directio : {true, false}) { + const std::string src = test::TmpDir(env_) + "/" + "testfile"; + unique_ptr srcfile; + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + if (soptions.use_direct_writes) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } - // No writes should mean no preallocation - size_t block_size, last_allocated_block; - srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); - ASSERT_EQ(last_allocated_block, 0UL); + ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions)); + srcfile->SetPreallocationBlockSize(1024 * 1024); - // Small write should preallocate one block - std::string str = "test"; - srcfile->PrepareWrite(srcfile->GetFileSize(), str.size()); - srcfile->Append(str); - srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); - ASSERT_EQ(last_allocated_block, 1UL); + // No writes should mean no preallocation + size_t block_size, last_allocated_block; + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 0UL); - // Write an entire preallocation block, make sure we increased by two. 
- std::string buf(block_size, ' '); - srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size()); - srcfile->Append(buf); - srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); - ASSERT_EQ(last_allocated_block, 2UL); + // Small write should preallocate one block + size_t kStrSize = 512; + auto data = NewAligned(kStrSize, 'A'); + Slice str(data.get(), kStrSize); + srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize); + srcfile->Append(str); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 1UL); - // Write five more blocks at once, ensure we're where we need to be. - buf = std::string(block_size * 5, ' '); - srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size()); - srcfile->Append(buf); - srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); - ASSERT_EQ(last_allocated_block, 7UL); + // Write an entire preallocation block, make sure we increased by two. + { + auto buf_ptr = NewAligned(block_size, ' '); + Slice buf(buf_ptr.get(), block_size); + srcfile->PrepareWrite(srcfile->GetFileSize(), block_size); + srcfile->Append(buf); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 2UL); + } + + // Write five more blocks at once, ensure we're where we need to be. + { + auto buf_ptr = NewAligned(block_size * 5, ' '); + Slice buf = Slice(buf_ptr.get(), block_size * 5); + srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size()); + srcfile->Append(buf); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 7UL); + } + } + rocksdb::SyncPoint::GetInstance()->ClearTrace(); } // Test that the two ways to get children file attributes (in bulk or // individually) behave consistently. 
TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { - const EnvOptions soptions; - const int kNumChildren = 10; + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (bool directio : {true, false}) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = directio; + const int kNumChildren = 10; - std::string data; - for (int i = 0; i < kNumChildren; ++i) { - std::ostringstream oss; - oss << test::TmpDir(env_) << "/testfile_" << i; - const std::string path = oss.str(); - unique_ptr file; - ASSERT_OK(env_->NewWritableFile(path, &file, soptions)); - file->Append(data); - data.append("test"); - } - - std::vector file_attrs; - ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs)); - for (int i = 0; i < kNumChildren; ++i) { - std::ostringstream oss; - oss << "testfile_" << i; - const std::string name = oss.str(); - const std::string path = test::TmpDir(env_) + "/" + name; - - auto file_attrs_iter = std::find_if( - file_attrs.begin(), file_attrs.end(), - [&name](const Env::FileAttributes& fm) { return fm.name == name; }); - ASSERT_TRUE(file_attrs_iter != file_attrs.end()); - uint64_t size; - ASSERT_OK(env_->GetFileSize(path, &size)); - ASSERT_EQ(size, 4 * i); - ASSERT_EQ(size, file_attrs_iter->size_bytes); + std::string data; + for (int i = 0; i < kNumChildren; ++i) { + std::ostringstream oss; + oss << test::TmpDir(env_) << "/testfile_" << i; + const std::string path = oss.str(); + unique_ptr file; + if (soptions.use_direct_writes) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } + + ASSERT_OK(env_->NewWritableFile(path, &file, soptions)); + auto buf_ptr = NewAligned(data.size(), 'T'); + Slice buf(buf_ptr.get(), data.size()); + file->Append(buf); + data.append(std::string(512, 'T')); + } + + std::vector file_attrs; + ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs)); + 
for (int i = 0; i < kNumChildren; ++i) { + std::ostringstream oss; + oss << "testfile_" << i; + const std::string name = oss.str(); + const std::string path = test::TmpDir(env_) + "/" + name; + + auto file_attrs_iter = std::find_if( + file_attrs.begin(), file_attrs.end(), + [&name](const Env::FileAttributes& fm) { return fm.name == name; }); + ASSERT_TRUE(file_attrs_iter != file_attrs.end()); + uint64_t size; + ASSERT_OK(env_->GetFileSize(path, &size)); + ASSERT_EQ(size, 512 * i); + ASSERT_EQ(size, file_attrs_iter->size_bytes); + } } + rocksdb::SyncPoint::GetInstance()->ClearTrace(); } // Test that all WritableFileWrapper forwards all calls to WritableFile. diff --git a/util/io_posix.cc b/util/io_posix.cc index df1a6b6e7..9696769b5 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -12,6 +12,7 @@ #include "util/io_posix.h" #include #include +#include #if defined(OS_LINUX) #include #endif @@ -46,6 +47,112 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) { #endif } +/* + * DirectIOHelper + */ +namespace { +const size_t kSectorSize = 512; +#ifdef OS_LINUX +const size_t kPageSize = sysconf(_SC_PAGESIZE); +#else +const size_t kPageSize = 4 * 1024; +#endif + +std::unique_ptr NewAligned(const size_t size) { + void* ptr = nullptr; + if (posix_memalign(&ptr, 4 * 1024, size) != 0) { + return std::unique_ptr(nullptr, free); + } + std::unique_ptr uptr(ptr, free); + return uptr; +} + +size_t Upper(const size_t size, const size_t fac) { + if (size % fac == 0) { + return size; + } + return size + (fac - size % fac); +} + +size_t Lower(const size_t size, const size_t fac) { + if (size % fac == 0) { + return size; + } + return size - (size % fac); +} + +bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; } + +static bool IsPageAligned(const void* ptr) { + return uintptr_t(ptr) % (kPageSize) == 0; +} + +Status ReadAligned(int fd, Slice* data, const uint64_t offset, + const size_t size, char* scratch) { + assert(IsSectorAligned(offset)); + 
assert(IsSectorAligned(size)); + assert(IsPageAligned(scratch)); + + size_t bytes_read = 0; + // Start from "no error" so that a zero-length read reports OK instead of + // an IOError built from a stale errno. + ssize_t status = 0; + while (bytes_read < size) { + status = + pread(fd, scratch + bytes_read, size - bytes_read, offset + bytes_read); + if (status < 0 && errno == EINTR) { + // Interrupted before any data was transferred; retry. errno is only + // consulted when pread actually failed. + continue; + } + if (status <= 0) { + // 0 is end of file, negative is a real error. + break; + } + bytes_read += status; + } + + *data = Slice(scratch, bytes_read); + return status < 0 ? Status::IOError(strerror(errno)) : Status::OK(); +} + +// Reads `size` bytes at `offset` when the request does not meet the alignment +// rules: reads the enclosing sector-aligned range into a temporary aligned +// buffer and copies the requested part into `scratch`. +Status ReadUnaligned(int fd, Slice* data, const uint64_t offset, + const size_t size, char* scratch) { + assert(scratch); + assert(!IsSectorAligned(offset) || !IsSectorAligned(size) || + !IsPageAligned(scratch)); + + const uint64_t aligned_off = Lower(offset, kSectorSize); + // Bytes between the sector boundary and the requested offset; they are read + // but never handed back to the caller. + const size_t lead = static_cast<size_t>(offset - aligned_off); + const size_t aligned_size = Upper(size + lead, kSectorSize); + auto aligned_scratch = NewAligned(aligned_size); + assert(aligned_scratch); + if (!aligned_scratch) { + return Status::IOError("Unable to allocate"); + } + + assert(IsSectorAligned(aligned_off)); + assert(IsSectorAligned(aligned_size)); + assert(aligned_scratch); + assert(IsPageAligned(aligned_scratch.get())); + assert(offset + size <= aligned_off + aligned_size); + + Slice scratch_slice; + Status s = ReadAligned(fd, &scratch_slice, aligned_off, aligned_size, + reinterpret_cast<char*>(aligned_scratch.get())); + + // Only bytes actually read past the leading padding belong to the caller; + // a short read near end of file may not even reach `offset`. + const size_t available = + scratch_slice.size() > lead ? scratch_slice.size() - lead : 0; + const size_t copied = std::min(size, available); + memcpy(scratch, reinterpret_cast<char*>(aligned_scratch.get()) + lead, + copied); + *data = Slice(scratch, copied); + return s; +} + +// Dispatches to the aligned fast path when offset, length and destination +// buffer all satisfy the alignment rules, otherwise to the bounce-buffer path. +Status DirectIORead(int fd, Slice* result, size_t off, size_t n, + char* scratch) { + // The alignment that matters is that of the destination buffer; *result is + // an output and is only assigned by the callee. + if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) { + return ReadAligned(fd, result, off, n, scratch); + } + return ReadUnaligned(fd, result, off, n, scratch); +} +} // namespace + /* * PosixSequentialFile */ @@ -104,15 +211,37 @@ Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) { 
#endif } +/* + * PosixDirectIOSequentialFile + */ +Status PosixDirectIOSequentialFile::Read(size_t n, Slice* result, + char* scratch) { + const size_t off = off_.fetch_add(n); + return DirectIORead(fd_, result, off, n, scratch); +} + +Status PosixDirectIOSequentialFile::Skip(uint64_t n) { + off_ += n; + return Status::OK(); +} + +Status PosixDirectIOSequentialFile::InvalidateCache(size_t /*offset*/, + size_t /*length*/) { + return Status::OK(); +} + +/* + * PosixRandomAccessFile + */ #if defined(OS_LINUX) -namespace { -static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { if (max_size < kMaxVarint64Length * 3) { return 0; } struct stat buf; int result = fstat(fd, &buf); + assert(result != -1); if (result == -1) { return 0; } @@ -132,12 +261,10 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { assert(rid >= id); return static_cast(rid - id); } -} // namespace #endif #if defined(OS_MACOSX) -namespace { -static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { if (max_size < kMaxVarint64Length * 3) { return 0; } @@ -155,9 +282,7 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { assert(rid >= id); return static_cast(rid - id); } -} // namespace #endif - /* * PosixRandomAccessFile * @@ -206,7 +331,7 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, #if defined(OS_LINUX) || defined(OS_MACOSX) size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { - return GetUniqueIdFromFile(fd_, id, max_size); + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); } #endif @@ -246,6 +371,15 @@ Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { #endif } +/* + * PosixDirectIORandomAccessFile + */ +Status PosixDirectIORandomAccessFile::Read(uint64_t offset, size_t n, + 
Slice* result, char* scratch) const { + Status s = DirectIORead(fd_, result, offset, n, scratch); + return s; +} + /* * PosixMmapReadableFile * @@ -663,10 +797,37 @@ Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) { } size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const { - return GetUniqueIdFromFile(fd_, id, max_size); + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); } #endif +/* + * PosixDirectIOWritableFile + */ +Status PosixDirectIOWritableFile::Append(const Slice& data) { + assert(IsSectorAligned(data.size()) && IsPageAligned(data.data())); + if (!IsSectorAligned(data.size()) || !IsPageAligned(data.data())) { + return Status::IOError("Unaligned buffer for direct IO"); + } + return PosixWritableFile::Append(data); +} + +Status PosixDirectIOWritableFile::PositionedAppend(const Slice& data, + uint64_t offset) { + assert(IsSectorAligned(offset)); + assert(IsSectorAligned(data.size())); + assert(IsPageAligned(data.data())); + if (!IsSectorAligned(offset) || !IsSectorAligned(data.size()) || + !IsPageAligned(data.data())) { + return Status::IOError("offset or size is not aligned"); + } + return PosixWritableFile::PositionedAppend(data, offset); +} + +/* + * PosixDirectory + */ + PosixDirectory::~PosixDirectory() { close(fd_); } Status PosixDirectory::Fsync() { diff --git a/util/io_posix.h b/util/io_posix.h index fb0c93fbf..7db8e23d9 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once #include +#include <unistd.h> +#include <atomic> #include "rocksdb/env.h" // For non linux platform, the following macros are used only as place @@ -26,6 +27,11 @@ static Status IOError(const std::string& context, int err_number) { return Status::IOError(context, strerror(err_number)); } +// Exposes the unique-id helper so every file class can share it. +class PosixHelper { + public: + static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size); +}; + class PosixSequentialFile : public SequentialFile { private: std::string filename_; @@ -43,8 +49,25 @@ class PosixSequentialFile : public SequentialFile { virtual Status InvalidateCache(size_t offset, size_t length) override; }; -class PosixRandomAccessFile : public RandomAccessFile { +// Sequential file that keeps its own read offset (off_) and reads from fd_. +class PosixDirectIOSequentialFile : public SequentialFile { + public: + explicit PosixDirectIOSequentialFile(const std::string& filename, int fd) + : filename_(filename), fd_(fd) {} + + // The descriptor is handed to this object at construction and none of its + // methods in this patch close it, so release it here to avoid leaking it. + virtual ~PosixDirectIOSequentialFile() { + if (fd_ >= 0) { + close(fd_); + } + } + + Status Read(size_t n, Slice* result, char* scratch) override; + Status Skip(uint64_t n) override; + Status InvalidateCache(size_t offset, size_t length) override; + private: + const std::string filename_; + int fd_ = -1; + std::atomic<size_t> off_{0}; // read offset +}; + +class PosixRandomAccessFile : public RandomAccessFile { + protected: std::string filename_; int fd_; bool use_os_buffer_; @@ -63,8 +86,23 @@ class PosixRandomAccessFile : public RandomAccessFile { virtual Status InvalidateCache(size_t offset, size_t length) override; }; +// Direct IO random access file direct IO implementation +class PosixDirectIORandomAccessFile : public PosixRandomAccessFile { + public: + explicit PosixDirectIORandomAccessFile(const std::string& filename, int fd) + : PosixRandomAccessFile(filename, fd, EnvOptions()) {} + virtual ~PosixDirectIORandomAccessFile() {} + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + virtual void Hint(AccessPattern pattern) override {} + Status InvalidateCache(size_t offset, size_t length) override { + return Status::OK(); + } +}; 
+ class PosixWritableFile : public WritableFile { - private: + protected: const std::string filename_; int fd_; uint64_t filesize_; @@ -74,9 +112,9 @@ class PosixWritableFile : public WritableFile { #endif public: - PosixWritableFile(const std::string& fname, int fd, - const EnvOptions& options); - ~PosixWritableFile(); + explicit PosixWritableFile(const std::string& fname, int fd, + const EnvOptions& options); + virtual ~PosixWritableFile(); // Means Close() will properly take care of truncate // and it does not need any additional information @@ -96,6 +134,22 @@ class PosixWritableFile : public WritableFile { #endif }; +class PosixDirectIOWritableFile : public PosixWritableFile { + public: + explicit PosixDirectIOWritableFile(const std::string& filename, int fd) + : PosixWritableFile(filename, fd, EnvOptions()) {} + virtual ~PosixDirectIOWritableFile() {} + + bool UseOSBuffer() const override { return false; } + size_t GetRequiredBufferAlignment() const override { return 4 * 1024; } + Status Append(const Slice& data) override; + Status PositionedAppend(const Slice& data, uint64_t offset) override; + bool UseDirectIO() const override { return true; } + Status InvalidateCache(size_t offset, size_t length) override { + return Status::OK(); + } +}; + class PosixMmapReadableFile : public RandomAccessFile { private: int fd_;