rocksdb/db/forward_iterator_bench.cc
Yanqin Jin 0376869f05 Remove using namespace (#9369)
Summary:
As title.
This is part of an fb-internal task.
First, remove all `using namespace` statements if applicable.
Next, utilize multiple build platforms and see if anything is broken.
Should anything become broken, fix the compilation errors with as little extra change as possible.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9369

Test Plan:
internal build and make check
make clean && make static_lib && cd examples && make all

Reviewed By: pdillinger

Differential Revision: D33517260

Pulled By: riversand963

fbshipit-source-id: 3fc4ce6402a073421dfd9a9b2d1c79441dca7a40
2022-01-12 09:31:12 -08:00

377 lines
12 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
#include <cstdio>
// Fallback entry point used when the tool is built without gflags (or as
// ROCKSDB_LITE): report the missing dependency and exit with a failure code.
int main() {
  std::fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
#elif defined(OS_MACOSX) || defined(OS_WIN)
// forward_iterator_bench is disabled on macOS and Windows: build a no-op
// binary that exits successfully.
int main() {
  return 0;
}
#else
#include <semaphore.h>

#include <atomic>
#include <bitset>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <climits>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <deque>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "test_util/testharness.h"
#include "util/gflags_compat.h"
// Upper limit on --shards. It sizes the per-reader bitset of pending shards,
// so shard ids must never exceed it.
const int MAX_SHARDS = 100000;

// Command-line flags. The help strings describe how each flag is used in this
// file so that --help is meaningful.
DEFINE_int32(writers, 8, "Number of writer threads.");
DEFINE_int32(readers, 8, "Number of reader threads.");
DEFINE_int64(rate, 100000,
             "Target aggregate write rate, in Puts per second across all "
             "writers.");
DEFINE_int64(value_size, 300, "Size in bytes of each value written.");
DEFINE_int64(shards, 1000,
             "Number of shards (key prefixes); must be between 1 and 100000.");
DEFINE_int64(memtable_size, 500000000,
             "Memtable size in bytes (Options::write_buffer_size).");
DEFINE_int64(block_cache_size, 300000000,
             "Capacity in bytes of the LRU block cache.");
DEFINE_int64(block_size, 65536,
             "Block size in bytes for the block-based table.");
DEFINE_double(runtime, 300.0,
              "How long the writers run, in seconds; a negative value means "
              "run until killed.");
DEFINE_bool(cache_only_first, true,
            "If true, readers first try a block-cache-only iterator and fall "
            "back to a regular tailing iterator on a cache miss.");
DEFINE_bool(iterate_upper_bound, true,
            "If true, reader iterators set iterate_upper_bound to the end of "
            "their shard.");
// Process-wide counters updated by the writer and reader threads and printed
// periodically by StatsThread.
//
// The unused 128-byte pad arrays separate the writer-side counter from the
// reader-side counters (presumably to keep them on different cache lines).
struct Stats {
  char pad1[128] __attribute__((__unused__));

  // Number of successful Puts, incremented by writers.
  std::atomic<uint64_t> written{0};

  char pad2[128] __attribute__((__unused__));

  // Number of keys read and verified, incremented by readers.
  std::atomic<uint64_t> read{0};
  // Number of times a read reported an Incomplete status.
  std::atomic<uint64_t> cache_misses{0};

  char pad3[128] __attribute__((__unused__));
};

// The single global instance, referenced as ::stats throughout the file.
Stats stats;
// Fixed-size 16-byte database key: (shard, seqno).
//
// Both fields are stored big-endian so that a bytewise comparison of two
// encoded keys matches the numeric order of (shard, seqno). The struct is
// packed and its bytes are handed directly to the database as the key.
struct Key {
  // Default-constructed keys are all-zero rather than uninitialized.
  Key() = default;

  // Builds a key from host-order values.
  Key(uint64_t shard_in, uint64_t seqno_in)
      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}

  // Accessors return host-order values.
  uint64_t shard() const { return be64toh(shard_be); }
  uint64_t seqno() const { return be64toh(seqno_be); }

 private:
  uint64_t shard_be = 0;
  uint64_t seqno_be = 0;
} __attribute__((__packed__));

// Readers memcpy raw iterator keys into a Key, so the layout must be exact.
static_assert(sizeof(Key) == 2 * sizeof(uint64_t),
              "Key must be exactly two packed 64-bit fields");
struct Reader;
struct Writer;

// Per-shard state shared between the shard's writer and its reader.
//
// The writer publishes progress through last_written; the remaining mutable
// fields after pad2 are used by the reader. The unused 128-byte pad arrays
// separate the two groups (presumably to avoid false sharing).
struct ShardState {
  char pad1[128] __attribute__((__unused__));

  // Sequence number of the most recent key written to this shard.
  std::atomic<uint64_t> last_written{0};
  // Writer and reader assigned to this shard (not owned). Explicitly null
  // until assigned so a default-initialized ShardState never holds
  // indeterminate pointers.
  Writer* writer = nullptr;
  Reader* reader = nullptr;

  char pad2[128] __attribute__((__unused__));

  // Sequence number of the most recent key read and verified by the reader.
  std::atomic<uint64_t> last_read{0};
  // Iterators created lazily by the reader on first use of this shard.
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
  // Backing storage for the iterators' upper bound; upper_bound_slice points
  // at upper_bound, so both must live as long as the iterators.
  Key upper_bound;
  ROCKSDB_NAMESPACE::Slice upper_bound_slice;

  char pad3[128] __attribute__((__unused__));
};
// Reader thread that tails the shards assigned to it.
//
// Writers report new data through onWrite(). The reader thread sleeps on a
// semaphore, dequeues one pending shard per wake-up and catches up on that
// shard with tailing iterators, asserting that keys arrive in order without
// gaps.
struct Reader {
 public:
  // Starts the reader thread immediately. `shard_states` and `db` are not
  // owned; the thread dereferences them, so they must outlive this object.
  explicit Reader(std::vector<ShardState>* shard_states,
                  ROCKSDB_NAMESPACE::DB* db)
      : shard_states_(shard_states), db_(db) {
    sem_init(&sem_, 0, 0);
    // Fully qualified to match the thread_ member declaration; there is no
    // using-directive for ROCKSDB_NAMESPACE in this file.
    thread_ = ROCKSDB_NAMESPACE::port::Thread(&Reader::run, this);
  }

  // Thread body: waits for work and processes one pending shard per semaphore
  // post until the destructor sets done_.
  void run() {
    while (true) {
      if (sem_wait(&sem_) != 0) {
        // The wait failed (e.g. it was interrupted by a signal) without
        // consuming a post, so there is not necessarily a queued shard.
        continue;
      }
      if (done_.load()) {
        break;
      }
      uint64_t shard;
      {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        // onWrite() posts the semaphore exactly once per queued shard.
        assert(!shards_pending_queue_.empty());
        shard = shards_pending_queue_.front();
        shards_pending_queue_.pop();
        shards_pending_set_.reset(shard);
      }
      readOnceFromShard(shard);
    }
  }

  // Reads every key of `shard` in (last_read, last_written], verifying each
  // one. When a cache-only iterator exists it is tried first; the regular
  // iterator then continues from wherever the cache-only one stopped.
  void readOnceFromShard(uint64_t shard) {
    ShardState& state = (*shard_states_)[shard];
    if (!state.it) {
      // Initialize iterators
      ROCKSDB_NAMESPACE::ReadOptions options;
      options.tailing = true;
      if (FLAGS_iterate_upper_bound) {
        state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
        state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
            reinterpret_cast<const char*>(&state.upper_bound),
            sizeof(state.upper_bound));
        options.iterate_upper_bound = &state.upper_bound_slice;
      }

      state.it.reset(db_->NewIterator(options));

      if (FLAGS_cache_only_first) {
        options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
        state.it_cacheonly.reset(db_->NewIterator(options));
      }
    }

    // Snapshot the write position once so both iterators chase the same
    // target.
    const uint64_t upto = state.last_written.load();
    for (ROCKSDB_NAMESPACE::Iterator* it :
         {state.it_cacheonly.get(), state.it.get()}) {
      if (it == nullptr) {
        continue;
      }
      if (state.last_read.load() >= upto) {
        break;
      }
      bool need_seek = true;
      for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
        if (need_seek) {
          Key from(shard, state.last_read.load() + 1);
          it->Seek(ROCKSDB_NAMESPACE::Slice(
              reinterpret_cast<const char*>(&from), sizeof(from)));
          need_seek = false;
        } else {
          it->Next();
        }
        if (it->status().IsIncomplete()) {
          // Counted as a cache miss; stop with this iterator and let the next
          // one (if any) resume from last_read.
          ++::stats.cache_misses;
          break;
        }
        assert(it->Valid());
        assert(it->key().size() == sizeof(Key));
        Key key;
        memcpy(&key, it->key().data(), it->key().size());
        // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
        //         shard, seq, key.shard(), key.seqno());
        assert(key.shard() == shard);
        assert(key.seqno() == seq);
        state.last_read.store(seq);
        ++::stats.read;
      }
    }
  }

  // Called by a writer after writing to `shard`. Queues the shard for this
  // reader unless it is already pending, and wakes the reader thread.
  void onWrite(uint64_t shard) {
    std::lock_guard<std::mutex> guard(queue_mutex_);
    if (!shards_pending_set_.test(shard)) {
      shards_pending_queue_.push(shard);
      shards_pending_set_.set(shard);
      sem_post(&sem_);
    }
  }

  // Stops and joins the reader thread, then releases the semaphore.
  ~Reader() {
    done_.store(true);
    sem_post(&sem_);  // Wake the thread so it observes done_.
    thread_.join();
    sem_destroy(&sem_);  // Pairs with sem_init() in the constructor.
  }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  ROCKSDB_NAMESPACE::DB* db_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  // Counts queued shards (plus one final post from the destructor).
  sem_t sem_;
  // Guards shards_pending_set_ and shards_pending_queue_.
  std::mutex queue_mutex_;
  // Bit i is set while shard i is in shards_pending_queue_.
  std::bitset<MAX_SHARDS + 1> shards_pending_set_;
  std::queue<uint64_t> shards_pending_queue_;
  std::atomic<bool> done_{false};
  char pad2[128] __attribute__((__unused__));
};
struct Writer {
explicit Writer(std::vector<ShardState>* shard_states,
ROCKSDB_NAMESPACE::DB* db)
: shard_states_(shard_states), db_(db) {}
void start() { thread_ = port::Thread(&Writer::run, this); }
void run() {
std::queue<std::chrono::steady_clock::time_point> workq;
std::chrono::steady_clock::time_point deadline(
std::chrono::steady_clock::now() +
std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
std::vector<uint64_t> my_shards;
for (int i = 1; i <= FLAGS_shards; ++i) {
if ((*shard_states_)[i].writer == this) {
my_shards.push_back(i);
}
}
std::mt19937 rng{std::random_device()()};
std::uniform_int_distribution<int> shard_dist(
0, static_cast<int>(my_shards.size()) - 1);
std::string value(FLAGS_value_size, '*');
while (1) {
auto now = std::chrono::steady_clock::now();
if (FLAGS_runtime >= 0 && now >= deadline) {
break;
}
if (workq.empty()) {
for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
workq.push(now + offset);
}
}
while (!workq.empty() && workq.front() < now) {
workq.pop();
uint64_t shard = my_shards[shard_dist(rng)];
ShardState& state = (*shard_states_)[shard];
uint64_t seqno = state.last_written.load() + 1;
Key key(shard, seqno);
// fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
ROCKSDB_NAMESPACE::Status status =
db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
ROCKSDB_NAMESPACE::Slice(value));
assert(status.ok());
state.last_written.store(seqno);
state.reader->onWrite(shard);
++::stats.written;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// fprintf(stderr, "Writer done\n");
}
~Writer() { thread_.join(); }
private:
char pad1[128] __attribute__((__unused__));
std::vector<ShardState>* shard_states_;
ROCKSDB_NAMESPACE::DB* db_;
ROCKSDB_NAMESPACE::port::Thread thread_;
char pad2[128] __attribute__((__unused__));
};
// Background thread that prints throughput statistics to stderr about once
// per second until destroyed.
struct StatsThread {
  // Starts the reporting thread immediately. `db` is not owned and is used
  // only to format timestamps through its Env.
  explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
      : db_(db), thread_(&StatsThread::run, this) {}

