Removing duplicate code in db_bench/db_stress, fixing typos

Summary:
While working on single delete support for db_bench, I realized that
db_bench/db_stress contain a bunch of duplicate code related to
copmression and found some typos. This patch removes duplicate code,
typos and a redundant #ifndef in internal_stats.cc.

Test Plan: make db_stress && make db_bench && ./db_bench --benchmarks=compress,uncompress

Reviewers: yhchiang, sdong, rven, anthony, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43965
This commit is contained in:
Andres Notzli 2015-08-11 11:46:15 -07:00
parent a03085b556
commit 4249f159d5
5 changed files with 118 additions and 220 deletions

View File

@ -111,8 +111,8 @@ DEFINE_string(benchmarks,
"fillseekseq,"
"randomtransaction",
"Comma-separated list of operations to run in the specified order"
"Actual benchmarks:\n"
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
"\tfillseq -- write N values in sequential key"
" order in async mode\n"
"\tfillrandom -- write N values in random key order in async"
@ -312,7 +312,7 @@ DEFINE_int32(universal_compression_size_percent, -1,
"compaction. -1 means compress everything.");
DEFINE_bool(universal_allow_trivial_move, false,
"Sllow trivial move in universal compaction.");
"Allow trivial move in universal compaction.");
DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
"data. Negative means use default settings.");
@ -1403,6 +1403,35 @@ class Benchmark {
return true;
}
inline bool CompressSlice(const Slice& input, std::string* compressed) {
bool ok = true;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = Snappy_Compress(Options().compression_opts, input.data(),
input.size(), compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), compressed);
break;
default:
ok = false;
}
return ok;
}
void PrintHeader() {
PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
@ -1432,26 +1461,10 @@ class Benchmark {
}
#endif
}
switch (FLAGS_compression_type_e) {
case rocksdb::kNoCompression:
fprintf(stdout, "Compression: none\n");
break;
case rocksdb::kSnappyCompression:
fprintf(stdout, "Compression: snappy\n");
break;
case rocksdb::kZlibCompression:
fprintf(stdout, "Compression: zlib\n");
break;
case rocksdb::kBZip2Compression:
fprintf(stdout, "Compression: bzip2\n");
break;
case rocksdb::kLZ4Compression:
fprintf(stdout, "Compression: lz4\n");
break;
case rocksdb::kLZ4HCCompression:
fprintf(stdout, "Compression: lz4hc\n");
break;
}
const char* compression =
CompressionTypeToString(FLAGS_compression_type_e).c_str();
fprintf(stdout, "Compression: %s\n", compression);
switch (FLAGS_rep_factory) {
case kPrefixHash:
@ -1472,11 +1485,11 @@ class Benchmark {
}
fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
PrintWarnings();
PrintWarnings(compression);
fprintf(stdout, "------------------------------------------------\n");
}
void PrintWarnings() {
void PrintWarnings(const char* compression) {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
fprintf(stdout,
"WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
@ -1489,51 +1502,17 @@ class Benchmark {
if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
// The test string should not be too small.
const int len = FLAGS_block_size;
char* text = (char*) malloc(len+1);
bool result = true;
const char* name = nullptr;
std::string input_str(len, 'y');
std::string compressed;
memset(text, (int) 'y', len);
text[len] = '\0';
switch (FLAGS_compression_type_e) {
case kSnappyCompression:
result = Snappy_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "Snappy";
break;
case kZlibCompression:
result = Zlib_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "Zlib";
break;
case kBZip2Compression:
result = BZip2_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "BZip2";
break;
case kLZ4Compression:
result = LZ4_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "LZ4";
break;
case kLZ4HCCompression:
result = LZ4HC_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "LZ4HC";
break;
case kNoCompression:
assert(false); // cannot happen
break;
}
bool result = CompressSlice(Slice(input_str), &compressed);
if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n", name);
} else if (name && compressed.size() >= strlen(text)) {
fprintf(stdout, "WARNING: %s compression is not effective\n", name);
fprintf(stdout, "WARNING: %s compression is not enabled\n",
compression);
} else if (compressed.size() >= input_str.size()) {
fprintf(stdout, "WARNING: %s compression is not effective\n",
compression);
}
free(text);
}
}
@ -1731,18 +1710,9 @@ class Benchmark {
}
PrintHeader();
Open(&open_options_);
const char* benchmarks = FLAGS_benchmarks.c_str();
while (benchmarks != nullptr) {
const char* sep = strchr(benchmarks, ',');
Slice name;
if (sep == nullptr) {
name = benchmarks;
benchmarks = nullptr;
} else {
name = Slice(benchmarks, sep - benchmarks);
benchmarks = sep + 1;
}
std::stringstream benchmark_stream(FLAGS_benchmarks);
std::string name;
while (std::getline(benchmark_stream, name, ',')) {
// Sanitize parameters
num_ = FLAGS_num;
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
@ -1763,138 +1733,136 @@ class Benchmark {
bool fresh_db = false;
int num_threads = FLAGS_threads;
if (name == Slice("fillseq")) {
if (name == "fillseq") {
fresh_db = true;
method = &Benchmark::WriteSeq;
} else if (name == Slice("fillbatch")) {
} else if (name == "fillbatch") {
fresh_db = true;
entries_per_batch_ = 1000;
method = &Benchmark::WriteSeq;
} else if (name == Slice("fillrandom")) {
} else if (name == "fillrandom") {
fresh_db = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("filluniquerandom")) {
} else if (name == "filluniquerandom") {
fresh_db = true;
if (num_threads > 1) {
fprintf(stderr, "filluniquerandom multithreaded not supported"
", use 1 thread");
fprintf(stderr,
"filluniquerandom multithreaded not supported"
", use 1 thread");
num_threads = 1;
}
method = &Benchmark::WriteUniqueRandom;
} else if (name == Slice("overwrite")) {
fresh_db = false;
} else if (name == "overwrite") {
method = &Benchmark::WriteRandom;
} else if (name == Slice("fillsync")) {
} else if (name == "fillsync") {
fresh_db = true;
num_ /= 1000;
write_options_.sync = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("fill100K")) {
} else if (name == "fill100K") {
fresh_db = true;
num_ /= 1000;
value_size_ = 100 * 1000;
method = &Benchmark::WriteRandom;
} else if (name == Slice("readseq")) {
} else if (name == "readseq") {
method = &Benchmark::ReadSequential;
} else if (name == Slice("readtocache")) {
} else if (name == "readtocache") {
method = &Benchmark::ReadSequential;
num_threads = 1;
reads_ = num_;
} else if (name == Slice("readreverse")) {
} else if (name == "readreverse") {
method = &Benchmark::ReadReverse;
} else if (name == Slice("readrandom")) {
} else if (name == "readrandom") {
method = &Benchmark::ReadRandom;
} else if (name == Slice("readrandomfast")) {
} else if (name == "readrandomfast") {
method = &Benchmark::ReadRandomFast;
} else if (name == Slice("multireadrandom")) {
} else if (name == "multireadrandom") {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == Slice("readmissing")) {
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
} else if (name == Slice("newiterator")) {
} else if (name == "newiterator") {
method = &Benchmark::IteratorCreation;
} else if (name == Slice("newiteratorwhilewriting")) {
} else if (name == "newiteratorwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::IteratorCreationWhileWriting;
} else if (name == Slice("seekrandom")) {
} else if (name == "seekrandom") {
method = &Benchmark::SeekRandom;
} else if (name == Slice("seekrandomwhilewriting")) {
} else if (name == "seekrandomwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::SeekRandomWhileWriting;
} else if (name == Slice("seekrandomwhilemerging")) {
} else if (name == "seekrandomwhilemerging") {
num_threads++; // Add extra thread for merging
method = &Benchmark::SeekRandomWhileMerging;
} else if (name == Slice("readrandomsmall")) {
} else if (name == "readrandomsmall") {
reads_ /= 1000;
method = &Benchmark::ReadRandom;
} else if (name == Slice("deleteseq")) {
} else if (name == "deleteseq") {
method = &Benchmark::DeleteSeq;
} else if (name == Slice("deleterandom")) {
} else if (name == "deleterandom") {
method = &Benchmark::DeleteRandom;
} else if (name == Slice("readwhilewriting")) {
} else if (name == "readwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("readwhilemerging")) {
} else if (name == "readwhilemerging") {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileMerging;
} else if (name == Slice("readrandomwriterandom")) {
} else if (name == "readrandomwriterandom") {
method = &Benchmark::ReadRandomWriteRandom;
} else if (name == Slice("readrandommergerandom")) {
} else if (name == "readrandommergerandom") {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
name.c_str());
exit(1);
}
method = &Benchmark::ReadRandomMergeRandom;
} else if (name == Slice("updaterandom")) {
} else if (name == "updaterandom") {
method = &Benchmark::UpdateRandom;
} else if (name == Slice("appendrandom")) {
} else if (name == "appendrandom") {
method = &Benchmark::AppendRandom;
} else if (name == Slice("mergerandom")) {
} else if (name == "mergerandom") {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
name.c_str());
exit(1);
}
method = &Benchmark::MergeRandom;
} else if (name == Slice("randomwithverify")) {
} else if (name == "randomwithverify") {
method = &Benchmark::RandomWithVerify;
} else if (name == Slice("fillseekseq")) {
} else if (name == "fillseekseq") {
method = &Benchmark::WriteSeqSeekSeq;
} else if (name == Slice("compact")) {
} else if (name == "compact") {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
} else if (name == "crc32c") {
method = &Benchmark::Crc32c;
} else if (name == Slice("xxhash")) {
} else if (name == "xxhash") {
method = &Benchmark::xxHash;
} else if (name == Slice("acquireload")) {
} else if (name == "acquireload") {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("compress")) {
} else if (name == "compress") {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
} else if (name == "uncompress") {
method = &Benchmark::Uncompress;
} else if (name == Slice("randomtransaction")) {
} else if (name == "randomtransaction") {
method = &Benchmark::RandomTransaction;
post_process_method = &Benchmark::RandomTransactionVerify;
} else if (name == Slice("stats")) {
} else if (name == "stats") {
PrintStats("rocksdb.stats");
} else if (name == Slice("levelstats")) {
} else if (name == "levelstats") {
PrintStats("rocksdb.levelstats");
} else if (name == Slice("sstables")) {
} else if (name == "sstables") {
PrintStats("rocksdb.sstables");
} else {
if (name != Slice()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
exit(1);
}
} else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
exit(1);
}
if (fresh_db) {
if (FLAGS_use_existing_db) {
fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
name.ToString().c_str());
name.c_str());
method = nullptr;
} else {
if (db_.db != nullptr) {
@ -2098,30 +2066,7 @@ class Benchmark {
// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
ok = CompressSlice(input, &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1);
@ -2143,32 +2088,7 @@ class Benchmark {
Slice input = gen.Generate(FLAGS_block_size);
std::string compressed;
bool ok;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
bool ok = CompressSlice(input, &compressed);
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
@ -2262,7 +2182,7 @@ class Benchmark {
flashcache_aware_env_ =
std::move(NewFlashcacheAwareEnv(FLAGS_env, cachedev_fd_));
if (flashcache_aware_env_.get() == nullptr) {
fprintf(stderr, "Failed to open flashcahce device at %s\n",
fprintf(stderr, "Failed to open flashcache device at %s\n",
FLAGS_flashcache_dev.c_str());
std::abort();
}
@ -3430,7 +3350,7 @@ class Benchmark {
// Update the value (by appending data)
Slice operand = gen.Generate(value_size_);
if (value.size() > 0) {
// Use a delimeter to match the semantics for StringAppendOperator
// Use a delimiter to match the semantics for StringAppendOperator
value.append(1,',');
}
value.append(operand.data(), operand.size());

View File

@ -652,7 +652,7 @@ void DBIter::Seek(const Slice& target) {
if (iter_->Valid()) {
direction_ = kForward;
ClearSavedValue();
FindNextUserEntry(false /*not skipping */);
FindNextUserEntry(false /* not skipping */);
} else {
valid_ = false;
}
@ -660,7 +660,7 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekToFirst() {
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used.
// because prefix seek will be used.
if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
@ -681,7 +681,7 @@ void DBIter::SeekToFirst() {
void DBIter::SeekToLast() {
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used.
// because prefix seek will be used.
if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}

View File

@ -621,7 +621,7 @@ TEST_F(DBTest, GetSnapshot) {
ASSERT_OK(Put(1, key, "v1"));
const Snapshot* s1 = db_->GetSnapshot();
if (option_config_ == kHashCuckoo) {
// NOt supported case.
// Unsupported case.
ASSERT_TRUE(s1 == nullptr);
break;
}

View File

@ -333,7 +333,7 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
*value = (cfd_->imm()->IsFlushPending() ? 1 : 0);
return true;
case kCompactionPending:
// 1 if the system already determines at least one compacdtion is needed.
// 1 if the system already determines at least one compaction is needed.
// 0 otherwise,
*value = (cfd_->compaction_picker()->NeedsCompaction(vstorage) ? 1 : 0);
return true;
@ -385,11 +385,9 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
case kNumLiveVersions:
*value = cfd_->GetNumLiveVersions();
return true;
#ifndef ROCKSDB_LITE
case kIsFileDeletionEnabled:
*value = db->IsFileDeletionsEnabled();
return true;
#endif
case kBaseLevel:
*value = vstorage->base_level();
return true;

View File

@ -40,24 +40,25 @@ int main() {
#include <gflags/gflags.h>
#include "db/db_impl.h"
#include "db/version_set.h"
#include "rocksdb/statistics.h"
#include "hdfs/env_hdfs.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/env.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "rocksdb/statistics.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/testutil.h"
#include "util/logging.h"
#include "hdfs/env_hdfs.h"
#include "utilities/merge_operators.h"
#include "util/string_util.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;
@ -1811,29 +1812,8 @@ class StressTest {
fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock);
const char* compression = "";
switch (FLAGS_compression_type_e) {
case rocksdb::kNoCompression:
compression = "none";
break;
case rocksdb::kSnappyCompression:
compression = "snappy";
break;
case rocksdb::kZlibCompression:
compression = "zlib";
break;
case rocksdb::kBZip2Compression:
compression = "bzip2";
break;
case rocksdb::kLZ4Compression:
compression = "lz4";
break;
case rocksdb::kLZ4HCCompression:
compression = "lz4hc";
break;
}
fprintf(stdout, "Compression : %s\n", compression);
std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
fprintf(stdout, "Compression : %s\n", compression.c_str());
const char* memtablerep = "";
switch (FLAGS_rep_factory) {