Merge pull request #546 from fyrz/RocksJava-MemEnv

[RocksJava] Expose MemEnv in RocksJava
This commit is contained in:
Yueh-Hsuan Chiang 2015-03-24 14:01:12 -07:00
commit afc51649e2
10 changed files with 380 additions and 102 deletions

View File

@ -12,6 +12,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.DBOptions\
org.rocksdb.DirectComparator\
org.rocksdb.DirectSlice\
org.rocksdb.Env\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.GenericRateLimiterConfig\
@ -27,6 +28,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.RocksDB\
org.rocksdb.RocksEnv\
org.rocksdb.RocksIterator\
org.rocksdb.RocksMemEnv\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
@ -84,6 +86,7 @@ JAVA_TESTS = org.rocksdb.BackupableDBOptionsTest\
org.rocksdb.RocksDBTest\
org.rocksdb.RocksEnvTest\
org.rocksdb.RocksIteratorTest\
org.rocksdb.RocksMemEnvTest\
org.rocksdb.util.SizeUnitTest\
org.rocksdb.SliceTest\
org.rocksdb.SnapshotTest\

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.rocksdb.*;
import org.rocksdb.RocksMemEnv;
import org.rocksdb.util.SizeUnit;
class Stats {
@ -445,6 +446,7 @@ public class DbBenchmark {
keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix);
hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count);
usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table);
useMemenv_ = (Boolean) flags.get(Flag.use_mem_env);
flags_ = flags;
finishLock_ = new Object();
// options.setPrefixSize((Integer)flags_.get(Flag.prefix_size));
@ -486,6 +488,9 @@ public class DbBenchmark {
} else {
options.setCreateIfMissing(false);
}
if (useMemenv_) {
options.setEnv(new RocksMemEnv());
}
switch (memtable_) {
case "skip_list":
options.setMemTableConfig(new SkipListMemTableConfig());
@ -1482,6 +1487,12 @@ public class DbBenchmark {
@Override public Object parseValue(String value) {
return value;
}
},
use_mem_env(false, "Use RocksMemEnv instead of default filesystem based\n" +
"environment.") {
@Override public Object parseValue(String value) {
return parseBoolean(value);
}
};
private Flag(Object defaultValue, String desc) {
@ -1594,6 +1605,9 @@ public class DbBenchmark {
RandomGenerator gen_;
long startTime_;
// env
boolean useMemenv_;
// memtable related
final int maxWriteBufferNumber_;
final int prefixSize_;

View File

@ -6,44 +6,46 @@
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::Env methods from Java side.
#include "include/org_rocksdb_Env.h"
#include "include/org_rocksdb_RocksEnv.h"
#include "include/org_rocksdb_RocksMemEnv.h"
#include "rocksdb/env.h"
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_Env
* Method: getDefaultEnvInternal
* Signature: ()J
*/
jlong Java_org_rocksdb_RocksEnv_getDefaultEnvInternal(
jlong Java_org_rocksdb_Env_getDefaultEnvInternal(
JNIEnv* env, jclass jclazz) {
return reinterpret_cast<jlong>(rocksdb::Env::Default());
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_Env
* Method: setBackgroundThreads
* Signature: (JII)V
*/
void Java_org_rocksdb_RocksEnv_setBackgroundThreads(
void Java_org_rocksdb_Env_setBackgroundThreads(
JNIEnv* env, jobject jobj, jlong jhandle,
jint num, jint priority) {
auto* rocks_env = reinterpret_cast<rocksdb::Env*>(jhandle);
switch (priority) {
case org_rocksdb_RocksEnv_FLUSH_POOL:
case org_rocksdb_Env_FLUSH_POOL:
rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::LOW);
break;
case org_rocksdb_RocksEnv_COMPACTION_POOL:
case org_rocksdb_Env_COMPACTION_POOL:
rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::HIGH);
break;
}
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_sEnv
* Method: getThreadPoolQueueLen
* Signature: (JI)I
*/
jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen(
jint Java_org_rocksdb_Env_getThreadPoolQueueLen(
JNIEnv* env, jobject jobj, jlong jhandle, jint pool_id) {
auto* rocks_env = reinterpret_cast<rocksdb::Env*>(jhandle);
switch (pool_id) {
@ -56,11 +58,22 @@ jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen(
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_RocksMemEnv
* Method: createMemEnv
* Signature: ()J
*/
jlong Java_org_rocksdb_RocksMemEnv_createMemEnv(
JNIEnv* env, jclass jclazz) {
return reinterpret_cast<jlong>(rocksdb::NewMemEnv(
rocksdb::Env::Default()));
}
/*
* Class: org_rocksdb_RocksMemEnv
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RocksEnv_disposeInternal(
void Java_org_rocksdb_RocksMemEnv_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::Env*>(jhandle);
}

View File

@ -0,0 +1,92 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Base class for all Env implementations in RocksDB.
*/
public abstract class Env extends RocksObject {
public static final int FLUSH_POOL = 0;
public static final int COMPACTION_POOL = 1;
/**
* <p>Returns the default environment suitable for the current operating
* system.</p>
*
* <p>The result of {@code getDefault()} is a singleton whose ownership
* belongs to rocksdb c++. As a result, the returned RocksEnv will not
* have the ownership of its c++ resource, and calling its dispose()
* will be no-op.</p>
*
* @return the default {@link org.rocksdb.RocksEnv} instance.
*/
public static Env getDefault() {
return default_env_;
}
/**
* <p>Sets the number of background worker threads of the flush pool
* for this environment.</p>
* <p>Default number: 1</p>
*
* @param num the number of threads
*
* @return current {@link RocksEnv} instance.
*/
public Env setBackgroundThreads(final int num) {
return setBackgroundThreads(num, FLUSH_POOL);
}
/**
* <p>Sets the number of background worker threads of the specified thread
* pool for this environment.</p>
*
* @param num the number of threads
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* <p>Default number: 1</p>
* @return current {@link RocksEnv} instance.
*/
public Env setBackgroundThreads(final int num, final int poolID) {
setBackgroundThreads(nativeHandle_, num, poolID);
return this;
}
/**
* <p>Returns the length of the queue associated with the specified
* thread pool.</p>
*
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* @return the thread pool queue length.
*/
public int getThreadPoolQueueLen(final int poolID) {
return getThreadPoolQueueLen(nativeHandle_, poolID);
}
protected Env() {
super();
}
static {
default_env_ = new RocksEnv(getDefaultEnvInternal());
}
/**
* <p>The static default Env. The ownership of its native handle
* belongs to rocksdb c++ and is not able to be released on the Java
* side.</p>
*/
static Env default_env_;
private static native long getDefaultEnvInternal();
private native void setBackgroundThreads(
long handle, int num, int priority);
private native int getThreadPoolQueueLen(long handle, int poolID);
}

View File

@ -29,7 +29,7 @@ public class Options extends RocksObject
public Options() {
super();
newOptions();
env_ = RocksEnv.getDefault();
env_ = Env.getDefault();
}
/**
@ -44,7 +44,7 @@ public class Options extends RocksObject
final ColumnFamilyOptions columnFamilyOptions) {
super();
newOptions(dbOptions.nativeHandle_, columnFamilyOptions.nativeHandle_);
env_ = RocksEnv.getDefault();
env_ = Env.getDefault();
}
@Override
@ -71,12 +71,12 @@ public class Options extends RocksObject
/**
* Use the specified object to interact with the environment,
* e.g. to read/write files, schedule background work, etc.
* Default: {@link RocksEnv#getDefault()}
* Default: {@link Env#getDefault()}
*
* @param env {@link RocksEnv} instance.
* @param env {@link Env} instance.
* @return the instance of the current Options.
*/
public Options setEnv(final RocksEnv env) {
public Options setEnv(final Env env) {
assert(isInitialized());
setEnv(nativeHandle_, env.nativeHandle_);
env_ = env;
@ -88,7 +88,7 @@ public class Options extends RocksObject
*
* @return {@link RocksEnv} instance set in the Options.
*/
public RocksEnv getEnv() {
public Env getEnv() {
return env_;
}
@ -1333,7 +1333,7 @@ public class Options extends RocksObject
boolean optimizeFiltersForHits);
private native boolean optimizeFiltersForHits(long handle);
// instance variables
RocksEnv env_;
Env env_;
MemTableConfig memTableConfig_;
TableFormatConfig tableFormatConfig_;
RateLimiterConfig rateLimiterConfig_;

View File

@ -12,75 +12,7 @@ package org.rocksdb;
* <p>All Env implementations are safe for concurrent access from
* multiple threads without any external synchronization.</p>
*/
public class RocksEnv extends RocksObject {
public static final int FLUSH_POOL = 0;
public static final int COMPACTION_POOL = 1;
static {
default_env_ = new RocksEnv(getDefaultEnvInternal());
}
private static native long getDefaultEnvInternal();
/**
* <p>Returns the default environment suitable for the current operating
* system.</p>
*
* <p>The result of {@code getDefault()} is a singleton whose ownership
* belongs to rocksdb c++. As a result, the returned RocksEnv will not
* have the ownership of its c++ resource, and calling its dispose()
* will be no-op.</p>
*
* @return the default {@link org.rocksdb.RocksEnv} instance.
*/
public static RocksEnv getDefault() {
return default_env_;
}
/**
* <p>Sets the number of background worker threads of the flush pool
* for this environment.</p>
* <p>Default number: 1</p>
*
* @param num the number of threads
*
* @return current {@link org.rocksdb.RocksEnv} instance.
*/
public RocksEnv setBackgroundThreads(final int num) {
return setBackgroundThreads(num, FLUSH_POOL);
}
/**
* <p>Sets the number of background worker threads of the specified thread
* pool for this environment.</p>
*
* @param num the number of threads
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* <p>Default number: 1</p>
* @return current {@link org.rocksdb.RocksEnv} instance.
*/
public RocksEnv setBackgroundThreads(final int num, final int poolID) {
setBackgroundThreads(nativeHandle_, num, poolID);
return this;
}
private native void setBackgroundThreads(
long handle, int num, int priority);
/**
* <p>Returns the length of the queue associated with the specified
* thread pool.</p>
*
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* @return the thread pool queue length.
*/
public int getThreadPoolQueueLen(final int poolID) {
return getThreadPoolQueueLen(nativeHandle_, poolID);
}
private native int getThreadPoolQueueLen(long handle, int poolID);
public class RocksEnv extends Env {
/**
* <p>Package-private constructor that uses the specified native handle
@ -98,19 +30,14 @@ public class RocksEnv extends RocksObject {
}
/**
* The helper function of {@link #dispose()} which all subclasses of
* <p>The helper function of {@link #dispose()} which all subclasses of
* {@link RocksObject} must implement to release their associated C++
* resource.
* resource.</p>
*
* <p><strong>Note:</strong> this class is used to use the default
* RocksEnv with RocksJava. The default env allocation is managed
* by C++.</p>
*/
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void disposeInternal(long handle);
/**
* <p>The static default RocksEnv. The ownership of its native handle
* belongs to rocksdb c++ and is not able to be released on the Java
* side.</p>
*/
static RocksEnv default_env_;
}

View File

@ -0,0 +1,33 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* RocksDB memory environment.
*/
public class RocksMemEnv extends Env {
/**
* <p>Creates a new RocksDB environment that stores its data
* in memory and delegates all non-file-storage tasks to
* base_env. The caller must delete the result when it is
* no longer needed.</p>
*
* <p>{@code *base_env} must remain live while the result is in use.</p>
*/
public RocksMemEnv() {
super();
nativeHandle_ = createMemEnv();
}
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private static native long createMemEnv();
private native void disposeInternal(long handle);
}

View File

@ -996,13 +996,13 @@ public class OptionsTest {
}
@Test
public void rocksEnv() {
public void env() {
Options options = null;
try {
options = new Options();
RocksEnv rocksEnv = RocksEnv.getDefault();
options.setEnv(rocksEnv);
assertThat(options.getEnv()).isSameAs(rocksEnv);
Env env = Env.getDefault();
options.setEnv(env);
assertThat(options.getEnv()).isSameAs(env);
} finally {
if (options != null) {
options.dispose();

View File

@ -18,7 +18,7 @@ public class RocksEnvTest {
@Test
public void rocksEnv(){
RocksEnv rocksEnv = RocksEnv.getDefault();
Env rocksEnv = RocksEnv.getDefault();
rocksEnv.setBackgroundThreads(5);
// default rocksenv will always return zero for flush pool
// no matter what was set via setBackgroundThreads

View File

@ -0,0 +1,196 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.assertj.core.api.Assertions.assertThat;
public class RocksMemEnvTest {
@ClassRule
public static final RocksMemoryResource rocksMemoryResource =
new RocksMemoryResource();
@Test
public void memEnvFillAndReopen() throws RocksDBException {
final byte[][] keys = {
"aaa".getBytes(),
"bbb".getBytes(),
"ccc".getBytes()
};
final byte[][] values = {
"foo".getBytes(),
"bar".getBytes(),
"baz".getBytes()
};
Env env = null;
Options options = null;
RocksDB db = null;
FlushOptions flushOptions = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(true).
setEnv(env);
flushOptions = new FlushOptions().
setWaitForFlush(true);
db = RocksDB.open(options, "dir/db");
// write key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
db.put(keys[i], values[i]);
}
// read key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
// Check iterator access
RocksIterator iterator = db.newIterator();
iterator.seekToFirst();
for (int i=0; i < keys.length; i++) {
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo(keys[i]);
assertThat(iterator.value()).isEqualTo(values[i]);
iterator.next();
}
// reached end of database
assertThat(iterator.isValid()).isFalse();
iterator.dispose();
// flush
db.flush(flushOptions);
// read key/value pairs after flush using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
db.close();
options.setCreateIfMissing(false);
// After reopen the values shall still be in the mem env.
// as long as the env is not freed.
db = RocksDB.open(options, "dir/db");
// read key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
} finally {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
if (flushOptions != null) {
flushOptions.dispose();
}
if (env != null) {
env.dispose();
}
}
}
@Test
public void multipleDatabaseInstances() throws RocksDBException {
// db - keys
final byte[][] keys = {
"aaa".getBytes(),
"bbb".getBytes(),
"ccc".getBytes()
};
// otherDb - keys
final byte[][] otherKeys = {
"111".getBytes(),
"222".getBytes(),
"333".getBytes()
};
// values
final byte[][] values = {
"foo".getBytes(),
"bar".getBytes(),
"baz".getBytes()
};
Env env = null;
Options options = null;
RocksDB db = null, otherDb = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(true).
setEnv(env);
db = RocksDB.open(options, "dir/db");
otherDb = RocksDB.open(options, "dir/otherDb");
// write key/value pairs using MemEnv
// to db and to otherDb.
for (int i=0; i < keys.length; i++) {
db.put(keys[i], values[i]);
otherDb.put(otherKeys[i], values[i]);
}
// verify key/value pairs after flush using MemEnv
for (int i=0; i < keys.length; i++) {
// verify db
assertThat(db.get(otherKeys[i])).isNull();
assertThat(db.get(keys[i])).isEqualTo(values[i]);
// verify otherDb
assertThat(otherDb.get(keys[i])).isNull();
assertThat(otherDb.get(otherKeys[i])).isEqualTo(values[i]);
}
} finally {
if (db != null) {
db.close();
}
if (otherDb != null) {
otherDb.close();
}
if (options != null) {
options.dispose();
}
if (env != null) {
env.dispose();
}
}
}
@Test(expected = RocksDBException.class)
public void createIfMissingFalse() throws RocksDBException {
Env env = null;
Options options = null;
RocksDB db = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(false).
setEnv(env);
// shall throw an exception because db dir does not
// exist.
db = RocksDB.open(options, "db/dir");
} finally {
if (options != null) {
options.dispose();
}
if (env != null) {
env.dispose();
}
}
}
}