  // Thread body: wakes up every second (or as soon as shutdown is requested)
  // and prints cumulative totals plus rates since the previous report.
  void run() {
    auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
    uint64_t wlast = 0, rlast = 0;
    while (!done_.load()) {
      {
        std::unique_lock<std::mutex> lock(cvm_);
        // The predicate makes shutdown prompt even if the destructor's
        // notification arrives before this thread starts waiting.
        cv_.wait_for(lock, std::chrono::seconds(1),
                     [this] { return done_.load(); });
      }
      auto now = std::chrono::steady_clock::now();
      double elapsed =
          std::chrono::duration_cast<std::chrono::duration<double> >(now -
                                                                     tlast)
              .count();
      uint64_t w = ::stats.written.load();
      uint64_t r = ::stats.read.load();
      const int64_t total_secs = static_cast<int64_t>(
          std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
              .count());
      // <cinttypes> macros keep the conversion specifiers matched to the
      // 64-bit argument types on every platform.
      fprintf(stderr,
              "%s elapsed %4" PRId64 "s | written %10" PRIu64
              " | w/s %10.0f | read %10" PRIu64
              " | r/s %10.0f | cache misses %10" PRIu64 "\n",
              db_->GetEnv()->TimeToString(time(nullptr)).c_str(), total_secs,
              w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
              ::stats.cache_misses.load());
      wlast = w;
      rlast = r;
      tlast = now;
    }
  }

