rocksdb/util/file_reader_writer.cc
sdong 7ccd1c80a7 Add two unit tests for SyncWAL()
Summary:
Add two unit tests for SyncWAL(). One makes sure SyncWAL() doesn't block writes in the other thread. Another one makes sure SyncWAL() doesn't wait ongoing writes to finish before being executed.

Create a new test file db_wal_test and move two WAL related tests from db_test to here.

Test Plan: Run the new tests

Reviewers: IslamAbdelRahman, rven, kradhakrishnan, kolmike, tnovak, yhchiang

Reviewed By: yhchiang

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D43605
2015-08-05 14:27:02 -07:00

262 lines
7.1 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/file_reader_writer.h"
#include <algorithm>
#include "port/port.h"
#include "util/iostats_context_imp.h"
#include "util/random.h"
#include "util/rate_limiter.h"
#include "util/sync_point.h"
namespace rocksdb {
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s = file_->Read(n, result, scratch);
IOSTATS_ADD(bytes_read, result->size());
return s;
}
Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); }
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
StopWatch sw(env_, stats_, hist_type_);
IOSTATS_TIMER_GUARD(read_nanos);
Status s = file_->Read(offset, n, result, scratch);
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
return s;
}
Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
Status s;
pending_sync_ = true;
pending_fsync_ = true;
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
// if there is no space in the cache, then flush
if (cursize_ + left > capacity_) {
s = Flush();
if (!s.ok()) {
return s;
}
// Increase the buffer size, but capped at 1MB
if (capacity_ < (1 << 20)) {
capacity_ *= 2;
buf_.reset(new char[capacity_]);
}
assert(cursize_ == 0);
}
// if the write fits into the cache, then write to cache
// otherwise do a write() syscall to write to OS buffers.
if (cursize_ + left <= capacity_) {
memcpy(buf_.get() + cursize_, src, left);
cursize_ += left;
} else {
while (left != 0) {
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
s = writable_file_->Append(Slice(src, size));
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, size);
TEST_KILL_RANDOM(rocksdb_kill_odds);
left -= size;
src += size;
}
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
filesize_ += data.size();
return Status::OK();
}
Status WritableFileWriter::Close() {
Status s;
s = Flush(); // flush cache to OS
if (!s.ok()) {
return s;
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
return writable_file_->Close();
}
// write out the cached data to the OS cache
Status WritableFileWriter::Flush() {
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
size_t left = cursize_;
char* src = buf_.get();
while (left != 0) {
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
Status s = writable_file_->Append(Slice(src, size));
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, size);
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
left -= size;
src += size;
}
cursize_ = 0;
writable_file_->Flush();
// sync OS cache to disk for every bytes_per_sync_
// TODO: give log file and sst file different options (log
// files could be potentially cached in OS for their whole
// life time, thus we might not want to flush at all).
// We try to avoid sync to the last 1MB of data. For two reasons:
// (1) avoid rewrite the same page that is modified later.
// (2) for older version of OS, write can block while writing out
// the page.
// Xfs does neighbor page flushing outside of the specified ranges. We
// need to make sure sync range is far from the write offset.
if (bytes_per_sync_) {
uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced.
uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
if (filesize_ > kBytesNotSyncRange) {
uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
if (offset_sync_to > 0 &&
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
last_sync_size_ = offset_sync_to;
}
}
}
return Status::OK();
}
Status WritableFileWriter::Sync(bool use_fsync) {
Status s = Flush();
if (!s.ok()) {
return s;
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
if (pending_sync_) {
s = SyncInternal(use_fsync);
if (!s.ok()) {
return s;
}
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
pending_sync_ = false;
if (use_fsync) {
pending_fsync_ = false;
}
return Status::OK();
}
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (!writable_file_->IsSyncThreadSafe()) {
return Status::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
"WritableFile::IsSyncThreadSafe() is false");
}
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
Status s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
return s;
}
Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
if (use_fsync) {
s = writable_file_->Fsync();
} else {
s = writable_file_->Sync();
}
return s;
}
Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
return writable_file_->RangeSync(offset, nbytes);
}
size_t WritableFileWriter::RequestToken(size_t bytes) {
Env::IOPriority io_priority;
if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) <
Env::IO_TOTAL) {
bytes = std::min(bytes,
static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
rate_limiter_->Request(bytes, io_priority);
}
return bytes;
}
Status RandomRWFileAccessor::Write(uint64_t offset, const Slice& data) {
Status s;
pending_sync_ = true;
pending_fsync_ = true;
{
IOSTATS_TIMER_GUARD(write_nanos);
s = random_rw_file_->Write(offset, data);
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, data.size());
return s;
}
Status RandomRWFileAccessor::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
{
IOSTATS_TIMER_GUARD(read_nanos);
s = random_rw_file_->Read(offset, n, result, scratch);
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
return s;
}
Status RandomRWFileAccessor::Close() { return random_rw_file_->Close(); }
Status RandomRWFileAccessor::Sync(bool use_fsync) {
Status s;
if (pending_sync_) {
if (use_fsync) {
s = random_rw_file_->Fsync();
} else {
s = random_rw_file_->Sync();
}
if (!s.ok()) {
return s;
}
}
if (use_fsync) {
pending_fsync_ = false;
}
pending_sync_ = false;
return s;
}
} // namespace rocksdb