[Java] Enable filluniquerandom, readseq, BloomFilter, and 70+ command-line options to DbBenchmark.java

Summary:
* Add filluniquerandom
* Add readseq, implemented using iterator.
* Realize most command-line-arguments from db_bench.cc (70+).
* Some code are commented out as some of the options in Options
  not yet have Java bindings.
* Add default option to DbBenchmark.
* RocksDB will now take the ownership of all c++ raw-pointers from Options, which includes a c++ raw-pointer for Filter.

Test Plan: ./jdb_bench.sh --db=/tmp/rocksjava-bench/db --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --block_size=4096 --cache_size=17179869184 --cache_numshardbits=6 --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=/tmp/rocksjava-bench/wal --sync=0 --disable_data_sync=1 --verify_checksum=1 --delete_obsolete_files_period_micros=314572800 --max_grandparent_overlap_factor=10 --max_background_compactions=4 --max_background_flushes=0 --level0_slowdown_writes_trigger=16 --level0_stop_writes_trigger=24 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --mmap_read=1 --mmap_write=0 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --perf_level=0 --benchmarks=filluniquerandom,readseq,readrandom --use_existing_db=0 --threads=4

Reviewers: haobo, dhruba, sdong, ankgup87, rsumbaly, swapnilghike, zzbennett

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18267
This commit is contained in:
Yueh-Hsuan Chiang 2014-04-24 13:43:57 -07:00
parent 4cd9f58c04
commit 05979493d6
6 changed files with 776 additions and 70 deletions

View File

