rocksdb/utilities/env_mirror.cc
Aaron Gao 972f96b3fb direct io write support
Summary:
rocksdb direct io support

```
[gzh@dev11575.prn2 ~/rocksdb] ./db_bench -benchmarks=fillseq --num=1000000
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 5.0
Date:       Wed Nov 23 13:17:43 2016
CPU:        40 * Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
CPUCache:   25600 KB
Keys:       16 bytes each
Values:     100 bytes each (50 bytes after compression)
Entries:    1000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    110.6 MB (estimated)
FileSize:   62.9 MB (estimated)
Write rate: 0 bytes/second
Compression: Snappy
Memtablerep: skip_list
Perf Level: 1
WARNING: Assertions are enabled; benchmarks unnecessarily slow
------------------------------------------------
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
DB path: [/tmp/rocksdbtest-112628/dbbench]
fillseq      :       4.393 micros/op 227639 ops/sec;   25.2 MB/s

[gzh@dev11575.prn2 ~/roc
Closes https://github.com/facebook/rocksdb/pull/1564

Differential Revision: D4241093

Pulled By: lightmark

fbshipit-source-id: 98c29e3
2016-12-22 13:09:19 -08:00

265 lines
7.8 KiB
C++

// Copyright (c) 2015, Red Hat, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/env_mirror.h"
namespace rocksdb {
// An implementaiton of Env that mirrors all work over two backend
// Env's. This is useful for debugging purposes.
class SequentialFileMirror : public SequentialFile {
public:
unique_ptr<SequentialFile> a_, b_;
std::string fname;
explicit SequentialFileMirror(std::string f) : fname(f) {}
Status Read(size_t n, Slice* result, char* scratch) {
Slice aslice;
Status as = a_->Read(n, &aslice, scratch);
if (as == Status::OK()) {
char* bscratch = new char[n];
Slice bslice;
size_t off = 0;
size_t left = aslice.size();
while (left) {
Status bs = b_->Read(left, &bslice, bscratch);
assert(as == bs);
assert(memcmp(bscratch, scratch + off, bslice.size()) == 0);
off += bslice.size();
left -= bslice.size();
}
delete[] bscratch;
*result = aslice;
} else {
Status bs = b_->Read(n, result, scratch);
assert(as == bs);
}
return as;
}
Status Skip(uint64_t n) {
Status as = a_->Skip(n);
Status bs = b_->Skip(n);
assert(as == bs);
return as;
}
Status InvalidateCache(size_t offset, size_t length) {
Status as = a_->InvalidateCache(offset, length);
Status bs = b_->InvalidateCache(offset, length);
assert(as == bs);
return as;
};
};
class RandomAccessFileMirror : public RandomAccessFile {
public:
unique_ptr<RandomAccessFile> a_, b_;
std::string fname;
explicit RandomAccessFileMirror(std::string f) : fname(f) {}
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
Status as = a_->Read(offset, n, result, scratch);
if (as == Status::OK()) {
char* bscratch = new char[n];
Slice bslice;
size_t off = 0;
size_t left = result->size();
while (left) {
Status bs = b_->Read(offset + off, left, &bslice, bscratch);
assert(as == bs);
assert(memcmp(bscratch, scratch + off, bslice.size()) == 0);
off += bslice.size();
left -= bslice.size();
}
delete[] bscratch;
} else {
Status bs = b_->Read(offset, n, result, scratch);
assert(as == bs);
}
return as;
}
bool ShouldForwardRawRequest() const {
// NOTE: not verified
return a_->ShouldForwardRawRequest();
}
size_t GetUniqueId(char* id, size_t max_size) const {
// NOTE: not verified
return a_->GetUniqueId(id, max_size);
}
};
class WritableFileMirror : public WritableFile {
public:
unique_ptr<WritableFile> a_, b_;
std::string fname;
explicit WritableFileMirror(std::string f) : fname(f) {}
Status Append(const Slice& data) override {
Status as = a_->Append(data);
Status bs = b_->Append(data);
assert(as == bs);
return as;
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
Status as = a_->PositionedAppend(data, offset);
Status bs = b_->PositionedAppend(data, offset);
assert(as == bs);
return as;
}
Status Truncate(uint64_t size) override {
Status as = a_->Truncate(size);
Status bs = b_->Truncate(size);
assert(as == bs);
return as;
}
Status Close() override {
Status as = a_->Close();
Status bs = b_->Close();
assert(as == bs);
return as;
}
Status Flush() override {
Status as = a_->Flush();
Status bs = b_->Flush();
assert(as == bs);
return as;
}
Status Sync() override {
Status as = a_->Sync();
Status bs = b_->Sync();
assert(as == bs);
return as;
}
Status Fsync() override {
Status as = a_->Fsync();
Status bs = b_->Fsync();
assert(as == bs);
return as;
}
bool IsSyncThreadSafe() const override {
bool as = a_->IsSyncThreadSafe();
assert(as == b_->IsSyncThreadSafe());
return as;
}
void SetIOPriority(Env::IOPriority pri) override {
a_->SetIOPriority(pri);
b_->SetIOPriority(pri);
}
Env::IOPriority GetIOPriority() override {
// NOTE: we don't verify this one
return a_->GetIOPriority();
}
uint64_t GetFileSize() override {
uint64_t as = a_->GetFileSize();
assert(as == b_->GetFileSize());
return as;
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
// NOTE: we don't verify this one
return a_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
// NOTE: we don't verify this one
return a_->GetUniqueId(id, max_size);
}
Status InvalidateCache(size_t offset, size_t length) override {
Status as = a_->InvalidateCache(offset, length);
Status bs = b_->InvalidateCache(offset, length);
assert(as == bs);
return as;
}
protected:
Status Allocate(uint64_t offset, uint64_t length) override {
Status as = a_->Allocate(offset, length);
Status bs = b_->Allocate(offset, length);
assert(as == bs);
return as;
}
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
Status as = a_->RangeSync(offset, nbytes);
Status bs = b_->RangeSync(offset, nbytes);
assert(as == bs);
return as;
}
};
Status EnvMirror::NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) {
return a_->NewSequentialFile(f, r, options);
}
SequentialFileMirror* mf = new SequentialFileMirror(f);
Status as = a_->NewSequentialFile(f, &mf->a_, options);
Status bs = b_->NewSequentialFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::NewRandomAccessFile(const std::string& f,
unique_ptr<RandomAccessFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) {
return a_->NewRandomAccessFile(f, r, options);
}
RandomAccessFileMirror* mf = new RandomAccessFileMirror(f);
Status as = a_->NewRandomAccessFile(f, &mf->a_, options);
Status bs = b_->NewRandomAccessFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::NewWritableFile(const std::string& f,
unique_ptr<WritableFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) return a_->NewWritableFile(f, r, options);
WritableFileMirror* mf = new WritableFileMirror(f);
Status as = a_->NewWritableFile(f, &mf->a_, options);
Status bs = b_->NewWritableFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* r,
const EnvOptions& options) {
if (fname.find("/proc/") == 0)
return a_->ReuseWritableFile(fname, old_fname, r, options);
WritableFileMirror* mf = new WritableFileMirror(fname);
Status as = a_->ReuseWritableFile(fname, old_fname, &mf->a_, options);
Status bs = b_->ReuseWritableFile(fname, old_fname, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
} // namespace rocksdb
#endif