rocksdb/tools/db_repl_stress.cc
mrambacher c7c7b07f06 More Makefile Cleanup (#7097)
Summary:
Cleans up some of the dependencies on test code in the Makefile while building tools:
- Moves the test::RandomString, DBBaseTest::RandomString into Random
- Moves the test::RandomHumanReadableString into Random
- Moves the DestroyDir method into file_utils
- Moves the SetupSyncPointsToMockDirectIO into sync_point.
- Moves the FaultInjection Env and FS classes under env

These changes allow all of the tools to build without dependencies on test_util, thereby simplifying the build dependencies.  By moving the FaultInjection code, the dependency in db_stress on different libraries for debug vs release was eliminated.

Tested both release and debug builds via Make and CMake for both static and shared libraries.

More work remains to clean up how the tools are built and remove some unnecessary dependencies.  There is also more work that should be done to get the Makefile and CMake to align in their builds -- what is in the libraries and the sizes of the executables are different.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7097

Reviewed By: riversand963

Differential Revision: D22463160

Pulled By: pdillinger

fbshipit-source-id: e19462b53324ab3f0b7c72459dbc73165cc382b2
2020-07-09 14:35:17 -07:00

154 lines
4.4 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() {
fprintf(stderr, "Please install gflags to run rocksdb tools\n");
return 1;
}
#else
#include <atomic>
#include <cstdio>
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/types.h"
#include "test_util/testutil.h"
#include "util/gflags_compat.h"
// Run a thread to perform Put's.
// Another thread uses GetUpdatesSince API to keep getting the updates.
// options :
// --num_inserts = the num of inserts the first thread should perform.
// --wal_ttl = the wal ttl for the run.
using namespace ROCKSDB_NAMESPACE;
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::SetUsageMessage;
struct DataPumpThread {
size_t no_records;
DB* db; // Assumption DB is Open'ed already.
};
static void DataPumpThreadBody(void* arg) {
DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
DB* db = t->db;
Random rnd(301);
size_t i = 0;
while (i++ < t->no_records) {
if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
Slice(rnd.RandomString(500)))
.ok()) {
fprintf(stderr, "Error in put\n");
exit(1);
}
}
}
struct ReplicationThread {
std::atomic<bool> stop;
DB* db;
volatile size_t no_read;
};
static void ReplicationThreadBody(void* arg) {
ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
DB* db = t->db;
std::unique_ptr<TransactionLogIterator> iter;
SequenceNumber currentSeqNum = 1;
while (!t->stop.load(std::memory_order_acquire)) {
iter.reset();
Status s;
while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
if (t->stop.load(std::memory_order_acquire)) {
return;
}
}
fprintf(stderr, "Refreshing iterator\n");
for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
BatchResult res = iter->GetBatch();
if (res.sequence != currentSeqNum) {
fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
(long)currentSeqNum, (long)res.sequence);
exit(1);
}
}
}
}
DEFINE_uint64(num_inserts, 1000,
"the num of inserts the first thread should"
" perform.");
DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
DEFINE_uint64(wal_size_limit_MB, 10,
"the wal size limit for the run"
"(in MB)");
int main(int argc, const char** argv) {
SetUsageMessage(
std::string("\nUSAGE:\n") + std::string(argv[0]) +
" --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
" --wal_size_limit_MB=<WAL_size_limit_MB>");
ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
Env* env = Env::Default();
std::string default_db_path;
env->GetTestDirectory(&default_db_path);
default_db_path += "db_repl_stress";
Options options;
options.create_if_missing = true;
options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
DB* db;
DestroyDB(default_db_path, options);
Status s = DB::Open(options, default_db_path, &db);
if (!s.ok()) {
fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
exit(1);
}
DataPumpThread dataPump;
dataPump.no_records = FLAGS_num_inserts;
dataPump.db = db;
env->StartThread(DataPumpThreadBody, &dataPump);
ReplicationThread replThread;
replThread.db = db;
replThread.no_read = 0;
replThread.stop.store(false, std::memory_order_release);
env->StartThread(ReplicationThreadBody, &replThread);
while (replThread.no_read < FLAGS_num_inserts)
;
replThread.stop.store(true, std::memory_order_release);
if (replThread.no_read < dataPump.no_records) {
// no. read should be => than inserted.
fprintf(stderr,
"No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
" Written : %" ROCKSDB_PRIszt "\n",
replThread.no_read, dataPump.no_records);
exit(1);
}
fprintf(stderr, "Successful!\n");
exit(0);
}
#endif // GFLAGS
#else // ROCKSDB_LITE
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "Not supported in lite mode.\n");
return 1;
}
#endif // ROCKSDB_LITE