From 0f5cbcd7988a58b4cc81213612be80e46df1fdc3 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Wed, 9 Apr 2014 00:48:20 -0700 Subject: [PATCH] [JNI] Add an initial benchmark for java binding for rocksdb. Summary: * Add a benchmark for java binding for rocksdb. The java benchmark is a complete rewrite based on the c++ db/db_bench.cc and the DbBenchmark in dain's java leveldb. * Support multithreading. * 'readseq' is currently not supported as it requires RocksDB Iterator. * usage: --benchmarks Comma-separated list of operations to run in the specified order Actual benchmarks: fillseq -- write N values in sequential key order in async mode fillrandom -- write N values in random key order in async mode fillbatch -- write N/1000 batch where each batch has 1000 values in random key order in sync mode fillsync -- write N/100 values in random key order in sync mode fill100K -- write N/1000 100K values in random order in async mode readseq -- read N times sequentially readrandom -- read N times in random order readhot -- read N times in random order from 1% section of DB Meta Operations: delete -- delete DB DEFAULT: [fillseq, readrandom, fillrandom] --compression_ratio Arrange to generate values that shrink to this fraction of their original size after compression DEFAULT: 0.5 --use_existing_db If true, do not destroy the existing database. If you set this flag and also specify a benchmark that wants a fresh database, that benchmark will fail. DEFAULT: false --num Number of key/values to place in database. DEFAULT: 1000000 --threads Number of concurrent threads to run. DEFAULT: 1 --reads Number of read operations to do. If negative, do --nums reads. --key_size The size of each key in bytes. DEFAULT: 16 --value_size The size of each value in bytes. DEFAULT: 100 --write_buffer_size Number of bytes to buffer in memtable before compacting (initialized to default value by 'main'.) DEFAULT: 4194304 --cache_size Number of bytes to use as a cache of uncompressed data. Negative means use default settings. DEFAULT: -1 --seed Seed base for random number generators. DEFAULT: 0 --db Use the db with the following name. DEFAULT: /tmp/rocksdbjni-bench * Add RocksDB.write(). Test Plan: make jbench Reviewers: haobo, sdong, dhruba, ankgup87 Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17433 --- Makefile | 3 + java/Makefile | 14 +- java/org/rocksdb/RocksDB.java | 10 + java/org/rocksdb/WriteBatch.java | 2 +- java/org/rocksdb/benchmark/DbBenchmark.java | 699 ++++++++++++++++++++ java/rocksjni/rocksjni.cc | 22 + 6 files changed, 747 insertions(+), 3 deletions(-) create mode 100644 java/org/rocksdb/benchmark/DbBenchmark.java diff --git a/Makefile b/Makefile index 47aeb5847..69f74ebe0 100644 --- a/Makefile +++ b/Makefile @@ -439,6 +439,9 @@ jclean: jtest: cd java;$(MAKE) sample;$(MAKE) test; +jdb_bench: + cd java;$(MAKE) db_bench; + # --------------------------------------------------------------------------- # Platform-specific compilation # --------------------------------------------------------------------------- diff --git a/java/Makefile b/java/Makefile index 10dd4f110..98c7d740c 100644 --- a/java/Makefile +++ b/java/Makefile @@ -12,7 +12,7 @@ java: jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class javah -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_CLASSES) -sample: +sample: java javac -cp $(ROCKSDB_JAR) RocksDBSample.java @rm -rf /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni_not_found @@ -20,5 +20,15 @@ sample: @rm -rf /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni_not_found -test: +test: java java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.WriteBatchTest + +db_bench: java + javac org/rocksdb/benchmark/*.java + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=1 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=2 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=4 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=8 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=16 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=32 --benchmarks=fillseq,readrandom diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index bdab8be1b..fadb513e5 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -75,6 +75,14 @@ public class RocksDB { put(nativeHandle_, writeOpts.nativeHandle_, key, key.length, value, value.length); } + /** + * Apply the specified updates to the database. + */ + public void write(WriteOptions writeOpts, WriteBatch updates) + throws RocksDBException { + write(writeOpts.nativeHandle_, updates.nativeHandle_); + } + /** * Get the value associated with the specified key. * @@ -147,6 +155,8 @@ public class RocksDB { long handle, long writeOptHandle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; + private native void write( + long writeOptHandle, long batchHandle) throws RocksDBException; private native int get( long handle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index acacee3f0..fdd9ef28c 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -106,7 +106,7 @@ public class WriteBatch { private native void putLogData(byte[] blob, int blobLen); private native void dispose0(); - private long nativeHandle_; + long nativeHandle_; } /** diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java new file mode 100644 index 000000000..55f7b1189 --- /dev/null +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -0,0 +1,699 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +/** + * Copyright (C) 2011 the original author or authors. + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.rocksdb.benchmark; + +import java.lang.Runnable; +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.rocksdb.*; + +class Stats { + int id_; + long start_; + long finish_; + double seconds_; + long done_; + long found_; + long lastReportDone_; + long nextReport_; + long bytes_; + StringBuilder message_; + boolean excludeFromMerge_; + + Stats(int id) { + id_ = id; + nextReport_ = 100; + done_ = 0; + lastReportDone_ = 0; + bytes_ = 0; + seconds_ = 0; + start_ = System.nanoTime(); + finish_ = start_; + found_ = 0; + message_ = new StringBuilder(""); + excludeFromMerge_ = false; + } + + void merge(final Stats other) { + if (other.excludeFromMerge_) { + return; + } + + done_ += other.done_; + found_ += other.found_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.length() == 0) { + message_ = other.message_; + } + } + + void stop() { + finish_ = System.nanoTime(); + seconds_ = (double) (finish_ - start_) / 1000000; + } + + void addMessage(String msg) { + if (message_.length() > 0) { + message_.append(" "); + } + message_.append(msg); + } + + void setId(int id) { id_ = id; } + void setExcludeFromMerge() { excludeFromMerge_ = true; } + + void finishedSingleOp(int bytes) { + done_++; + lastReportDone_ = System.nanoTime(); + bytes_ += bytes; + if (done_ >= nextReport_) { + if (nextReport_ < 1000) { + nextReport_ += 100; + } else if (nextReport_ < 5000) { + nextReport_ += 500; + } else if (nextReport_ < 10000) { + nextReport_ += 1000; + } else if (nextReport_ < 50000) { + nextReport_ += 5000; + } else if (nextReport_ < 100000) { + nextReport_ += 10000; + } else if (nextReport_ < 500000) { + nextReport_ += 50000; + } else { + nextReport_ += 100000; + } + System.err.printf("... Task %s finished %d ops%30s\r", id_, done_, ""); + } + } + + void report(String name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + StringBuilder extra = new StringBuilder(""); + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + double elapsed = (finish_ - start_) * 1e-6; + extra.append(String.format("%6.1f MB/s", (bytes_ / 1048576.0) / elapsed)); + } + extra.append(message_.toString()); + double elapsed = (finish_ - start_) * 1e-6; + double throughput = (double) done_ / elapsed; + + System.out.format("%-12s : %11.3f micros/op %d ops/sec;%s%s\n", + name, elapsed * 1e6 / done_, + (long) throughput, (extra.length() == 0 ? "" : " "), extra.toString()); + } +} + +public class DbBenchmark { + enum Order { + SEQUENTIAL, + RANDOM + } + + enum DBState { + FRESH, + EXISTING + } + + static { + System.loadLibrary("rocksdbjni"); + } + + abstract class BenchmarkTask implements Callable { + public BenchmarkTask( + int tid, long randSeed, long numEntries, long keyRange) { + tid_ = tid; + rand_ = new Random(randSeed + tid * 1000); + numEntries_ = numEntries; + keyRange_ = keyRange; + stats_ = new Stats(tid); + } + + @Override public Stats call() throws RocksDBException { + stats_.start_ = System.nanoTime(); + runTask(); + stats_.finish_ = System.nanoTime(); + return stats_; + } + + abstract protected void runTask() throws RocksDBException; + + protected int tid_; + protected Random rand_; + protected long numEntries_; + protected long keyRange_; + protected Stats stats_; + + protected void getFixedKey(byte[] key, long sn) { + DbBenchmark.formatNumber(key, sn); + } + + protected void getRandomKey(byte[] key, long range) { + DbBenchmark.formatNumber(key, Math.abs(rand_.nextLong() % range)); + } + } + + abstract class WriteTask extends BenchmarkTask { + public WriteTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch) { + super(tid, randSeed, numEntries, keyRange); + writeOpt_ = writeOpt; + entriesPerBatch_ = entriesPerBatch; + } + + @Override public void runTask() throws RocksDBException { + if (numEntries_ != DbBenchmark.this.num_) { + stats_.message_.append(String.format(" (%d ops)", numEntries_)); + } + byte[] key = new byte[keySize_]; + byte[] value = new byte[valueSize_]; + + if (entriesPerBatch_ == 1) { + for (long i = 0; i < numEntries_; ++i) { + getKey(key, i, keyRange_); + db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_)); + stats_.finishedSingleOp(keySize_ + valueSize_); + } + } else { + for (long i = 0; i < numEntries_; i += entriesPerBatch_) { + WriteBatch batch = new WriteBatch(); + for (long j = 0; j < entriesPerBatch_; j++) { + getKey(key, i + j, keyRange_); + batch.put(key, DbBenchmark.this.gen_.generate(valueSize_)); + stats_.finishedSingleOp(keySize_ + valueSize_); + } + db_.write(writeOpt_, batch); + batch.dispose(); + } + } + } + + abstract protected void getKey(byte[] key, long id, long range); + protected WriteOptions writeOpt_; + protected long entriesPerBatch_; + } + + class WriteSequentialTask extends WriteTask { + public WriteSequentialTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch); + } + @Override protected void getKey(byte[] key, long id, long range) { + getFixedKey(key, id); + } + } + + class WriteRandomTask extends WriteTask { + public WriteRandomTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch); + } + @Override protected void getKey(byte[] key, long id, long range) { + getRandomKey(key, range); + } + } + + class ReadRandomTask extends BenchmarkTask { + public ReadRandomTask( + int tid, long randSeed, long numEntries, long keyRange) { + super(tid, randSeed, numEntries, keyRange); + } + @Override public void runTask() throws RocksDBException { + stats_.found_ = 0; + byte[] key = new byte[keySize_]; + byte[] value = new byte[valueSize_]; + for (long i = 0; i < numEntries_; i++) { + getRandomKey(key, numEntries_); + int len = db_.get(key, value); + if (len != RocksDB.NOT_FOUND) { + stats_.found_++; + stats_.finishedSingleOp(keySize_ + valueSize_); + } else { + stats_.finishedSingleOp(keySize_); + } + } + } + } + + class ReadSequentialTask extends BenchmarkTask { + public ReadSequentialTask( + int tid, long randSeed, long numEntries, long keyRange, long initId) { + super(tid, randSeed, numEntries, keyRange); + initId_ = initId; + } + @Override public void runTask() throws RocksDBException { + // make sure we have enough things to read in sequential + if (numEntries_ > keyRange_ - initId_) { + numEntries_ = keyRange_ - initId_; + } + throw new UnsupportedOperationException(); + } + private long initId_; + } + + public DbBenchmark(Map flags) throws Exception { + benchmarks_ = (List) flags.get(Flag.benchmarks); + num_ = (int) flags.get(Flag.num); + threadNum_ = (int) flags.get(Flag.threads); + reads_ = (int) (flags.get(Flag.reads) == null ? + flags.get(Flag.num) : flags.get(Flag.reads)); + keySize_ = (int) flags.get(Flag.key_size); + valueSize_ = (int) flags.get(Flag.value_size); + writeBufferSize_ = (int) flags.get(Flag.write_buffer_size) > 0 ? + (int) flags.get(Flag.write_buffer_size) : 0; + compressionRatio_ = (double) flags.get(Flag.compression_ratio); + useExisting_ = (boolean) flags.get(Flag.use_existing_db); + randSeed_ = (long) flags.get(Flag.seed); + databaseDir_ = (String) flags.get(Flag.db); + gen_ = new RandomGenerator(compressionRatio_); + } + + private void run() throws RocksDBException { + if (!useExisting_) { + destroyDb(); + } + open(); + + printHeader(); + + for (String benchmark : benchmarks_) { + List> tasks = new ArrayList>(); + WriteOptions writeOpt = new WriteOptions(); + int currentTaskId = 0; + int concurrentThreads = threadNum_; + boolean known = true; + + if (benchmark.equals("fillseq")) { + tasks.add(new WriteSequentialTask( + currentTaskId, randSeed_, num_, num_, writeOpt, 1)); + concurrentThreads = 1; + } else if (benchmark.equals("fillbatch")) { + tasks.add(new WriteRandomTask( + currentTaskId, randSeed_, num_ / 1000, num_, writeOpt, 1000)); + concurrentThreads = 1; + } else if (benchmark.equals("fillrandom")) { + tasks.add(new WriteRandomTask( + currentTaskId, randSeed_, num_, num_, writeOpt, 1)); + concurrentThreads = 1; + } else if (benchmark.equals("fillsync")) { + writeOpt.setSync(true); + tasks.add(new WriteRandomTask( + currentTaskId, randSeed_, num_ / 1000, num_ / 1000, + writeOpt, 1)); + concurrentThreads = 1; + } else if (benchmark.equals("readseq")) { + for (int t = 0; t < threadNum_; ++t) { + tasks.add(new ReadSequentialTask( + currentTaskId++, randSeed_, reads_ / threadNum_, + num_, (num_ / threadNum_) * t)); + } + } else if (benchmark.equals("readrandom")) { + for (int t = 0; t < threadNum_; ++t) { + tasks.add(new ReadRandomTask( + currentTaskId++, randSeed_, reads_ / threadNum_, num_)); + } + } else if (benchmark.equals("readhot")) { + for (int t = 0; t < threadNum_; ++t) { + tasks.add(new ReadRandomTask( + currentTaskId++, randSeed_, reads_ / threadNum_, num_ / 100)); + } + } else if (benchmark.equals("delete")) { + destroyDb(); + open(); + } else { + known = false; + System.err.println("Unknown benchmark: " + benchmark); + } + if (known) { + ExecutorService executor = Executors.newCachedThreadPool(); + try { + start(); + List> results = executor.invokeAll(tasks); + executor.shutdown(); + boolean finished = executor.awaitTermination(3, TimeUnit.DAYS); + // do something + stop(benchmark, results, concurrentThreads); + if (!finished) { + // do something else + System.out.format("Benchmark %s was not finished before timeout."); + } + } catch (InterruptedException e) { + System.err.println(e); + } + } + writeOpt.dispose(); + } + db_.close(); + } + + private void printHeader() { + int kKeySize = 16; + System.out.printf("Keys: %d bytes each\n", kKeySize); + System.out.printf("Values: %d bytes each (%d bytes after compression)\n", + valueSize_, + (int) (valueSize_ * compressionRatio_ + 0.5)); + System.out.printf("Entries: %d\n", num_); + System.out.printf("RawSize: %.1f MB (estimated)\n", + ((kKeySize + valueSize_) * num_) / 1048576.0); + System.out.printf("FileSize: %.1f MB (estimated)\n", + (((kKeySize + valueSize_ * compressionRatio_) * num_) + / 1048576.0)); + printWarnings(); + System.out.printf("------------------------------------------------\n"); + } + + void printWarnings() { + boolean assertsEnabled = false; + assert assertsEnabled = true; // Intentional side effect!!! + if (assertsEnabled) { + System.out.printf( + "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); + } + } + + private void open() throws RocksDBException { + db_ = RocksDB.open(databaseDir_); + } + + private void start() { + startTime_ = System.nanoTime(); + } + + private void stop( + String benchmark, List> results, int concurrentThreads) { + long endTime = System.nanoTime(); + double elapsedSeconds = + 1.0d * (endTime - startTime_) / TimeUnit.SECONDS.toNanos(1); + + Stats stats = new Stats(-1); + int taskFinishedCount = 0; + for (Future result : results) { + if (result.isDone()) { + try { + Stats taskStats = result.get(3, TimeUnit.SECONDS); + if (!result.isCancelled()) { + taskFinishedCount++; + } + stats.merge(taskStats); + } catch (Exception e) { + // then it's not successful, the output will indicate this + } + } + } + + System.out.printf( + "%-12s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", + benchmark, elapsedSeconds * 1e6 / num_, + (stats.bytes_ / 1048576.0) / elapsedSeconds, + taskFinishedCount, concurrentThreads); + } + + public static void formatNumber(byte[] slice, long n) { + assert(n >= 0); + + for (int i = slice.length - 1; i >= 0; --i) { + slice[i] = (byte) ('0' + (n % 10)); + n /= 10; + } + } + + private void destroyDb() { + if (db_ != null) { + db_.close(); + } + // TODO(yhchiang): develop our own FileUtil + // FileUtil.deleteDir(databaseDir_); + } + + private void printStats() { + } + + static void printHelp() { + System.out.println("usage:"); + for (Flag flag : Flag.values()) { + System.out.format(" --%s%n %s%n", + flag.name(), + flag.desc()); + if (flag.getDefaultValue() != null) { + System.out.format(" DEFAULT: %s%n", + flag.getDefaultValue().toString()); + } + System.out.println(""); + } + } + + public static void main(String[] args) throws Exception { + Map flags = new EnumMap(Flag.class); + for (Flag flag : Flag.values()) { + if (flag.getDefaultValue() != null) { + flags.put(flag, flag.getDefaultValue()); + } + } + for (String arg : args) { + boolean valid = false; + if (arg.equals("--help") || arg.equals("-h")) { + printHelp(); + System.exit(0); + } + if (arg.startsWith("--")) { + try { + String[] parts = arg.substring(2).split("="); + if (parts.length >= 1) { + Flag key = Flag.valueOf(parts[0]); + if (key != null) { + Object value = null; + if (parts.length >= 2) { + value = key.parseValue(parts[1]); + } + flags.put(key, value); + valid = true; + } + } + } + catch (Exception e) { + } + } + if (!valid) { + System.err.println("Invalid argument " + arg); + System.exit(1); + } + } + new DbBenchmark(flags).run(); + } + + private enum Flag { + benchmarks( + Arrays.asList( + "fillseq", + "readrandom", + "fillrandom"), + "Comma-separated list of operations to run in the specified order\n" + + "\tActual benchmarks:\n" + + "\t\tfillseq -- write N values in sequential key order in async mode\n" + + "\t\tfillrandom -- write N values in random key order in async mode\n" + + "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" + + "\t\t in random key order in sync mode\n" + + "\t\tfillsync -- write N/100 values in random key order in sync mode\n" + + "\t\tfill100K -- write N/1000 100K values in random order in async mode\n" + + "\t\treadseq -- read N times sequentially\n" + + "\t\treadrandom -- read N times in random order\n" + + "\t\treadhot -- read N times in random order from 1% section of DB\n" + + "\tMeta Operations:\n" + + "\t\tdelete -- delete DB") { + @Override public Object parseValue(String value) { + return new ArrayList(Arrays.asList(value.split(","))); + } + }, + + compression_ratio(0.5d, + "Arrange to generate values that shrink to this fraction of\n" + + "\ttheir original size after compression") { + @Override public Object parseValue(String value) { + return Double.parseDouble(value); + } + }, + + use_existing_db(false, + "If true, do not destroy the existing database. If you set this\n" + + "\tflag and also specify a benchmark that wants a fresh database," + + "\tthat benchmark will fail.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + + num(1000000, + "Number of key/values to place in database.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + threads(1, + "Number of concurrent threads to run.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + reads(null, + "Number of read operations to do. If negative, do --nums reads.") { + @Override + public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + key_size(16, + "The size of each key in bytes.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + value_size(100, + "The size of each value in bytes.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + write_buffer_size(4 << 20, + "Number of bytes to buffer in memtable before compacting\n" + + "\t(initialized to default value by 'main'.)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + cache_size(-1, + "Number of bytes to use as a cache of uncompressed data.\n" + + "\tNegative means use default settings.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + + seed(0L, + "Seed base for random number generators.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + + + db("/tmp/rocksdbjni-bench", + "Use the db with the following name.") { + @Override public Object parseValue(String value) { + return value; + } + }; + + private Flag(Object defaultValue, String desc) { + defaultValue_ = defaultValue; + desc_ = desc; + } + + protected abstract Object parseValue(String value); + + public Object getDefaultValue() { + return defaultValue_; + } + + public String desc() { + return desc_; + } + + private final Object defaultValue_; + private final String desc_; + } + + private static class RandomGenerator { + private final byte[] data_; + private int dataLength_; + private int position_; + + private RandomGenerator(double compressionRatio) { + // We use a limited amount of data over and over again and ensure + // that it is larger than the compression window (32KB), and also + // large enough to serve all typical value sizes we want to write. + Random rand = new Random(301); + dataLength_ = 1048576 + 100; + data_ = new byte[dataLength_]; + // TODO(yhchiang): mimic test::CompressibleString? + for (int i = 0; i < dataLength_; ++i) { + data_[i] = (byte) (' ' + rand.nextInt(95)); + } + } + + private byte[] generate(int length) { + if (position_ + length > data_.length) { + position_ = 0; + assert (length < data_.length); + } + return Arrays.copyOfRange(data_, position_, position_ + length); + } + } + + RocksDB db_; + final List benchmarks_; + final int num_; + final int reads_; + final int keySize_; + final int valueSize_; + final int writeBufferSize_; + final int threadNum_; + final long randSeed_; + final boolean useExisting_; + final String databaseDir_; + final double compressionRatio_; + RandomGenerator gen_; + long startTime_; +} diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index ccd87105d..fc5acd5cd 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -121,6 +121,28 @@ void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BI( jvalue, jvalue_len); } +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Write +/* + * Class: org_rocksdb_RocksDB + * Method: write + * Signature: (JJ)V + */ +void Java_org_rocksdb_RocksDB_write( + JNIEnv* env, jobject jdb, + jlong jwrite_options_handle, jlong jbatch_handle) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + auto write_options = reinterpret_cast( + jwrite_options_handle); + auto batch = reinterpret_cast(jbatch_handle); + + rocksdb::Status s = db->Write(*write_options, batch); + + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Get