@ -20,7 +20,7 @@ public abstract class Filter {
/**
* Deletes underlying C++ filter pointer.
*/
public synchronized void dispose() {
protected synchronized void dispose() {
if(nativeHandle_ != 0) {
dispose0(nativeHandle_);
}

View File

@ -146,15 +146,21 @@ public class Options {
/**
* Use the specified filter policy to reduce disk reads.
*
* Note that the caller should not dispose the input filter as
* Options.dispose() will dispose this filter.
*
* @param Filter policy java instance.
* @return the instance of the current Options.
* @see RocksDB.open()
*/
public Options setFilter(Filter filter) {
assert(isInitialized());
setFilter0(nativeHandle_, filter);
setFilterHandle(nativeHandle_, filter.nativeHandle_);
filter_ = filter;
return this;
}
private native void setFilterHandle(long optHandle, long filterHandle);
/*
* Disable compaction triggered by seek.
@ -786,7 +792,8 @@ public class Options {
long handle, int limit);
/**
* The following two fields affect how archived logs will be deleted.
* WalTtlSeconds() and walSizeLimitMB() affect how archived logs
* will be deleted.
* 1. If both set to 0, logs will be deleted asap and will not get into
* the archive.
* 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
@ -800,6 +807,7 @@ public class Options {
* checks will be performed with ttl being first.
*
* @return the wal-ttl seconds
* @see walSizeLimitMB()
*/
public long walTtlSeconds() {
assert(isInitialized());
@ -808,7 +816,8 @@ public class Options {
private native long walTtlSeconds(long handle);
/**
* The following two fields affect how archived logs will be deleted.
* WalTtlSeconds() and walSizeLimitMB() affect how archived logs
* will be deleted.
* 1. If both set to 0, logs will be deleted asap and will not get into
* the archive.
* 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
@ -823,13 +832,64 @@ public class Options {
*
* @param walTtlSeconds the ttl seconds
* @return the reference to the current option.
* @see setWalSizeLimitMB()
*/
public Options setWALTtlSeconds(long walTtlSeconds) {
public Options setWalTtlSeconds(long walTtlSeconds) {
assert(isInitialized());
setWALTtlSeconds(nativeHandle_, walTtlSeconds);
setWalTtlSeconds(nativeHandle_, walTtlSeconds);
return this;
}
private native void setWALTtlSeconds(long handle, long walTtlSeconds);
private native void setWalTtlSeconds(long handle, long walTtlSeconds);
/**
* WalTtlSeconds() and walSizeLimitMB() affect how archived logs
* will be deleted.
* 1. If both set to 0, logs will be deleted asap and will not get into
* the archive.
* 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
* WAL files will be checked every 10 min and if total size is greater
* then WAL_size_limit_MB, they will be deleted starting with the
* earliest until size_limit is met. All empty files will be deleted.
* 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then
* WAL files will be checked every WAL_ttl_secondsi / 2 and those that
* are older than WAL_ttl_seconds will be deleted.
* 4. If both are not 0, WAL files will be checked every 10 min and both
* checks will be performed with ttl being first.
*
* @return size limit in mega-bytes.
* @see walSizeLimitMB()
*/
public long walSizeLimitMB() {
assert(isInitialized());
return walSizeLimitMB(nativeHandle_);
}
private native long walSizeLimitMB(long handle);
/**
* WalTtlSeconds() and walSizeLimitMB() affect how archived logs
* will be deleted.
* 1. If both set to 0, logs will be deleted asap and will not get into
* the archive.
* 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
* WAL files will be checked every 10 min and if total size is greater
* then WAL_size_limit_MB, they will be deleted starting with the
* earliest until size_limit is met. All empty files will be deleted.
* 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then
* WAL files will be checked every WAL_ttl_secondsi / 2 and those that
* are older than WAL_ttl_seconds will be deleted.
* 4. If both are not 0, WAL files will be checked every 10 min and both
* checks will be performed with ttl being first.
*
* @param sizeLimitMB size limit in mega-bytes.
* @return the reference to the current option.
* @see setWalSizeLimitMB()
*/
public Options setWalSizeLimitMB(long sizeLimitMB) {
assert(isInitialized());
setWalSizeLimitMB(nativeHandle_, sizeLimitMB);
return this;
}
private native void setWalSizeLimitMB(long handle, long sizeLimitMB);
/**
* Number of bytes to preallocate (via fallocate) the manifest
@ -2298,8 +2358,7 @@ public class Options {
private native void useFixedLengthPrefixExtractor(
long handle, int prefixLength);
private native void setFilter0(long optHandle, Filter fp);
long nativeHandle_;
long cacheSize_;
Filter filter_;
}

View File

@ -36,6 +36,7 @@ public class RocksDB {
// the c++ one.
Options options = new Options();
db.open(options.nativeHandle_, options.cacheSize_, path);
db.transferCppRawPointersOwnership(options);
options.dispose();
return db;
}
@ -46,8 +47,12 @@ public class RocksDB {
*/
public static RocksDB open(Options options, String path)
throws RocksDBException {
// when non-default Options is used, keeping an Options reference
// in RocksDB can prevent Java to GC during the life-time of
// the currently-created RocksDB.
RocksDB db = new RocksDB();
db.open(options.nativeHandle_, options.cacheSize_, path);
db.transferCppRawPointersOwnershipFrom(options);
return db;
}
@ -195,6 +200,17 @@ public class RocksDB {
nativeHandle_ = 0;
}
/**
* Transfer the ownership of all c++ raw-pointers from Options
* to RocksDB to ensure the life-time of those raw-pointers
* will be at least as long as the life-time of any RocksDB
* that uses these raw-pointers.
*/
protected void transferCppRawPointersOwnershipFrom(Options opt) {
filter_ = opt.filter_;
opt.filter_ = null;
}
// native methods
protected native void open(
long optionsHandle, long cacheSize, String path) throws RocksDBException;
@ -227,4 +243,5 @@ public class RocksDB {
protected native void close0();
protected long nativeHandle_;
protected Filter filter_;
}

View File

@ -54,6 +54,10 @@ class Stats {
StringBuilder message_;
boolean excludeFromMerge_;
// TODO(yhchiang): use the following arguments:
// (Long)Flag.stats_interval
// (Integer)Flag.stats_per_interval
Stats(int id) {
id_ = id;
nextReport_ = 100;
@ -163,6 +167,7 @@ public class DbBenchmark {
}
abstract class BenchmarkTask implements Callable<Stats> {
// TODO(yhchiang): use (Integer)Flag.perf_level.
public BenchmarkTask(
int tid, long randSeed, long numEntries, long keyRange) {
tid_ = tid;
@ -311,13 +316,73 @@ public class DbBenchmark {
}
}
class WriteUniqueRandomTask extends WriteTask {
static final int MAX_BUFFER_SIZE = 10000000;
public WriteUniqueRandomTask(
int tid, long randSeed, long numEntries, long keyRange,
WriteOptions writeOpt, long entriesPerBatch) {
super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch);
initRandomKeySequence();
}
public WriteUniqueRandomTask(
int tid, long randSeed, long numEntries, long keyRange,
WriteOptions writeOpt, long entriesPerBatch,
long maxWritesPerSecond) {
super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch,
maxWritesPerSecond);
initRandomKeySequence();
}
@Override protected void getKey(byte[] key, long id, long range) {
generateKeyFromLong(key, nextUniqueRandom());
}
protected void initRandomKeySequence() {
bufferSize_ = MAX_BUFFER_SIZE;
if (bufferSize_ > keyRange_) {
bufferSize_ = (int) keyRange_;
}
currentKeyCount_ = bufferSize_;
keyBuffer_ = new long[MAX_BUFFER_SIZE];
for (int k = 0; k < bufferSize_; ++k) {
keyBuffer_[k] = k;
}
}
/**
* Semi-randomly return the next unique key. It is guaranteed to be
* fully random if keyRange_ <= MAX_BUFFER_SIZE.
*/
long nextUniqueRandom() {
if (bufferSize_ == 0) {
System.err.println("bufferSize_ == 0.");
return 0;
}
int r = rand_.nextInt(bufferSize_);
// randomly pick one from the keyBuffer
long randKey = keyBuffer_[r];
if (currentKeyCount_ < keyRange_) {
// if we have not yet inserted all keys, insert next new key to [r].
keyBuffer_[r] = currentKeyCount_++;
} else {
// move the last element to [r] and decrease the size by 1.
keyBuffer_[r] = keyBuffer_[--bufferSize_];
}
return randKey;
}
int bufferSize_;
long currentKeyCount_;
long[] keyBuffer_;
}
class ReadRandomTask extends BenchmarkTask {
public ReadRandomTask(
int tid, long randSeed, long numEntries, long keyRange) {
super(tid, randSeed, numEntries, keyRange);
}
@Override public void runTask() throws RocksDBException {
stats_.found_ = 0;
byte[] key = new byte[keySize_];
byte[] value = new byte[valueSize_];
for (long i = 0; i < numEntries_; i++) {
@ -338,18 +403,22 @@ public class DbBenchmark {
class ReadSequentialTask extends BenchmarkTask {
public ReadSequentialTask(
int tid, long randSeed, long numEntries, long keyRange, long initId) {
int tid, long randSeed, long numEntries, long keyRange) {
super(tid, randSeed, numEntries, keyRange);
initId_ = initId;
}
@Override public void runTask() throws RocksDBException {
// make sure we have enough things to read in sequential
if (numEntries_ > keyRange_ - initId_) {
numEntries_ = keyRange_ - initId_;
org.rocksdb.Iterator iter = db_.newIterator();
long i;
for (iter.seekToFirst(), i = 0;
iter.isValid() && i < numEntries_;
iter.next(), ++i) {
stats_.found_++;
stats_.finishedSingleOp(iter.key().length + iter.value().length);
if (isFinished()) {
return;
}
}
throw new UnsupportedOperationException();
}
private long initId_;
}
public DbBenchmark(Map<Flag, Object> flags) throws Exception {
@ -360,22 +429,33 @@ public class DbBenchmark {
flags.get(Flag.num) : flags.get(Flag.reads));
keySize_ = (Integer) flags.get(Flag.key_size);
valueSize_ = (Integer) flags.get(Flag.value_size);
writeBufferSize_ = (Integer) flags.get(Flag.write_buffer_size) > 0 ?
(Integer) flags.get(Flag.write_buffer_size) : 0;
compressionRatio_ = (Double) flags.get(Flag.compression_ratio);
useExisting_ = (Boolean) flags.get(Flag.use_existing_db);
randSeed_ = (Long) flags.get(Flag.seed);
databaseDir_ = (String) flags.get(Flag.db);
writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second);
cacheSize_ = (Long) flags.get(Flag.cache_size);
gen_ = new RandomGenerator(compressionRatio_);
gen_ = new RandomGenerator(randSeed_, compressionRatio_);
memtable_ = (String) flags.get(Flag.memtablerep);
maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number);
prefixSize_ = (Integer) flags.get(Flag.prefix_size);
keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix);
hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count);
usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table);
flags_ = flags;
finishLock_ = new Object();
// options.setPrefixSize((Integer)flags_.get(Flag.prefix_size));
// options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix));
}
private void prepareReadOptions(ReadOptions options) {
options.setVerifyChecksums((Boolean)flags_.get(Flag.verify_checksum));
options.setTailing((Boolean)flags_.get(Flag.use_tailing_iterator));
}
private void prepareWriteOptions(WriteOptions options) {
options.setSync((Boolean)flags_.get(Flag.sync));
options.setDisableWAL((Boolean)flags_.get(Flag.disable_wal));
}
private void prepareOptions(Options options) {
@ -405,9 +485,119 @@ public class DbBenchmark {
options.memTableFactoryName());
}
if (usePlainTable_) {
options.setSstFormatConfig(
options.setTableFormatConfig(
new PlainTableConfig().setKeySize(keySize_));
}
options.setMaxWriteBufferNumber(
(Integer)flags_.get(Flag.max_write_buffer_number));
options.setMaxBackgroundCompactions(
(Integer)flags_.get(Flag.max_background_compactions));
options.setMaxBackgroundFlushes(
(Integer)flags_.get(Flag.max_background_flushes));
options.setCacheSize(
(Long)flags_.get(Flag.cache_size));
options.setBlockSize(
(Integer)flags_.get(Flag.block_size));
options.setMaxOpenFiles(
(Integer)flags_.get(Flag.open_files));
options.setCreateIfMissing(
!(Boolean)flags_.get(Flag.use_existing_db));
options.setTableCacheRemoveScanCountLimit(
(Integer)flags_.get(Flag.cache_remove_scan_count_limit));
options.setDisableDataSync(
(Boolean)flags_.get(Flag.disable_data_sync));
options.setUseFsync(
(Boolean)flags_.get(Flag.use_fsync));
options.setWalDir(
(String)flags_.get(Flag.wal_dir));
options.setDisableSeekCompaction(
(Boolean)flags_.get(Flag.disable_seek_compaction));
options.setDeleteObsoleteFilesPeriodMicros(
(Long)flags_.get(Flag.delete_obsolete_files_period_micros));
options.setTableCacheNumshardbits(
(Integer)flags_.get(Flag.table_cache_numshardbits));
options.setAllowMmapReads(
(Boolean)flags_.get(Flag.mmap_read));
options.setAllowMmapWrites(
(Boolean)flags_.get(Flag.mmap_write));
options.setAdviseRandomOnOpen(
(Boolean)flags_.get(Flag.advise_random_on_open));
options.setUseAdaptiveMutex(
(Boolean)flags_.get(Flag.use_adaptive_mutex));
options.setBytesPerSync(
(Long)flags_.get(Flag.bytes_per_sync));
options.setBloomLocality(
(Integer)flags_.get(Flag.bloom_locality));
options.setMinWriteBufferNumberToMerge(
(Integer)flags_.get(Flag.min_write_buffer_number_to_merge));
options.setMemtablePrefixBloomBits(
(Integer)flags_.get(Flag.memtable_bloom_bits));
options.setNumLevels(
(Integer)flags_.get(Flag.num_levels));
options.setTargetFileSizeBase(
(Integer)flags_.get(Flag.target_file_size_base));
options.setTargetFileSizeMultiplier(
(Integer)flags_.get(Flag.target_file_size_multiplier));
options.setMaxBytesForLevelBase(
(Integer)flags_.get(Flag.max_bytes_for_level_base));
options.setMaxBytesForLevelMultiplier(
(Integer)flags_.get(Flag.max_bytes_for_level_multiplier));
options.setLevelZeroStopWritesTrigger(
(Integer)flags_.get(Flag.level0_stop_writes_trigger));
options.setLevelZeroSlowdownWritesTrigger(
(Integer)flags_.get(Flag.level0_slowdown_writes_trigger));
options.setLevelZeroFileNumCompactionTrigger(
(Integer)flags_.get(Flag.level0_file_num_compaction_trigger));
options.setSoftRateLimit(
(Double)flags_.get(Flag.soft_rate_limit));
options.setHardRateLimit(
(Double)flags_.get(Flag.hard_rate_limit));
options.setRateLimitDelayMaxMilliseconds(
(Integer)flags_.get(Flag.rate_limit_delay_max_milliseconds));
options.setMaxGrandparentOverlapFactor(
(Integer)flags_.get(Flag.max_grandparent_overlap_factor));
options.setDisableAutoCompactions(
(Boolean)flags_.get(Flag.disable_auto_compactions));
options.setSourceCompactionFactor(
(Integer)flags_.get(Flag.source_compaction_factor));
options.setFilterDeletes(
(Boolean)flags_.get(Flag.filter_deletes));
options.setMaxSuccessiveMerges(
(Integer)flags_.get(Flag.max_successive_merges));
options.setWalTtlSeconds((Long)flags_.get(Flag.wal_ttl_seconds));
options.setWalSizeLimitMB((Long)flags_.get(Flag.wal_size_limit_MB));
int bloomBits = (Integer)flags_.get(Flag.bloom_bits);
if (bloomBits > 0) {
// Internally, options will keep a reference to this BloomFilter.
// This will disallow Java to GC this BloomFilter. In addition,
// options.dispose() will release the c++ object of this BloomFilter.
// As a result, the caller should not directly call
// BloomFilter.dispose().
options.setFilter(new BloomFilter(bloomBits));
}
/* TODO(yhchiang): enable the following parameters
options.setCompressionType((String)flags_.get(Flag.compression_type));
options.setCompressionLevel((Integer)flags_.get(Flag.compression_level));
options.setMinLevelToCompress((Integer)flags_.get(Flag.min_level_to_compress));
options.setHdfs((String)flags_.get(Flag.hdfs)); // env
options.setCacheNumshardbits((Integer)flags_.get(Flag.cache_numshardbits));
options.setStatistics((Boolean)flags_.get(Flag.statistics));
options.setUniversalSizeRatio(
(Integer)flags_.get(Flag.universal_size_ratio));
options.setUniversalMinMergeWidth(
(Integer)flags_.get(Flag.universal_min_merge_width));
options.setUniversalMaxMergeWidth(
(Integer)flags_.get(Flag.universal_max_merge_width));
options.setUniversalMaxSizeAmplificationPercent(
(Integer)flags_.get(Flag.universal_max_size_amplification_percent));
options.setUniversalCompressionSizePercent(
(Integer)flags_.get(Flag.universal_compression_size_percent));
// TODO(yhchiang): add RocksDB.openForReadOnly() to enable Flag.readonly
// TODO(yhchiang): enable Flag.merge_operator by switch
options.setAccessHintOnCompactionStart(
(String)flags_.get(Flag.compaction_fadvice));
// available values of fadvice are "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" for fadvice
*/
}
private void run() throws RocksDBException {
@ -424,6 +614,9 @@ public class DbBenchmark {
List<Callable<Stats>> tasks = new ArrayList<Callable<Stats>>();
List<Callable<Stats>> bgTasks = new ArrayList<Callable<Stats>>();
WriteOptions writeOpt = new WriteOptions();
prepareWriteOptions(writeOpt);
ReadOptions readOpt = new ReadOptions();
prepareReadOptions(readOpt);
int currentTaskId = 0;
boolean known = true;
@ -436,6 +629,9 @@ public class DbBenchmark {
} else if (benchmark.equals("fillrandom")) {
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
} else if (benchmark.equals("filluniquerandom")) {
tasks.add(new WriteUniqueRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
} else if (benchmark.equals("fillsync")) {
writeOpt.setSync(true);
tasks.add(new WriteRandomTask(
@ -444,13 +640,12 @@ public class DbBenchmark {
} else if (benchmark.equals("readseq")) {
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadSequentialTask(
currentTaskId++, randSeed_, reads_ / threadNum_,
num_, (num_ / threadNum_) * t));
currentTaskId++, randSeed_, reads_, num_));
}
} else if (benchmark.equals("readrandom")) {
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
currentTaskId++, randSeed_, reads_, num_));
}
} else if (benchmark.equals("readwhilewriting")) {
WriteTask writeTask = new WriteRandomTask(
@ -508,6 +703,7 @@ public class DbBenchmark {
}
}
writeOpt.dispose();
readOpt.dispose();
}
options.dispose();
db_.close();
@ -573,7 +769,7 @@ public class DbBenchmark {
System.out.printf(
"%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n",
benchmark, elapsedSeconds * 1e6 / num_,
benchmark, elapsedSeconds * 1e6 / stats.done_,
(stats.bytes_ / 1048576.0) / elapsedSeconds,
taskFinishedCount, concurrentThreads);
}
@ -616,14 +812,13 @@ public class DbBenchmark {
static void printHelp() {
System.out.println("usage:");
for (Flag flag : Flag.values()) {
System.out.format(" --%s%n %s%n",
System.out.format(" --%s%n\t%s%n",
flag.name(),
flag.desc());
if (flag.getDefaultValue() != null) {
System.out.format(" DEFAULT: %s%n",
System.out.format("\tDEFAULT: %s%n",
flag.getDefaultValue().toString());
}
System.out.println("");
}
}
@ -677,30 +872,28 @@ public class DbBenchmark {
"\t\tfillseq -- write N values in sequential key order in async mode.\n" +
"\t\tfillrandom -- write N values in random key order in async mode.\n" +
"\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" +
"\t\t in random key order in sync mode.\n" +
"\t\t in random key order in sync mode.\n" +
"\t\tfillsync -- write N/100 values in random key order in sync mode.\n" +
"\t\tfill100K -- write N/1000 100K values in random order in async mode.\n" +
"\t\treadseq -- read N times sequentially.\n" +
"\t\treadrandom -- read N times in random order.\n" +
"\t\treadhot -- read N times in random order from 1% section of DB.\n" +
"\t\treadwhilewriting -- measure the read performance of multiple readers\n" +
"\t\t with a bg single writer. The write rate of the bg\n" +
"\t\t is capped by --writes_per_second.\n" +
"\t\t with a bg single writer. The write rate of the bg\n" +
"\t\t is capped by --writes_per_second.\n" +
"\tMeta Operations:\n" +
"\t\tdelete -- delete DB") {
@Override public Object parseValue(String value) {
return new ArrayList<String>(Arrays.asList(value.split(",")));
}
},
compression_ratio(0.5d,
"Arrange to generate values that shrink to this fraction of\n" +
"\ttheir original size after compression") {
"\ttheir original size after compression.") {
@Override public Object parseValue(String value) {
return Double.parseDouble(value);
}
},
use_existing_db(false,
"If true, do not destroy the existing database. If you set this\n" +
"\tflag and also specify a benchmark that wants a fresh database,\n" +
@ -709,51 +902,43 @@ public class DbBenchmark {
return Boolean.parseBoolean(value);
}
},
num(1000000,
"Number of key/values to place in database.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
threads(1,
"Number of concurrent threads to run.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
reads(null,
"Number of read operations to do. If negative, do --nums reads.") {
@Override
public Object parseValue(String value) {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
key_size(16,
"The size of each key in bytes.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
value_size(100,
"The size of each value in bytes.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
write_buffer_size(4 << 20,
"Number of bytes to buffer in memtable before compacting\n" +
"\t(initialized to default value by 'main'.)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
return Long.parseLong(value);
}
},
max_write_buffer_number(2,
"The number of in-memory memtables. Each memtable is of size\n" +
"\twrite_buffer_size.") {
@ -761,14 +946,12 @@ public class DbBenchmark {
return Integer.parseInt(value);
}
},
prefix_size(0, "Controls the prefix size for HashSkipList, HashLinkedList,\n" +
"\tand plain table.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
keys_per_prefix(0, "Controls the average number of keys generated\n" +
"\tper prefix, 0 means no special handling of the prefix,\n" +
"\ti.e. use the prefix comes with the generated random number.") {
@ -776,7 +959,6 @@ public class DbBenchmark {
return Integer.parseInt(value);
}
},
memtablerep("skip_list",
"The memtable format. Available options are\n" +
"\tskip_list,\n" +
@ -787,7 +969,6 @@ public class DbBenchmark {
return value;
}
},
hash_bucket_count(SizeUnit.MB,
"The number of hash buckets used in the hash-bucket-based\n" +
"\tmemtables. Memtables that currently support this argument are\n" +
@ -796,7 +977,6 @@ public class DbBenchmark {
return Long.parseLong(value);
}
},
writes_per_second(10000,
"The write-rate of the background writer used in the\n" +
"\t`readwhilewriting` benchmark. Non-positive number indicates\n" +
@ -805,14 +985,12 @@ public class DbBenchmark {
return Integer.parseInt(value);
}
},
use_plain_table(false,
"Use plain-table sst format.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
cache_size(-1L,
"Number of bytes to use as a cache of uncompressed data.\n" +
"\tNegative means use default settings.") {
@ -820,15 +998,445 @@ public class DbBenchmark {
return Long.parseLong(value);
}
},
seed(0L,
"Seed base for random number generators.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
num_levels(7,
"The total number of levels.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
numdistinct(1000,
"Number of distinct keys to use. Used in RandomWithVerify to\n" +
"\tread/write on fewer keys so that gets are more likely to find the\n" +
"\tkey and puts are more likely to update the same key.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
merge_keys(-1,
"Number of distinct keys to use for MergeRandom and\n" +
"\tReadRandomMergeRandom.\n" +
"\tIf negative, there will be FLAGS_num keys.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
bloom_locality(0,"Control bloom filter probes locality.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
duration(0,"Time in seconds for the random-ops tests to run.\n" +
"\tWhen 0 then num & reads determine the test duration.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
num_multi_db(0,
"Number of DBs used in the benchmark. 0 means single DB.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
histogram(false,"Print histogram of operation timings.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
min_write_buffer_number_to_merge(
defaultOptions_.minWriteBufferNumberToMerge(),
"The minimum number of write buffers that will be merged together\n" +
"\tbefore writing to storage. This is cheap because it is an\n" +
"\tin-memory merge. If this feature is not enabled, then all these\n" +
"\twrite buffers are flushed to L0 as separate files and this\n" +
"\tincreases read amplification because a get request has to check\n" +
"\tin all of these files. Also, an in-memory merge may result in\n" +
"\twriting less data to storage if there are duplicate records\n" +
"\tin each of these individual write buffers.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
max_background_compactions(
defaultOptions_.maxBackgroundCompactions(),
"The maximum number of concurrent background compactions\n" +
"\tthat can occur in parallel.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
max_background_flushes(
defaultOptions_.maxBackgroundFlushes(),
"The maximum number of concurrent background flushes\n" +
"\tthat can occur in parallel.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
/* TODO(yhchiang): enable the following
compaction_style((int32_t) defaultOptions_.compactionStyle(),
"style of compaction: level-based vs universal.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},*/
universal_size_ratio(0,
"Percentage flexibility while comparing file size\n" +
"\t(for universal compaction only).") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
universal_min_merge_width(0,"The minimum number of files in a\n" +
"\tsingle compaction run (for universal compaction only).") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
universal_max_merge_width(0,"The max number of files to compact\n" +
"\tin universal style compaction.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
universal_max_size_amplification_percent(0,
"The max size amplification for universal style compaction.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
universal_compression_size_percent(-1,
"The percentage of the database to compress for universal\n" +
"\tcompaction. -1 means compress everything.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
block_size(defaultOptions_.blockSize(),
"Number of bytes in a block.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
compressed_cache_size(-1,
"Number of bytes to use as a cache of compressed data.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
open_files(defaultOptions_.maxOpenFiles(),
"Maximum number of files to keep open at the same time\n" +
"\t(use default if == 0)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
bloom_bits(-1,"Bloom filter bits per key. Negative means\n" +
"\tuse default settings.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
memtable_bloom_bits(0,"Bloom filter bits per key for memtable.\n" +
"\tNegative means no bloom filter.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
cache_numshardbits(-1,"Number of shards for the block cache\n" +
"\tis 2 ** cache_numshardbits. Negative means use default settings.\n" +
"\tThis is applied only if FLAGS_cache_size is non-negative.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
cache_remove_scan_count_limit(32,"") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
verify_checksum(false,"Verify checksum for every block read\n" +
"\tfrom storage.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
statistics(false,"Database statistics.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
writes(-1,"Number of write operations to do. If negative, do\n" +
"\t--num reads.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
sync(false,"Sync all writes to disk.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
disable_data_sync(false,"If true, do not wait until data is\n" +
"\tsynced to disk.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
use_fsync(false,"If true, issue fsync instead of fdatasync.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
disable_wal(false,"If true, do not write WAL for write.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
wal_dir("", "If not empty, use the given dir for WAL.") {
@Override public Object parseValue(String value) {
return value;
}
},
target_file_size_base(2 * 1048576,"Target file size at level-1") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
target_file_size_multiplier(1,
"A multiplier to compute target level-N file size (N >= 2)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
max_bytes_for_level_base(10 * 1048576,
"Max bytes for level-1") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
max_bytes_for_level_multiplier(10,
"A multiplier to compute max bytes for level-N (N >= 2)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
level0_stop_writes_trigger(12,"Number of files in level-0\n" +
"\tthat will trigger put stop.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
level0_slowdown_writes_trigger(8,"Number of files in level-0\n" +
"\tthat will slow down writes.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
level0_file_num_compaction_trigger(4,"Number of files in level-0\n" +
"\twhen compactions start.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
readwritepercent(90,"Ratio of reads to reads/writes (expressed\n" +
"\tas percentage) for the ReadRandomWriteRandom workload. The\n" +
"\tdefault value 90 means 90% operations out of all reads and writes\n" +
"\toperations are reads. In other words, 9 gets for every 1 put.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
mergereadpercent(70,"Ratio of merges to merges&reads (expressed\n" +
"\tas percentage) for the ReadRandomMergeRandom workload. The\n" +
"\tdefault value 70 means 70% out of all read and merge operations\n" +
"\tare merges. In other words, 7 merges for every 3 gets.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
deletepercent(2,"Percentage of deletes out of reads/writes/\n" +
"\tdeletes (used in RandomWithVerify only). RandomWithVerify\n" +
"\tcalculates writepercent as (100 - FLAGS_readwritepercent -\n" +
"\tdeletepercent), so deletepercent must be smaller than (100 -\n" +
"\tFLAGS_readwritepercent)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
disable_seek_compaction(false,"Option to disable compaction\n" +
"\ttriggered by read.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
delete_obsolete_files_period_micros(0,"Option to delete\n" +
"\tobsolete files periodically. 0 means that obsolete files are\n" +
"\tdeleted after every compaction run.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
compression_level(-1,
"Compression level. For zlib this should be -1 for the\n" +
"\tdefault level, or between 0 and 9.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
min_level_to_compress(-1,"If non-negative, compression starts\n" +
"\tfrom this level. Levels with number < min_level_to_compress are\n" +
"\tnot compressed. Otherwise, apply compression_type to\n" +
"\tall levels.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
table_cache_numshardbits(4,"") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
stats_interval(0,"Stats are reported every N operations when\n" +
"\tthis is greater than zero. When 0 the interval grows over time.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
stats_per_interval(0,"Reports additional stats per interval when\n" +
"\tthis is greater than 0.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
perf_level(0,"Level of perf collection.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
soft_rate_limit(0.0,"") {
@Override public Object parseValue(String value) {
return Double.parseDouble(value);
}
},
hard_rate_limit(0.0,"When not equal to 0 this make threads\n" +
"\tsleep at each stats reporting interval until the compaction\n" +
"\tscore for all levels is less than or equal to this value.") {
@Override public Object parseValue(String value) {
return Double.parseDouble(value);
}
},
rate_limit_delay_max_milliseconds(1000,
"When hard_rate_limit is set then this is the max time a put will\n" +
"\tbe stalled.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
max_grandparent_overlap_factor(10,"Control maximum bytes of\n" +
"\toverlaps in grandparent (i.e., level+2) before we stop building a\n" +
"\tsingle file in a level->level+1 compaction.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
readonly(false,"Run read only benchmarks.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
disable_auto_compactions(false,"Do not auto trigger compactions.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
source_compaction_factor(1,"Cap the size of data in level-K for\n" +
"\ta compaction run that compacts Level-K with Level-(K+1) (for\n" +
"\tK >= 1)") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
wal_ttl_seconds(0L,"Set the TTL for the WAL Files in seconds.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
wal_size_limit_MB(0L,"Set the size limit for the WAL Files\n" +
"\tin MB.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
/* TODO(yhchiang): enable the following
bufferedio(rocksdb::EnvOptions().use_os_buffer,
"Allow buffered io using OS buffers.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
*/
mmap_read(false,
"Allow reads to occur via mmap-ing files.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
mmap_write(false,
"Allow writes to occur via mmap-ing files.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
advise_random_on_open(defaultOptions_.adviseRandomOnOpen(),
"Advise random access on table file open.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
compaction_fadvice("NORMAL",
"Access pattern advice when a file is compacted.") {
@Override public Object parseValue(String value) {
return value;
}
},
use_tailing_iterator(false,
"Use tailing iterator to access a series of keys instead of get.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
use_adaptive_mutex(defaultOptions_.useAdaptiveMutex(),
"Use adaptive mutex.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
bytes_per_sync(defaultOptions_.bytesPerSync(),
"Allows OS to incrementally sync files to disk while they are\n" +
"\tbeing written, in the background. Issue one request for every\n" +
"\tbytes_per_sync written. 0 turns it off.") {
@Override public Object parseValue(String value) {
return Long.parseLong(value);
}
},
filter_deletes(false," On true, deletes use bloom-filter and drop\n" +
"\tthe delete if key not present.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
}
},
max_successive_merges(0,"Maximum number of successive merge\n" +
"\toperations on a key in the memtable.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
db("/tmp/rocksdbjni-bench",
"Use the db with the following name.") {
@Override public Object parseValue(String value) {
@ -859,25 +1467,23 @@ public class DbBenchmark {
private final byte[] data_;
private int dataLength_;
private int position_;
Random rand_;
private RandomGenerator(double compressionRatio) {
private RandomGenerator(long seed, double compressionRatio) {
// We use a limited amount of data over and over again and ensure
// that it is larger than the compression window (32KB), and also
// large enough to serve all typical value sizes we want to write.
Random rand = new Random(301);
rand_ = new Random(seed);
dataLength_ = 1048576 + 100;
data_ = new byte[dataLength_];
// TODO(yhchiang): mimic test::CompressibleString?
for (int i = 0; i < dataLength_; ++i) {
data_[i] = (byte) (' ' + rand.nextInt(95));
data_[i] = (byte) (' ' + rand_.nextInt(95));
}
}
private byte[] generate(int length) {
if (position_ + length > data_.length) {
position_ = 0;
assert (length < data_.length);
}
position_ = rand_.nextInt(data_.length - length);
return Arrays.copyOfRange(data_, position_, position_ + length);
}
}
@ -911,7 +1517,6 @@ public class DbBenchmark {
long startTime_;
// memtable related
final int writeBufferSize_;
final int maxWriteBufferNumber_;
final int prefixSize_;
final int keysPerPrefix_;
@ -923,4 +1528,8 @@ public class DbBenchmark {
Object finishLock_;
boolean isFinished_;
Map<Flag, Object> flags_;
// as the scope of a static member equals to the scope of the problem,
// we let its c++ pointer to be disposed in its finalizer.
static Options defaultOptions_ = new Options();
}

View File

@ -123,9 +123,9 @@ public class OptionsTest {
assert(opt.tableCacheRemoveScanCountLimit() == intValue);
}
{ // WALTtlSeconds test
{ // WalTtlSeconds test
long longValue = rand.nextLong();
opt.setWALTtlSeconds(longValue);
opt.setWalTtlSeconds(longValue);
assert(opt.walTtlSeconds() == longValue);
}

View File

@ -122,13 +122,13 @@ jlong Java_org_rocksdb_Options_statisticsPtr(
/*
* Class: org_rocksdb_Options
* Method: setFilter0
* Method: setFilterHandle
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setFilter0(
JNIEnv* env, jobject jobj, jlong jopt_handle, jobject jfp) {
void Java_org_rocksdb_Options_setFilterHandle(
JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jfilter_handle) {
reinterpret_cast<rocksdb::Options*>(jopt_handle)->filter_policy =
rocksdb::FilterJni::getHandle(env, jfp);
reinterpret_cast<rocksdb::FilterPolicy*>(jfilter_handle);
}
/*
@ -602,15 +602,36 @@ jlong Java_org_rocksdb_Options_walTtlSeconds(
/*
* Class: org_rocksdb_Options
* Method: setWALTtlSeconds
* Method: setWalTtlSeconds
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setWALTtlSeconds(
void Java_org_rocksdb_Options_setWalTtlSeconds(
JNIEnv* env, jobject jobj, jlong jhandle, jlong WAL_ttl_seconds) {
reinterpret_cast<rocksdb::Options*>(jhandle)->WAL_ttl_seconds =
static_cast<int64_t>(WAL_ttl_seconds);
}
/*
* Class: org_rocksdb_Options
* Method: walTtlSeconds
* Signature: (J)J
*/
jlong Java_org_rocksdb_Options_walSizeLimitMB(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->WAL_size_limit_MB;
}
/*
* Class: org_rocksdb_Options
* Method: setWalSizeLimitMB
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setWalSizeLimitMB(
JNIEnv* env, jobject jobj, jlong jhandle, jlong WAL_size_limit_MB) {
reinterpret_cast<rocksdb::Options*>(jhandle)->WAL_size_limit_MB =
static_cast<int64_t>(WAL_size_limit_MB);
}
/*
* Class: org_rocksdb_Options
* Method: manifestPreallocationSize