Merge the latest changes from github/master

This commit is contained in:
Dmitri Smirnov 2015-07-02 17:23:41 -07:00
parent feb99c31a4
commit d2f0912bd3
19 changed files with 823 additions and 84 deletions

50
db/c.cc
View File

@ -608,6 +608,10 @@ void rocksdb_close(rocksdb_t* db) {
delete db; delete db;
} }
// Installs the merge operator returned by
// rocksdb::MergeOperators::CreateUInt64AddOperator() on `opt`.
void rocksdb_options_set_uint64add_merge_operator(rocksdb_options_t* opt) {
  auto& options = opt->rep;
  options.merge_operator = rocksdb::MergeOperators::CreateUInt64AddOperator();
}
rocksdb_t* rocksdb_open_column_families( rocksdb_t* rocksdb_open_column_families(
const rocksdb_options_t* db_options, const rocksdb_options_t* db_options,
const char* name, const char* name,
@ -1359,6 +1363,26 @@ void rocksdb_block_based_options_set_whole_key_filtering(
options->rep.whole_key_filtering = v; options->rep.whole_key_filtering = v;
} }
// Stores `v` as the format_version of the wrapped block-based table options.
void rocksdb_block_based_options_set_format_version(
    rocksdb_block_based_table_options_t* options, int v) {
  auto& table_options = options->rep;
  table_options.format_version = v;
}
// Sets the index type of the wrapped block-based table options.  `v` is cast
// to BlockBasedTableOptions::IndexType without any range validation.
void rocksdb_block_based_options_set_index_type(
    rocksdb_block_based_table_options_t* options, int v) {
  const BlockBasedTableOptions::IndexType index_type =
      static_cast<BlockBasedTableOptions::IndexType>(v);
  options->rep.index_type = index_type;
}
// Stores `v` into hash_index_allow_collision of the wrapped block-based table
// options (presumably a boolean: non-zero enables it -- confirm against
// BlockBasedTableOptions).
void rocksdb_block_based_options_set_hash_index_allow_collision(
    rocksdb_block_based_table_options_t* options, unsigned char v) {
  auto& table_options = options->rep;
  table_options.hash_index_allow_collision = v;
}
// Stores `v` into cache_index_and_filter_blocks of the wrapped block-based
// table options (presumably a boolean: non-zero enables it -- confirm against
// BlockBasedTableOptions).
void rocksdb_block_based_options_set_cache_index_and_filter_blocks(
    rocksdb_block_based_table_options_t* options, unsigned char v) {
  auto& table_options = options->rep;
  table_options.cache_index_and_filter_blocks = v;
}
void rocksdb_options_set_block_based_table_factory( void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t *opt, rocksdb_options_t *opt,
rocksdb_block_based_table_options_t* table_options) { rocksdb_block_based_table_options_t* table_options) {
@ -1741,6 +1765,11 @@ void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t* opt
opt->rep.min_write_buffer_number_to_merge = n; opt->rep.min_write_buffer_number_to_merge = n;
} }
// Stores `n` as max_write_buffer_number_to_maintain on the wrapped Options.
void rocksdb_options_set_max_write_buffer_number_to_maintain(
    rocksdb_options_t* opt, int n) {
  auto& options = opt->rep;
  options.max_write_buffer_number_to_maintain = n;
}
void rocksdb_options_set_max_background_compactions(rocksdb_options_t* opt, int n) { void rocksdb_options_set_max_background_compactions(rocksdb_options_t* opt, int n) {
opt->rep.max_background_compactions = n; opt->rep.max_background_compactions = n;
} }
@ -2284,6 +2313,27 @@ rocksdb_slicetransform_t* rocksdb_slicetransform_create_fixed_prefix(size_t pref
return wrapper; return wrapper;
} }
// Builds a C-API slice transform backed by rocksdb::NewNoopTransform().
// The returned object is heap-allocated with `new`.
rocksdb_slicetransform_t* rocksdb_slicetransform_create_noop() {
  // Adapter that forwards every SliceTransform call to `rep_` and deletes
  // `rep_` in its own destructor.
  struct Wrapper : public rocksdb_slicetransform_t {
    const SliceTransform* rep_;

    ~Wrapper() { delete rep_; }

    const char* Name() const override {
      return rep_->Name();
    }

    Slice Transform(const Slice& src) const override {
      return rep_->Transform(src);
    }

    bool InDomain(const Slice& src) const override {
      return rep_->InDomain(src);
    }

    bool InRange(const Slice& src) const override {
      return rep_->InRange(src);
    }

    // Callback installed as destructor_; state_ is null, so it has nothing
    // to release.
    static void DoNothing(void*) {}
  };

  Wrapper* noop = new Wrapper;
  noop->state_ = nullptr;
  noop->destructor_ = &Wrapper::DoNothing;
  noop->rep_ = rocksdb::NewNoopTransform();
  return noop;
}
rocksdb_universal_compaction_options_t* rocksdb_universal_compaction_options_create() { rocksdb_universal_compaction_options_t* rocksdb_universal_compaction_options_create() {
rocksdb_universal_compaction_options_t* result = new rocksdb_universal_compaction_options_t; rocksdb_universal_compaction_options_t* result = new rocksdb_universal_compaction_options_t;
result->rep = new rocksdb::CompactionOptionsUniversal; result->rep = new rocksdb::CompactionOptionsUniversal;

View File

@ -711,15 +711,19 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
default_cf.write_buffer_size = 100000; default_cf.write_buffer_size = 100000;
default_cf.max_write_buffer_number = 10; default_cf.max_write_buffer_number = 10;
default_cf.min_write_buffer_number_to_merge = 1; default_cf.min_write_buffer_number_to_merge = 1;
default_cf.max_write_buffer_number_to_maintain = 0;
one.write_buffer_size = 200000; one.write_buffer_size = 200000;
one.max_write_buffer_number = 10; one.max_write_buffer_number = 10;
one.min_write_buffer_number_to_merge = 2; one.min_write_buffer_number_to_merge = 2;
one.max_write_buffer_number_to_maintain = 1;
two.write_buffer_size = 1000000; two.write_buffer_size = 1000000;
two.max_write_buffer_number = 10; two.max_write_buffer_number = 10;
two.min_write_buffer_number_to_merge = 3; two.min_write_buffer_number_to_merge = 3;
two.max_write_buffer_number_to_maintain = 2;
three.write_buffer_size = 90000; three.write_buffer_size = 90000;
three.max_write_buffer_number = 10; three.max_write_buffer_number = 10;
three.min_write_buffer_number_to_merge = 4; three.min_write_buffer_number_to_merge = 4;
three.max_write_buffer_number_to_maintain = -1;
Reopen({default_cf, one, two, three}); Reopen({default_cf, one, two, three});

View File

@ -54,6 +54,8 @@ int main() {
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "rocksdb/utilities/flashcache.h" #include "rocksdb/utilities/flashcache.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "util/crc32c.h" #include "util/crc32c.h"
@ -106,7 +108,8 @@ DEFINE_string(benchmarks,
"compress," "compress,"
"uncompress," "uncompress,"
"acquireload," "acquireload,"
"fillseekseq,", "fillseekseq,"
"randomtransaction",
"Comma-separated list of operations to run in the specified order" "Comma-separated list of operations to run in the specified order"
"Actual benchmarks:\n" "Actual benchmarks:\n"
@ -157,6 +160,8 @@ DEFINE_string(benchmarks,
"\tacquireload -- load N*1000 times\n" "\tacquireload -- load N*1000 times\n"
"\tfillseekseq -- write N values in sequential key, then read " "\tfillseekseq -- write N values in sequential key, then read "
"them by seeking to each key\n" "them by seeking to each key\n"
"\trandomtransaction -- execute N random transactions and "
"verify correctness\n"
"Meta operations:\n" "Meta operations:\n"
"\tcompact -- Compact the entire DB\n" "\tcompact -- Compact the entire DB\n"
"\tstats -- Print DB stats\n" "\tstats -- Print DB stats\n"
@ -263,6 +268,20 @@ DEFINE_int32(min_write_buffer_number_to_merge,
" writing less data to storage if there are duplicate records " " writing less data to storage if there are duplicate records "
" in each of these individual write buffers."); " in each of these individual write buffers.");
// Command-line flag for db_bench.  Its default is read from a
// default-constructed rocksdb::Options, and its value is copied into
// options.max_write_buffer_number_to_maintain when the benchmark builds the
// Options it opens the database with.
DEFINE_int32(max_write_buffer_number_to_maintain,
rocksdb::Options().max_write_buffer_number_to_maintain,
"The total maximum number of write buffers to maintain in memory "
"including copies of buffers that have already been flushed. "
"Unlike max_write_buffer_number, this parameter does not affect "
"flushing. This controls the minimum amount of write history "
"that will be available in memory for conflict checking when "
"Transactions are used. If this value is too low, some "
"transactions may fail at commit time due to not being able to "
"determine whether there were any write conflicts. Setting this "
"value to 0 will cause write buffers to be freed immediately "
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
DEFINE_int32(max_background_compactions, DEFINE_int32(max_background_compactions,
rocksdb::Options().max_background_compactions, rocksdb::Options().max_background_compactions,
"The maximum number of concurrent background compactions" "The maximum number of concurrent background compactions"
@ -425,6 +444,18 @@ DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
DEFINE_uint64(delete_obsolete_files_period_micros, 0, DEFINE_uint64(delete_obsolete_files_period_micros, 0,
"Ignored. Left here for backward compatibility"); "Ignored. Left here for backward compatibility");
// When true, the benchmark opens the database through
// OptimisticTransactionDB::Open() instead of DB::Open().  Combining it with
// the readonly flag is rejected with an error at open time.
DEFINE_bool(transaction_db, false,
"Open a OptimisticTransactionDB instance. "
"Required for randomtransaction benchmark.");
// Number of key sets (one 4-digit key prefix per set) touched by each
// iteration of RandomTransaction() and summed by RandomTransactionVerify().
// RandomTransaction() aborts unless the value is in the range [1, 9999].
DEFINE_uint64(transaction_sets, 2,
"Number of keys each transaction will "
"modify (use in RandomTransaction only). Max: 9999");
// Exclusive upper bound, in microseconds, of the random pause that
// RandomTransaction() inserts between reading a key and writing it back.
// Values <= 0 disable the pause.
DEFINE_int32(transaction_sleep, 0,
"Max microseconds to sleep in between "
"reading and writing a value (used in RandomTransaction only). ");
namespace { namespace {
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype); assert(ctype);
@ -884,6 +915,7 @@ static void AppendWithSpace(std::string* str, Slice msg) {
struct DBWithColumnFamilies { struct DBWithColumnFamilies {
std::vector<ColumnFamilyHandle*> cfh; std::vector<ColumnFamilyHandle*> cfh;
DB* db; DB* db;
OptimisticTransactionDB* txn_db;
std::atomic<size_t> num_created; // Need to be updated after all the std::atomic<size_t> num_created; // Need to be updated after all the
// new entries in cfh are set. // new entries in cfh are set.
size_t num_hot; // Number of column families to be queried at each moment. size_t num_hot; // Number of column families to be queried at each moment.
@ -891,7 +923,7 @@ struct DBWithColumnFamilies {
// Column families will be created and used to be queried. // Column families will be created and used to be queried.
port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf() port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf()
DBWithColumnFamilies() : db(nullptr) { DBWithColumnFamilies() : db(nullptr), txn_db(nullptr) {
cfh.clear(); cfh.clear();
num_created = 0; num_created = 0;
num_hot = 0; num_hot = 0;
@ -900,9 +932,23 @@ struct DBWithColumnFamilies {
DBWithColumnFamilies(const DBWithColumnFamilies& other) DBWithColumnFamilies(const DBWithColumnFamilies& other)
: cfh(other.cfh), : cfh(other.cfh),
db(other.db), db(other.db),
txn_db(other.txn_db),
num_created(other.num_created.load()), num_created(other.num_created.load()),
num_hot(other.num_hot) {} num_hot(other.num_hot) {}
void DeleteDBs() {
std::for_each(cfh.begin(), cfh.end(),
[](ColumnFamilyHandle* cfhi) { delete cfhi; });
cfh.clear();
if (txn_db) {
delete txn_db;
txn_db = nullptr;
} else {
delete db;
}
db = nullptr;
}
ColumnFamilyHandle* GetCfh(int64_t rand_num) { ColumnFamilyHandle* GetCfh(int64_t rand_num) {
assert(num_hot > 0); assert(num_hot > 0);
return cfh[num_created.load(std::memory_order_acquire) - num_hot + return cfh[num_created.load(std::memory_order_acquire) - num_hot +
@ -1604,9 +1650,7 @@ class Benchmark {
} }
~Benchmark() { ~Benchmark() {
std::for_each(db_.cfh.begin(), db_.cfh.end(), db_.DeleteDBs();
[](ColumnFamilyHandle* cfh) { delete cfh; });
delete db_.db;
delete prefix_extractor_; delete prefix_extractor_;
if (cache_.get() != nullptr) { if (cache_.get() != nullptr) {
// this will leak, but we're shutting down so nobody cares // this will leak, but we're shutting down so nobody cares
@ -1710,6 +1754,8 @@ class Benchmark {
write_options_.disableWAL = FLAGS_disable_wal; write_options_.disableWAL = FLAGS_disable_wal;
void (Benchmark::*method)(ThreadState*) = nullptr; void (Benchmark::*method)(ThreadState*) = nullptr;
void (Benchmark::*post_process_method)() = nullptr;
bool fresh_db = false; bool fresh_db = false;
int num_threads = FLAGS_threads; int num_threads = FLAGS_threads;
@ -1825,6 +1871,9 @@ class Benchmark {
method = &Benchmark::Compress; method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) { } else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress; method = &Benchmark::Uncompress;
} else if (name == Slice("randomtransaction")) {
method = &Benchmark::RandomTransaction;
post_process_method = &Benchmark::RandomTransactionVerify;
} else if (name == Slice("stats")) { } else if (name == Slice("stats")) {
PrintStats("rocksdb.stats"); PrintStats("rocksdb.stats");
} else if (name == Slice("levelstats")) { } else if (name == Slice("levelstats")) {
@ -1845,11 +1894,7 @@ class Benchmark {
method = nullptr; method = nullptr;
} else { } else {
if (db_.db != nullptr) { if (db_.db != nullptr) {
std::for_each(db_.cfh.begin(), db_.cfh.end(), db_.DeleteDBs();
[](ColumnFamilyHandle* cfh) { delete cfh; });
delete db_.db;
db_.db = nullptr;
db_.cfh.clear();
DestroyDB(FLAGS_db, open_options_); DestroyDB(FLAGS_db, open_options_);
} }
for (size_t i = 0; i < multi_dbs_.size(); i++) { for (size_t i = 0; i < multi_dbs_.size(); i++) {
@ -1865,6 +1910,9 @@ class Benchmark {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
RunBenchmark(num_threads, name, method); RunBenchmark(num_threads, name, method);
} }
if (post_process_method != nullptr) {
(this->*post_process_method)();
}
} }
if (FLAGS_statistics) { if (FLAGS_statistics) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
@ -2175,6 +2223,8 @@ class Benchmark {
options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge = options.min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge; FLAGS_min_write_buffer_number_to_merge;
options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options.max_background_compactions = FLAGS_max_background_compactions; options.max_background_compactions = FLAGS_max_background_compactions;
options.max_background_flushes = FLAGS_max_background_flushes; options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style = FLAGS_compaction_style_e; options.compaction_style = FLAGS_compaction_style_e;
@ -2428,6 +2478,11 @@ class Benchmark {
NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec)); NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
} }
if (FLAGS_readonly && FLAGS_transaction_db) {
fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
exit(1);
}
if (FLAGS_num_multi_db <= 1) { if (FLAGS_num_multi_db <= 1) {
OpenDb(options, FLAGS_db, &db_); OpenDb(options, FLAGS_db, &db_);
} else { } else {
@ -2462,15 +2517,25 @@ class Benchmark {
if (FLAGS_readonly) { if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, column_families, s = DB::OpenForReadOnly(options, db_name, column_families,
&db->cfh, &db->db); &db->cfh, &db->db);
} else if (FLAGS_transaction_db) {
s = OptimisticTransactionDB::Open(options, db_name, column_families,
&db->cfh, &db->txn_db);
if (s.ok()) {
db->db = db->txn_db->GetBaseDB();
}
} else { } else {
s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
} }
db->cfh.resize(FLAGS_num_column_families); db->cfh.resize(FLAGS_num_column_families);
db->num_created = num_hot; db->num_created = num_hot;
db->num_hot = num_hot; db->num_hot = num_hot;
} else if (FLAGS_readonly) { } else if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, &db->db); s = DB::OpenForReadOnly(options, db_name, &db->db);
} else if (FLAGS_transaction_db) {
s = OptimisticTransactionDB::Open(options, db_name, &db->txn_db);
if (s.ok()) {
db->db = db->txn_db->GetBaseDB();
}
} else { } else {
s = DB::Open(options, db_name, &db->db); s = DB::Open(options, db_name, &db->db);
} }
@ -3515,6 +3580,203 @@ class Benchmark {
} }
} }
// This benchmark stress tests Transactions.  For a given --duration (or
// total number of --writes), a Transaction will perform a read-modify-write
// to increment the value of a key in each of N (--transaction_sets) sets of
// keys (where each set has --num keys).  If --threads is set, this will be
// done in parallel.
//
// To test transactions, use --transaction_db=true.  Not setting this
// parameter will run the same benchmark without transactions (the writes of
// one iteration are then applied as a single WriteBatch).
//
// RandomTransactionVerify() will then validate the correctness of the results
// by checking if the sum of all keys in each set is the same.
//
// Aborts the process on an invalid --transaction_sets value, when
// --num_multi_db > 1, on any unexpected read/write error, or when a stored
// value is not a positive integer.
void RandomTransaction(ThreadState* thread) {
  Duration duration(FLAGS_duration, readwrites_);
  ReadOptions read_options(FLAGS_verify_checksum, true);
  std::string value;
  DB* db = db_.db;
  uint64_t transactions_done = 0;
  uint64_t transactions_aborted = 0;
  Status s;
  uint64_t num_prefix_ranges = FLAGS_transaction_sets;
  bool use_txn = FLAGS_transaction_db;

  // The set number is encoded as a 4-digit key prefix, hence the 9999 limit.
  if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
    fprintf(stderr, "invalid value for transaction_sets\n");
    abort();
  }

  if (FLAGS_num_multi_db > 1) {
    fprintf(stderr,
            "Cannot run RandomTransaction benchmark with "
            "FLAGS_multi_db > 1.");
    abort();
  }

  while (!duration.Done(1)) {
    OptimisticTransaction* txn = nullptr;
    WriteBatch* batch = nullptr;

    if (use_txn) {
      txn = db_.txn_db->BeginTransaction(write_options_);
      assert(txn);
    } else {
      batch = new WriteBatch();
    }

    // pick a random number to use to increment a key in each set
    uint64_t incr = (thread->rand.Next() % 100) + 1;

    // For each set, pick a key at random and increment it.
    // The index must be wide enough for the full [1, 9999] range accepted
    // above; an 8-bit index would wrap at 255 and never terminate.
    for (uint64_t i = 0; i < num_prefix_ranges; i++) {
      uint64_t int_value;
      char prefix_buf[5];

      // key format:  [SET#][random#]
      std::string rand_key = ToString(thread->rand.Next() % FLAGS_num);
      Slice base_key(rand_key);

      // Pad prefix appropriately so we can iterate over each set
      snprintf(prefix_buf, sizeof(prefix_buf), "%04d",
               static_cast<int>(i + 1));
      std::string full_key = std::string(prefix_buf) + base_key.ToString();
      Slice key(full_key);

      if (use_txn) {
        s = txn->Get(read_options, key, &value);
      } else {
        s = db->Get(read_options, key, &value);
      }

      if (s.ok()) {
        int_value = std::stoull(value);

        // std::stoull yields unsigned long long, so the sentinel has to be
        // ULLONG_MAX; ULONG_MAX is only 32 bits wide on LLP64 platforms.
        if (int_value == 0 || int_value == ULLONG_MAX) {
          fprintf(stderr, "Get returned unexpected value: %s\n",
                  value.c_str());
          abort();
        }
      } else if (s.IsNotFound()) {
        // A key that was never written counts as zero.
        int_value = 0;
      } else {
        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
        abort();
      }

      // Optional pause between the read and the write of this key.
      if (FLAGS_transaction_sleep > 0) {
        FLAGS_env->SleepForMicroseconds(thread->rand.Next() %
                                        FLAGS_transaction_sleep);
      }

      std::string sum = ToString(int_value + incr);
      if (use_txn) {
        txn->Put(key, sum);
      } else {
        batch->Put(key, sum);
      }
    }

    if (use_txn) {
      s = txn->Commit();
    } else {
      s = db->Write(write_options_, batch);
    }

    if (!s.ok()) {
      // Ideally, we'd want to run this stress test with enough concurrency
      // on a small enough set of keys that we get some failed transactions
      // due to conflicts.
      if (use_txn && s.IsBusy()) {
        transactions_aborted++;
      } else {
        fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str());
        abort();
      }
    }

    if (txn) {
      delete txn;
    }
    if (batch) {
      delete batch;
    }

    // Counts every attempt, including those that ended in a Busy abort.
    transactions_done++;
  }

  char msg[100];
  if (use_txn) {
    snprintf(msg, sizeof(msg),
             "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
             transactions_done, transactions_aborted);
  } else {
    snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
  }
  thread->stats.AddMessage(msg);

  if (FLAGS_perf_level > 0) {
    thread->stats.AddMessage(perf_context.ToString());
  }
}
// Verifies consistency of data after RandomTransaction() has been run.
// Since each iteration of RandomTransaction() incremented a key in each set
// by the same value, the sum of the keys in each set should be the same.
void RandomTransactionVerify() {
if (!FLAGS_transaction_db) {
// transactions not used, nothing to verify.
return;
}
uint64_t prev_total = 0;
// For each set of keys with the same prefix, sum all the values
for (uint32_t i = 0; i < FLAGS_transaction_sets; i++) {
char prefix_buf[5];
snprintf(prefix_buf, sizeof(prefix_buf), "%04u", i + 1);
uint64_t total = 0;
Iterator* iter = db_.db->NewIterator(ReadOptions());
for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
Slice key = iter->key();
// stop when we reach a different prefix
if (key.ToString().compare(0, 4, prefix_buf) != 0) {
break;
}
Slice value = iter->value();
uint64_t int_value = std::stoull(value.ToString());
if (int_value == 0 || int_value == ULONG_MAX) {
fprintf(stderr, "Iter returned unexpected value: %s\n",
value.ToString().c_str());
abort();
}
total += int_value;
}
delete iter;
if (i > 0) {
if (total != prev_total) {
fprintf(stderr,
"RandomTransactionVerify found inconsistent totals. "
"Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
" \n",
i - 1, prev_total, i, total);
abort();
}
}
prev_total = total;
}
fprintf(stdout, "RandomTransactionVerify Success! Total:%" PRIu64 "\n",
prev_total);
}
void Compact(ThreadState* thread) { void Compact(ThreadState* thread) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);

View File

@ -1561,7 +1561,7 @@ const char* VersionStorageInfo::LevelSummary(
if (!files_marked_for_compaction_.empty()) { if (!files_marked_for_compaction_.empty()) {
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
" (%zu files need compaction)", " (%" ROCKSDB_PRIszt " files need compaction)",
files_marked_for_compaction_.size()); files_marked_for_compaction_.size());
} }

View File

@ -553,6 +553,8 @@ class WritableFile {
void operator=(const WritableFile&); void operator=(const WritableFile&);
protected: protected:
friend class WritableFileWrapper;
Env::IOPriority io_priority_; Env::IOPriority io_priority_;
}; };
@ -892,6 +894,47 @@ class EnvWrapper : public Env {
Env* target_; Env* target_;
}; };
// A WritableFile whose every operation is delegated to a second WritableFile
// (the "target").  Derive from it to override only the calls you care about
// and inherit pass-through behavior for the rest.
//
// WritableFile names this class as a friend so that the protected virtual
// methods (Allocate, RangeSync) can be forwarded as well.
//
// The wrapper only stores the target pointer; nothing in this class deletes
// it.
class WritableFileWrapper : public WritableFile {
 public:
  explicit WritableFileWrapper(WritableFile* target) : target_(target) {}

  // --- Data path ---------------------------------------------------------
  Status Append(const Slice& data) override {
    return target_->Append(data);
  }

  Status Close() override {
    return target_->Close();
  }

  Status Flush() override {
    return target_->Flush();
  }

  Status Sync() override {
    return target_->Sync();
  }

  Status Fsync() override {
    return target_->Fsync();
  }

  // --- Tuning and introspection -----------------------------------------
  void SetIOPriority(Env::IOPriority pri) override {
    target_->SetIOPriority(pri);
  }

  uint64_t GetFileSize() override {
    return target_->GetFileSize();
  }

  void GetPreallocationStatus(size_t* block_size,
                              size_t* last_allocated_block) override {
    target_->GetPreallocationStatus(block_size, last_allocated_block);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return target_->GetUniqueId(id, max_size);
  }

  Status InvalidateCache(size_t offset, size_t length) override {
    return target_->InvalidateCache(offset, length);
  }

 protected:
  // Protected in WritableFile as well; reachable on target_ only through
  // the friend declaration there.
  Status Allocate(off_t offset, off_t len) override {
    return target_->Allocate(offset, len);
  }

  Status RangeSync(off_t offset, off_t nbytes) override {
    return target_->RangeSync(offset, nbytes);
  }

 private:
  WritableFile* target_;  // File that receives every forwarded call.
};
// Returns a new environment that stores its data in memory and delegates // Returns a new environment that stores its data in memory and delegates
// all non-file-storage tasks to base_env. The caller must delete the result // all non-file-storage tasks to base_env. The caller must delete the result
// when it is no longer needed. // when it is no longer needed.

View File

@ -25,7 +25,7 @@ namespace rocksdb {
// ++pos) { // ++pos) {
// ... // ...
// } // }
typedef std::map<const std::string, std::string> UserCollectedProperties; typedef std::map<std::string, std::string> UserCollectedProperties;
// TableProperties contains a bunch of read-only properties of its associated // TableProperties contains a bunch of read-only properties of its associated
// table. // table.

View File

@ -13,9 +13,6 @@
#pragma once #pragma once
#ifndef STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_
#define STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_
#include <stdint.h> #include <stdint.h>
#include <cstddef> #include <cstddef>
#include <map> #include <map>
@ -205,5 +202,3 @@ struct ThreadStatus {
} // namespace rocksdb } // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_

View File

@ -993,6 +993,30 @@ void Java_org_rocksdb_Options_setMinWriteBufferNumberToMerge(
jhandle)->min_write_buffer_number_to_merge = jhandle)->min_write_buffer_number_to_merge =
static_cast<int>(jmin_write_buffer_number_to_merge); static_cast<int>(jmin_write_buffer_number_to_merge);
} }
/*
 * Class:     org_rocksdb_Options
 * Method:    maxWriteBufferNumberToMaintain
 * Signature: (J)I
 *
 * Returns max_write_buffer_number_to_maintain of the rocksdb::Options that
 * jhandle points to.
 */
jint Java_org_rocksdb_Options_maxWriteBufferNumberToMaintain(JNIEnv* env,
                                                             jobject jobj,
                                                             jlong jhandle) {
  rocksdb::Options* const options =
      reinterpret_cast<rocksdb::Options*>(jhandle);
  return options->max_write_buffer_number_to_maintain;
}
/*
 * Class:     org_rocksdb_Options
 * Method:    setMaxWriteBufferNumberToMaintain
 * Signature: (JI)V
 *
 * Stores the given value into max_write_buffer_number_to_maintain of the
 * rocksdb::Options that jhandle points to.
 */
void Java_org_rocksdb_Options_setMaxWriteBufferNumberToMaintain(
    JNIEnv* env, jobject jobj, jlong jhandle,
    jint jmax_write_buffer_number_to_maintain) {
  rocksdb::Options* const options =
      reinterpret_cast<rocksdb::Options*>(jhandle);
  const int new_value = static_cast<int>(jmax_write_buffer_number_to_maintain);
  options->max_write_buffer_number_to_maintain = new_value;
}
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
@ -2153,6 +2177,30 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMinWriteBufferNumberToMerge(
static_cast<int>(jmin_write_buffer_number_to_merge); static_cast<int>(jmin_write_buffer_number_to_merge);
} }
/*
 * Class:     org_rocksdb_ColumnFamilyOptions
 * Method:    maxWriteBufferNumberToMaintain
 * Signature: (J)I
 *
 * Returns max_write_buffer_number_to_maintain of the
 * rocksdb::ColumnFamilyOptions that jhandle points to.
 */
jint Java_org_rocksdb_ColumnFamilyOptions_maxWriteBufferNumberToMaintain(
    JNIEnv* env, jobject jobj, jlong jhandle) {
  rocksdb::ColumnFamilyOptions* const cf_options =
      reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle);
  return cf_options->max_write_buffer_number_to_maintain;
}
/*
 * Class:     org_rocksdb_ColumnFamilyOptions
 * Method:    setMaxWriteBufferNumberToMaintain
 * Signature: (JI)V
 *
 * Stores the given value into max_write_buffer_number_to_maintain of the
 * rocksdb::ColumnFamilyOptions that jhandle points to.
 */
void Java_org_rocksdb_ColumnFamilyOptions_setMaxWriteBufferNumberToMaintain(
    JNIEnv* env, jobject jobj, jlong jhandle,
    jint jmax_write_buffer_number_to_maintain) {
  rocksdb::ColumnFamilyOptions* const cf_options =
      reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle);
  const int new_value = static_cast<int>(jmax_write_buffer_number_to_maintain);
  cf_options->max_write_buffer_number_to_maintain = new_value;
}
/* /*
* Class: org_rocksdb_ColumnFamilyOptions * Class: org_rocksdb_ColumnFamilyOptions
* Method: setCompressionType * Method: setCompressionType

View File

@ -47,7 +47,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
rocksdb::MemTable* mem = new rocksdb::MemTable( rocksdb::MemTable* mem = new rocksdb::MemTable(
cmp, rocksdb::ImmutableCFOptions(options), cmp, rocksdb::ImmutableCFOptions(options),
rocksdb::MutableCFOptions(options, rocksdb::ImmutableCFOptions(options)), rocksdb::MutableCFOptions(options, rocksdb::ImmutableCFOptions(options)),
&wb); &wb, rocksdb::kMaxSequenceNumber);
mem->Ref(); mem->Ref();
std::string state; std::string state;
rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem); rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem);

View File

@ -296,7 +296,11 @@ public:
pending_fsync_ = true; pending_fsync_ = true;
SSIZE_T done = pwrite(hFile_, src, left, offset); SSIZE_T done = 0;
{
IOSTATS_TIMER_GUARD(write_nanos);
done = pwrite(hFile_, src, left, offset);
}
if (done < 0) { if (done < 0) {
return IOErrorFromWindowsError("pwrite failed to: " + filename_, GetLastError()); return IOErrorFromWindowsError("pwrite failed to: " + filename_, GetLastError());
@ -371,6 +375,11 @@ public:
pending_fsync_ = false; pending_fsync_ = false;
return Status::OK(); return Status::OK();
} }
// Asks fallocate() to reserve space for this file and returns its Status.
// The call is wrapped in IOSTATS_TIMER_GUARD(allocate_nanos) (presumably so
// its duration is added to the allocate_nanos IO statistic).
// NOTE(review): `offset` is not used; only `len` is handed to fallocate().
// Confirm whether the reservation should cover offset + len instead.
virtual Status Allocate(off_t offset, off_t len) override {
  IOSTATS_TIMER_GUARD(allocate_nanos);
  Status result = fallocate(filename_, hFile_, len);
  return result;
}
}; };
@ -459,6 +468,7 @@ private:
// Normally it does not present a problem since in memory mapped files // Normally it does not present a problem since in memory mapped files
// we do not disable buffering // we do not disable buffering
Status ReserveFileSpace(uint64_t toSize) { Status ReserveFileSpace(uint64_t toSize) {
IOSTATS_TIMER_GUARD(allocate_nanos);
return fallocate(filename_, hFile_, toSize); return fallocate(filename_, hFile_, toSize);
} }
@ -1281,6 +1291,7 @@ public:
return status; return status;
} }
IOSTATS_TIMER_GUARD(allocate_nanos);
status = fallocate(filename_, hFile_, spaceToReserve); status = fallocate(filename_, hFile_, spaceToReserve);
if (status.ok()) { if (status.ok()) {
reservedsize_ = spaceToReserve; reservedsize_ = spaceToReserve;
@ -1500,13 +1511,17 @@ public:
// Corruption test needs to rename and delete files of these kind // Corruption test needs to rename and delete files of these kind
// while they are still open with another handle. For that reason we // while they are still open with another handle. For that reason we
// allow share_write and delete(allows rename). // allow share_write and delete(allows rename).
HANDLE hFile = CreateFileA(fname.c_str(), HANDLE hFile = 0;
GENERIC_READ, {
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, IOSTATS_TIMER_GUARD(open_nanos);
NULL, hFile = CreateFileA(fname.c_str(),
OPEN_EXISTING, // Original fopen mode is "rb" GENERIC_READ,
FILE_ATTRIBUTE_NORMAL, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL); NULL,
OPEN_EXISTING, // Original fopen mode is "rb"
FILE_ATTRIBUTE_NORMAL,
NULL);
}
if (hFile == INVALID_HANDLE_VALUE) { if (hFile == INVALID_HANDLE_VALUE) {
auto lastError = GetLastError(); auto lastError = GetLastError();
@ -1549,15 +1564,19 @@ public:
} }
/// Shared access is necessary for corruption test to pass /// Shared access is necessary for corruption test to pass
// almost all tests wwould work with a possible exception of fault_injection // almost all tests would work with a possible exception of fault_injection
HANDLE hFile = CreateFileA( HANDLE hFile;
fname.c_str(), {
GENERIC_READ, IOSTATS_TIMER_GUARD(open_nanos);
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, hFile = CreateFileA(
NULL, fname.c_str(),
OPEN_EXISTING, GENERIC_READ,
fileFlags, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL); NULL,
OPEN_EXISTING,
fileFlags,
NULL);
}
if (INVALID_HANDLE_VALUE == hFile) { if (INVALID_HANDLE_VALUE == hFile) {
auto lastError = GetLastError(); auto lastError = GetLastError();
@ -1649,14 +1668,18 @@ public:
shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE);
} }
HANDLE hFile = CreateFileA(fname.c_str(), HANDLE hFile = 0;
desired_access, // Access desired {
shared_mode, IOSTATS_TIMER_GUARD(open_nanos);
NULL, // Security attributes hFile = CreateFileA(fname.c_str(),
CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC desired_access, // Access desired
fileFlags, // Flags shared_mode,
NULL); // Template File NULL, // Security attributes
CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC
fileFlags, // Flags
NULL); // Template File
}
if (INVALID_HANDLE_VALUE == hFile) { if (INVALID_HANDLE_VALUE == hFile) {
auto lastError = GetLastError(); auto lastError = GetLastError();
return IOErrorFromWindowsError("Failed to create a NewWriteableFile: " + fname, lastError); return IOErrorFromWindowsError("Failed to create a NewWriteableFile: " + fname, lastError);
@ -1683,14 +1706,18 @@ public:
Status s; Status s;
HANDLE hFile = CreateFileA(fname.c_str(), HANDLE hFile = 0;
GENERIC_READ | GENERIC_WRITE, {
FILE_SHARE_READ, IOSTATS_TIMER_GUARD(open_nanos);
NULL, hFile = CreateFileA(fname.c_str(),
OPEN_ALWAYS, // Posix env specifies O_CREAT, it will open existing file or create new GENERIC_READ | GENERIC_WRITE,
FILE_ATTRIBUTE_NORMAL, FILE_SHARE_READ,
NULL); NULL,
OPEN_ALWAYS, // Posix env specifies O_CREAT, it will open existing file or create new
FILE_ATTRIBUTE_NORMAL,
NULL);
}
if (hFile == INVALID_HANDLE_VALUE) { if (hFile == INVALID_HANDLE_VALUE) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError("Failed to Open/Create NewRandomRWFile" + fname, lastError); s = IOErrorFromWindowsError("Failed to Open/Create NewRandomRWFile" + fname, lastError);
@ -1710,6 +1737,7 @@ public:
if (!DirExists(name)) { if (!DirExists(name)) {
s = IOError("Directory does not exist: " + name, EEXIST); s = IOError("Directory does not exist: " + name, EEXIST);
} else { } else {
IOSTATS_TIMER_GUARD(open_nanos);
result->reset(new WinDirectory); result->reset(new WinDirectory);
} }
return s; return s;
@ -1889,9 +1917,12 @@ public:
// Obtain exclusive access to the LOCK file // Obtain exclusive access to the LOCK file
// Previously, instead of NORMAL attr we set DELETE on close and that worked // Previously, instead of NORMAL attr we set DELETE on close and that worked
// well except with fault_injection test that insists on deleting it. // well except with fault_injection test that insists on deleting it.
HANDLE hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), HANDLE hFile = 0;
ExclusiveAccessON, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); {
IOSTATS_TIMER_GUARD(open_nanos);
hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE),
ExclusiveAccessON, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
}
if (INVALID_HANDLE_VALUE == hFile) { if (INVALID_HANDLE_VALUE == hFile) {
auto lastError = GetLastError(); auto lastError = GetLastError();
@ -1975,13 +2006,17 @@ public:
result->reset(); result->reset();
HANDLE hFile = CreateFileA(fname.c_str(), HANDLE hFile = 0;
GENERIC_WRITE, {
FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are renamed and deleted before they are closed. This enables doing so. IOSTATS_TIMER_GUARD(open_nanos);
NULL, hFile = CreateFileA(fname.c_str(),
CREATE_ALWAYS, // Original fopen mode is "w" GENERIC_WRITE,
FILE_ATTRIBUTE_NORMAL, FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are renamed and deleted before they are closed. This enables doing so.
NULL); NULL,
CREATE_ALWAYS, // Original fopen mode is "w"
FILE_ATTRIBUTE_NORMAL,
NULL);
}
if (hFile == INVALID_HANDLE_VALUE) { if (hFile == INVALID_HANDLE_VALUE) {
auto lastError = GetLastError(); auto lastError = GetLastError();

View File

@ -20,6 +20,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "port/win/win_logger.h" #include "port/win/win_logger.h"
#include "port/sys_time.h" #include "port/sys_time.h"
#include "util/iostats_context_imp.h"
namespace rocksdb { namespace rocksdb {
@ -62,6 +63,7 @@ void WinLogger::Flush() {
void WinLogger::Logv(const char* format, va_list ap) { void WinLogger::Logv(const char* format, va_list ap) {
const uint64_t thread_id = (*gettid_)(); const uint64_t thread_id = (*gettid_)();
IOSTATS_TIMER_GUARD(logger_nanos);
// We try twice: the first time with a fixed-size stack allocated buffer, // We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer. // and the second time with a much larger dynamically allocated buffer.
char buffer[500]; char buffer[500];

View File

@ -51,7 +51,7 @@ Arena::~Arena() {
for (const auto& block : blocks_) { for (const auto& block : blocks_) {
delete[] block; delete[] block;
} }
#ifdef MAP_HUGETLB #ifndef OS_WIN
for (const auto& mmap_info : huge_blocks_) { for (const auto& mmap_info : huge_blocks_) {
auto ret = munmap(mmap_info.addr_, mmap_info.length_); auto ret = munmap(mmap_info.addr_, mmap_info.length_);
if (ret != 0) { if (ret != 0) {

View File

@ -350,7 +350,7 @@ class PosixMmapReadableFile: public RandomAccessFile {
virtual ~PosixMmapReadableFile() { virtual ~PosixMmapReadableFile() {
int ret = munmap(mmapped_region_, length_); int ret = munmap(mmapped_region_, length_);
if (ret != 0) { if (ret != 0) {
fprintf(stdout, "failed to munmap %p length %zu \n", fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
mmapped_region_, length_); mmapped_region_, length_);
} }
} }
@ -443,14 +443,17 @@ class PosixMmapFile : public WritableFile {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
// we can't fallocate with FALLOC_FL_KEEP_SIZE here // we can't fallocate with FALLOC_FL_KEEP_SIZE here
int alloc_status = fallocate(fd_, 0, file_offset_, map_size_); {
if (alloc_status != 0) { IOSTATS_TIMER_GUARD(allocate_nanos);
// fallback to posix_fallocate int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
alloc_status = posix_fallocate(fd_, file_offset_, map_size_); if (alloc_status != 0) {
} // fallback to posix_fallocate
if (alloc_status != 0) { alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
return Status::IOError("Error allocating space to file : " + filename_ + }
"Error : " + strerror(alloc_status)); if (alloc_status != 0) {
return Status::IOError("Error allocating space to file : " + filename_ +
"Error : " + strerror(alloc_status));
}
} }
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
@ -639,6 +642,7 @@ class PosixMmapFile : public WritableFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) override { virtual Status Allocate(off_t offset, off_t len) override {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
IOSTATS_TIMER_GUARD(allocate_nanos);
int alloc_status = fallocate( int alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) { if (alloc_status == 0) {
@ -725,7 +729,12 @@ class PosixWritableFile : public WritableFile {
cursize_ += left; cursize_ += left;
} else { } else {
while (left != 0) { while (left != 0) {
ssize_t done = write(fd_, src, RequestToken(left)); ssize_t done;
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
done = write(fd_, src, size);
}
if (done < 0) { if (done < 0) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@ -773,6 +782,7 @@ class PosixWritableFile : public WritableFile {
// tmpfs (since Linux 3.5) // tmpfs (since Linux 3.5)
// We ignore error since failure of this operation does not affect // We ignore error since failure of this operation does not affect
// correctness. // correctness.
IOSTATS_TIMER_GUARD(allocate_nanos);
fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
filesize_, block_size * last_allocated_block - filesize_); filesize_, block_size * last_allocated_block - filesize_);
#endif #endif
@ -791,7 +801,12 @@ class PosixWritableFile : public WritableFile {
size_t left = cursize_; size_t left = cursize_;
char* src = buf_.get(); char* src = buf_.get();
while (left != 0) { while (left != 0) {
ssize_t done = write(fd_, src, RequestToken(left)); ssize_t done;
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
done = write(fd_, src, size);
}
if (done < 0) { if (done < 0) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@ -865,7 +880,9 @@ class PosixWritableFile : public WritableFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) override { virtual Status Allocate(off_t offset, off_t len) override {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
int alloc_status = fallocate( int alloc_status;
IOSTATS_TIMER_GUARD(allocate_nanos);
alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) { if (alloc_status == 0) {
return Status::OK(); return Status::OK();
@ -875,6 +892,7 @@ class PosixWritableFile : public WritableFile {
} }
virtual Status RangeSync(off_t offset, off_t nbytes) override { virtual Status RangeSync(off_t offset, off_t nbytes) override {
IOSTATS_TIMER_GUARD(range_sync_nanos);
if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) {
return Status::OK(); return Status::OK();
} else { } else {
@ -933,7 +951,11 @@ class PosixRandomRWFile : public RandomRWFile {
pending_fsync_ = true; pending_fsync_ = true;
while (left != 0) { while (left != 0) {
ssize_t done = pwrite(fd_, src, left, offset); ssize_t done;
{
IOSTATS_TIMER_GUARD(write_nanos);
done = pwrite(fd_, src, left, offset);
}
if (done < 0) { if (done < 0) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@ -1009,6 +1031,7 @@ class PosixRandomRWFile : public RandomRWFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) override { virtual Status Allocate(off_t offset, off_t len) override {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
IOSTATS_TIMER_GUARD(allocate_nanos);
int alloc_status = fallocate( int alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) { if (alloc_status == 0) {
@ -1117,6 +1140,7 @@ class PosixEnv : public Env {
result->reset(); result->reset();
FILE* f = nullptr; FILE* f = nullptr;
do { do {
IOSTATS_TIMER_GUARD(open_nanos);
f = fopen(fname.c_str(), "r"); f = fopen(fname.c_str(), "r");
} while (f == nullptr && errno == EINTR); } while (f == nullptr && errno == EINTR);
if (f == nullptr) { if (f == nullptr) {
@ -1135,7 +1159,11 @@ class PosixEnv : public Env {
const EnvOptions& options) override { const EnvOptions& options) override {
result->reset(); result->reset();
Status s; Status s;
int fd = open(fname.c_str(), O_RDONLY); int fd;
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_RDONLY);
}
SetFD_CLOEXEC(fd, &options); SetFD_CLOEXEC(fd, &options);
if (fd < 0) { if (fd < 0) {
s = IOError(fname, errno); s = IOError(fname, errno);
@ -1168,6 +1196,7 @@ class PosixEnv : public Env {
Status s; Status s;
int fd = -1; int fd = -1;
do { do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
} while (fd < 0 && errno == EINTR); } while (fd < 0 && errno == EINTR);
if (fd < 0) { if (fd < 0) {
@ -1208,7 +1237,11 @@ class PosixEnv : public Env {
return Status::NotSupported("No support for mmap read/write yet"); return Status::NotSupported("No support for mmap read/write yet");
} }
Status s; Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644); int fd;
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644);
}
if (fd < 0) { if (fd < 0) {
s = IOError(fname, errno); s = IOError(fname, errno);
} else { } else {
@ -1221,7 +1254,11 @@ class PosixEnv : public Env {
virtual Status NewDirectory(const std::string& name, virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) override { unique_ptr<Directory>* result) override {
result->reset(); result->reset();
const int fd = open(name.c_str(), 0); int fd;
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(name.c_str(), 0);
}
if (fd < 0) { if (fd < 0) {
return IOError(name, errno); return IOError(name, errno);
} else { } else {
@ -1333,7 +1370,11 @@ class PosixEnv : public Env {
virtual Status LockFile(const std::string& fname, FileLock** lock) override { virtual Status LockFile(const std::string& fname, FileLock** lock) override {
*lock = nullptr; *lock = nullptr;
Status result; Status result;
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); int fd;
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
}
if (fd < 0) { if (fd < 0) {
result = IOError(fname, errno); result = IOError(fname, errno);
} else if (LockOrUnlock(fname, fd, true) == -1) { } else if (LockOrUnlock(fname, fd, true) == -1) {
@ -1408,7 +1449,11 @@ class PosixEnv : public Env {
virtual Status NewLogger(const std::string& fname, virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) override { shared_ptr<Logger>* result) override {
FILE* f = fopen(fname.c_str(), "w"); FILE* f;
{
IOSTATS_TIMER_GUARD(open_nanos);
f = fopen(fname.c_str(), "w");
}
if (f == nullptr) { if (f == nullptr) {
result->reset(); result->reset();
return IOError(fname, errno); return IOError(fname, errno);
@ -1782,7 +1827,7 @@ class PosixEnv : public Env {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12) #if __GLIBC_PREREQ(2, 12)
char name_buf[16]; char name_buf[16];
snprintf(name_buf, sizeof name_buf, "rocksdb:bg%zu", bgthreads_.size()); snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt, bgthreads_.size());
name_buf[sizeof name_buf - 1] = '\0'; name_buf[sizeof name_buf - 1] = '\0';
pthread_setname_np(t, name_buf); pthread_setname_np(t, name_buf);
#endif #endif

View File

@ -992,6 +992,87 @@ TEST_F(EnvPosixTest, Preallocation) {
ASSERT_EQ(last_allocated_block, 7UL); ASSERT_EQ(last_allocated_block, 7UL);
} }
// Verifies that WritableFileWrapper forwards every call to the wrapped
// WritableFile. Each method of the recording file below checks that it is
// invoked at its expected position in the overall call sequence.
TEST_F(EnvPosixTest, WritableFileWrapper) {
  // Target file: every method claims one slot of a shared call counter.
  class RecordingFile : public WritableFile {
   public:
    mutable int* counter_;

    // Expects the shared counter to equal `expected`, then advances it.
    void ExpectStep(int expected) const {
      EXPECT_EQ(expected, (*counter_)++);
    }

    explicit RecordingFile(int* counter) : counter_(counter) {
      ExpectStep(0);
    }

    // Destruction is the last recorded step.
    ~RecordingFile() {
      ExpectStep(13);
    }

    Status Append(const Slice& data) override {
      ExpectStep(1);
      return Status::OK();
    }

    Status Close() override {
      ExpectStep(2);
      return Status::OK();
    }

    Status Flush() override {
      ExpectStep(3);
      return Status::OK();
    }

    Status Sync() override {
      ExpectStep(4);
      return Status::OK();
    }

    Status Fsync() override {
      ExpectStep(5);
      return Status::OK();
    }

    void SetIOPriority(Env::IOPriority pri) override {
      ExpectStep(6);
    }

    uint64_t GetFileSize() override {
      ExpectStep(7);
      return 0;
    }

    void GetPreallocationStatus(size_t* block_size,
                                size_t* last_allocated_block) override {
      ExpectStep(8);
    }

    size_t GetUniqueId(char* id, size_t max_size) const override {
      ExpectStep(9);
      return 0;
    }

    Status InvalidateCache(size_t offset, size_t length) override {
      ExpectStep(10);
      return Status::OK();
    }

   protected:
    Status Allocate(off_t offset, off_t len) override {
      ExpectStep(11);
      return Status::OK();
    }

    Status RangeSync(off_t offset, off_t nbytes) override {
      ExpectStep(12);
      return Status::OK();
    }
  };

  // Wrapper under test; additionally exposes the protected entry points.
  class ForwardingProbe : public WritableFileWrapper {
   public:
    explicit ForwardingProbe(WritableFile* target)
        : WritableFileWrapper(target) {}

    void CallProtectedMethods() {
      Allocate(0, 0);
      RangeSync(0, 0);
    }
  };

  int calls = 0;
  {
    RecordingFile target(&calls);
    ForwardingProbe probe(&target);

    // The order of these calls must match the step numbers checked above.
    probe.Append(Slice());
    probe.Close();
    probe.Flush();
    probe.Sync();
    probe.Fsync();
    probe.SetIOPriority(Env::IOPriority::IO_HIGH);
    probe.GetFileSize();
    probe.GetPreallocationStatus(nullptr, nullptr);
    probe.GetUniqueId(nullptr, 0);
    probe.InvalidateCache(0, 0);
    probe.CallProtectedMethods();
  }

  // Constructor + 12 forwarded calls + destructor.
  EXPECT_EQ(14, calls);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -98,6 +98,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"write_buffer_size", "1"}, {"write_buffer_size", "1"},
{"max_write_buffer_number", "2"}, {"max_write_buffer_number", "2"},
{"min_write_buffer_number_to_merge", "3"}, {"min_write_buffer_number_to_merge", "3"},
{"max_write_buffer_number_to_maintain", "99"},
{"compression", "kSnappyCompression"}, {"compression", "kSnappyCompression"},
{"compression_per_level", {"compression_per_level",
"kNoCompression:" "kNoCompression:"
@ -184,6 +185,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.write_buffer_size, 1U); ASSERT_EQ(new_cf_opt.write_buffer_size, 1U);
ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2); ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2);
ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3);
ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99);
ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression);
ASSERT_EQ(new_cf_opt.compression_per_level.size(), 6U); ASSERT_EQ(new_cf_opt.compression_per_level.size(), 6U);
ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression);

View File

@ -61,6 +61,8 @@ class PosixLogger : public Logger {
using Logger::Logv; using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override { virtual void Logv(const char* format, va_list ap) override {
IOSTATS_TIMER_GUARD(logger_nanos);
const uint64_t thread_id = (*gettid_)(); const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer, // We try twice: the first time with a fixed-size stack allocated buffer,

View File

@ -7,7 +7,12 @@
#include <string> #include <string>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/managed_iterator.h" #include "db/managed_iterator.h"
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/write_batch.h"
#include "util/xfunc.h" #include "util/xfunc.h"
@ -64,6 +69,116 @@ void xf_manage_new(DBImpl* db, ReadOptions* read_options,
void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); } void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); }
// XFUNC hook: overwrites *max_write_buffer_number_to_maintain with 10.
// NOTE(review): presumably this keeps memtable history around for the
// transaction tests -- confirm against the call site.
void xf_transaction_set_memtable_history(
    int32_t* max_write_buffer_number_to_maintain) {
  const int32_t kWriteBuffersToMaintain = 10;
  *max_write_buffer_number_to_maintain = kWriteBuffersToMaintain;
}
// XFUNC hook: resets *max_write_buffer_number_to_maintain to 0.
void xf_transaction_clear_memtable_history(
    int32_t* max_write_buffer_number_to_maintain) {
  const int32_t kNoWriteBuffersToMaintain = 0;
  *max_write_buffer_number_to_maintain = kNoWriteBuffersToMaintain;
}
class XFTransactionWriteHandler : public WriteBatch::Handler {
public:
OptimisticTransaction* txn_;
DBImpl* db_impl_;
XFTransactionWriteHandler(OptimisticTransaction* txn, DBImpl* db_impl)
: txn_(txn), db_impl_(db_impl) {}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Put(cfh, key, value);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Merge(cfh, key, value);
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Delete(cfh, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) override { txn_->PutLogData(blob); }
};
// Whenever DBImpl::Write is called, create a transaction and do the write via
// the transaction.
//
//   write_options   - passed to OptimisticTransaction::BeginTransaction.
//   db_options      - only its info_log is used, for error reporting.
//   my_batch        - batch whose entries are replayed into the transaction.
//   callback        - when non-null the write is left to the caller and
//                     *write_attempted is set to false.
//   db_impl         - database the temporary OptimisticTransactionDB wraps.
//   s               - receives the status of the transaction commit.
//   write_attempted - set to true once a commit has been attempted.
void xf_transaction_write(const WriteOptions& write_options,
                          const DBOptions& db_options, WriteBatch* my_batch,
                          WriteCallback* callback, DBImpl* db_impl, Status* s,
                          bool* write_attempted) {
  if (callback != nullptr) {
    // We may already be in a transaction, don't force a transaction
    *write_attempted = false;
    return;
  }

  OptimisticTransactionDB* txn_db = new OptimisticTransactionDB(db_impl);
  OptimisticTransaction* txn =
      OptimisticTransaction::BeginTransaction(txn_db, write_options);

  // Replay every entry of the batch into the transaction.
  XFTransactionWriteHandler handler(txn, db_impl);
  *s = my_batch->Iterate(&handler);
  if (!s->ok()) {
    // "%s" (not "$s") so that the status text actually reaches the log.
    Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log,
        "XFUNC test could not iterate batch. status: %s\n",
        s->ToString().c_str());
  }

  // NOTE(review): the commit is attempted even if Iterate() failed, and its
  // status replaces the Iterate() status -- confirm this is intended.
  *s = txn->Commit();
  if (!s->ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log,
        "XFUNC test could not commit transaction. status: %s\n",
        s->ToString().c_str());
  }

  *write_attempted = true;
  delete txn;
  delete txn_db;
}
} // namespace rocksdb } // namespace rocksdb
#endif // XFUNC #endif // XFUNC

View File

@ -32,6 +32,7 @@ namespace rocksdb {
#else #else
struct Options; struct Options;
struct WriteOptions;
class ManagedIterator; class ManagedIterator;
class DBImpl; class DBImpl;
void GetXFTestOptions(Options* options, int skip_policy); void GetXFTestOptions(Options* options, int skip_policy);
@ -40,6 +41,15 @@ void xf_manage_new(DBImpl* db, ReadOptions* readoptions,
bool is_snapshot_supported); bool is_snapshot_supported);
void xf_manage_create(ManagedIterator* iter); void xf_manage_create(ManagedIterator* iter);
void xf_manage_options(ReadOptions* read_options); void xf_manage_options(ReadOptions* read_options);
// Overwrites *max_write_buffer_number_to_maintain with a fixed non-zero
// value chosen by the XFUNC transaction test.
void xf_transaction_set_memtable_history(
int32_t* max_write_buffer_number_to_maintain);
// Resets *max_write_buffer_number_to_maintain to 0.
void xf_transaction_clear_memtable_history(
int32_t* max_write_buffer_number_to_maintain);
// Performs the write of `my_batch` through an optimistic transaction.
// *write_attempted reports whether the write was attempted here; when it was,
// *success holds the commit status. Nothing is attempted when `callback` is
// non-null.
void xf_transaction_write(const WriteOptions& write_options,
const DBOptions& db_options,
class WriteBatch* my_batch,
class WriteCallback* callback, DBImpl* db_impl,
Status* success, bool* write_attempted);
// This class provides the facility to run custom code to test a specific // This class provides the facility to run custom code to test a specific
// feature typically with all existing unit tests. // feature typically with all existing unit tests.

View File

@ -328,7 +328,27 @@ class BackupEngineImpl : public BackupEngine {
BackupRateLimiter* rate_limiter; BackupRateLimiter* rate_limiter;
uint64_t size_limit; uint64_t size_limit;
std::promise<CopyResult> result; std::promise<CopyResult> result;
CopyWorkItem() {} CopyWorkItem() {}
// Copying is disabled: the std::promise member `result` is move-only, so a
// work item can only be moved.
CopyWorkItem(const CopyWorkItem&) = delete;
CopyWorkItem& operator=(const CopyWorkItem&) = delete;
// Move constructor: members are default-initialized first and then taken
// over from `o` by the move-assignment operator.
CopyWorkItem(CopyWorkItem&& o) {
  operator=(std::move(o));
}
// Move assignment: takes over every field of `o`.
CopyWorkItem& operator=(CopyWorkItem&& o) {
  // Pointers and scalars are simply copied.
  src_env = o.src_env;
  dst_env = o.dst_env;
  sync = o.sync;
  rate_limiter = o.rate_limiter;
  size_limit = o.size_limit;

  // Paths and the promise are moved out of `o`.
  src_path = std::move(o.src_path);
  dst_path = std::move(o.dst_path);
  result = std::move(o.result);

  return *this;
}
CopyWorkItem(std::string _src_path, CopyWorkItem(std::string _src_path,
std::string _dst_path, std::string _dst_path,
Env* _src_env, Env* _src_env,
@ -354,7 +374,23 @@ class BackupEngineImpl : public BackupEngine {
std::string dst_path; std::string dst_path;
std::string dst_relative; std::string dst_relative;
BackupAfterCopyWorkItem() {} BackupAfterCopyWorkItem() {}
BackupAfterCopyWorkItem(std::future<CopyResult> _result,
// Move constructor: members are default-initialized first and then taken
// over from `o` by the move-assignment operator.
BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) {
  operator=(std::move(o));
}
// Move assignment: takes over every field of `o`.
BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) {
  // Flags and the environment pointer are simply copied.
  shared = o.shared;
  needed_to_copy = o.needed_to_copy;
  backup_env = o.backup_env;

  // The future and the path strings are moved out of `o`.
  result = std::move(o.result);
  dst_path_tmp = std::move(o.dst_path_tmp);
  dst_path = std::move(o.dst_path);
  dst_relative = std::move(o.dst_relative);

  return *this;
}
BackupAfterCopyWorkItem(std::future<CopyResult>&& _result,
bool _shared, bool _shared,
bool _needed_to_copy, bool _needed_to_copy,
Env* _backup_env, Env* _backup_env,
@ -374,10 +410,19 @@ class BackupEngineImpl : public BackupEngine {
std::future<CopyResult> result; std::future<CopyResult> result;
uint32_t checksum_value; uint32_t checksum_value;
RestoreAfterCopyWorkItem() {} RestoreAfterCopyWorkItem() {}
RestoreAfterCopyWorkItem(std::future<CopyResult> _result, RestoreAfterCopyWorkItem(std::future<CopyResult>&& _result,
uint32_t _checksum_value) uint32_t _checksum_value)
: result(std::move(_result)), : result(std::move(_result)),
checksum_value(_checksum_value) {} checksum_value(_checksum_value) {}
// Move constructor: members are default-initialized first and then taken
// over from `o` by the move-assignment operator.
RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) {
  operator=(std::move(o));
}
// Move assignment: copies the expected checksum and moves the pending copy
// result out of `o`.
RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) {
  checksum_value = o.checksum_value;
  result = std::move(o.result);
  return *this;
}
}; };
channel<CopyWorkItem> files_to_copy_; channel<CopyWorkItem> files_to_copy_;