  // Requests shutdown, wakes the thread and waits for it to exit.
  ~StatsThread() {
    {
      std::lock_guard<std::mutex> guard(cvm_);
      done_.store(true);
    }
    cv_.notify_all();
    thread_.join();
  }

 private:
  ROCKSDB_NAMESPACE::DB* db_;
  std::mutex cvm_;
  std::condition_variable cv_;
  // Declared before thread_: members are initialized in declaration order and
  // the thread reads done_ as soon as it starts.
  std::atomic<bool> done_{false};
  ROCKSDB_NAMESPACE::port::Thread thread_;
};
int main(int argc, char** argv) {
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
std::mt19937 rng{std::random_device()()};
ROCKSDB_NAMESPACE::Status status;
std::string path =
ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
fprintf(stderr, "db path is %s\n", path.c_str());
ROCKSDB_NAMESPACE::Options options;
options.create_if_missing = true;
options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
options.compaction_style =
ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 99999;
options.level0_stop_writes_trigger = 99999;
options.use_direct_io_for_flush_and_compaction = true;
options.write_buffer_size = FLAGS_memtable_size;
ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
table_options.block_cache =
ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
table_options.block_size = FLAGS_block_size;
options.table_factory.reset(
ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));
status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
assert(status.ok());
ROCKSDB_NAMESPACE::DB* db_raw;
status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db_raw);
assert(status.ok());
std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(db_raw);
std::vector<ShardState> shard_states(FLAGS_shards + 1);
std::deque<Reader> readers;
while (static_cast<int>(readers.size()) < FLAGS_readers) {
readers.emplace_back(&shard_states, db_raw);
}
std::deque<Writer> writers;
while (static_cast<int>(writers.size()) < FLAGS_writers) {
writers.emplace_back(&shard_states, db_raw);
}
// Each shard gets a random reader and random writer assigned to it
for (int i = 1; i <= FLAGS_shards; ++i) {
std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
shard_states[i].reader = &readers[reader_dist(rng)];
shard_states[i].writer = &writers[writer_dist(rng)];
}
StatsThread stats_thread(db_raw);
for (Writer& w : writers) {
w.start();
}
writers.clear();
readers.clear();
}
#endif // !defined(GFLAGS) || defined(ROCKSDB_LITE)