diff --git a/java/Makefile b/java/Makefile index 49afc6c45..1b5cc5aa8 100644 --- a/java/Makefile +++ b/java/Makefile @@ -12,6 +12,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.DBOptions\ org.rocksdb.DirectComparator\ org.rocksdb.DirectSlice\ + org.rocksdb.Env\ org.rocksdb.FlushOptions\ org.rocksdb.Filter\ org.rocksdb.GenericRateLimiterConfig\ @@ -27,6 +28,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.RocksDB\ org.rocksdb.RocksEnv\ org.rocksdb.RocksIterator\ + org.rocksdb.RocksMemEnv\ org.rocksdb.SkipListMemTableConfig\ org.rocksdb.Slice\ org.rocksdb.Statistics\ @@ -84,6 +86,7 @@ JAVA_TESTS = org.rocksdb.BackupableDBOptionsTest\ org.rocksdb.RocksDBTest\ org.rocksdb.RocksEnvTest\ org.rocksdb.RocksIteratorTest\ + org.rocksdb.RocksMemEnvTest\ org.rocksdb.util.SizeUnitTest\ org.rocksdb.SliceTest\ org.rocksdb.SnapshotTest\ diff --git a/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java b/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java index 4a8887377..698f62327 100644 --- a/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.rocksdb.*; +import org.rocksdb.RocksMemEnv; import org.rocksdb.util.SizeUnit; class Stats { @@ -445,6 +446,7 @@ public class DbBenchmark { keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix); hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count); usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table); + useMemenv_ = (Boolean) flags.get(Flag.use_mem_env); flags_ = flags; finishLock_ = new Object(); // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size)); @@ -486,6 +488,9 @@ public class DbBenchmark { } else { options.setCreateIfMissing(false); } + if (useMemenv_) { + options.setEnv(new RocksMemEnv()); + } switch (memtable_) { case "skip_list": options.setMemTableConfig(new SkipListMemTableConfig()); @@ -1482,6 +1487,12 @@ public class DbBenchmark { @Override public Object parseValue(String value) { return value; } + }, + use_mem_env(false, "Use RocksMemEnv instead of default filesystem based\n" + + "environment.") { + @Override public Object parseValue(String value) { + return parseBoolean(value); + } }; private Flag(Object defaultValue, String desc) { @@ -1594,6 +1605,9 @@ public class DbBenchmark { RandomGenerator gen_; long startTime_; + // env + boolean useMemenv_; + // memtable related final int maxWriteBufferNumber_; final int prefixSize_; diff --git a/java/rocksjni/env.cc b/java/rocksjni/env.cc index c6c58e144..b50d5ae30 100644 --- a/java/rocksjni/env.cc +++ b/java/rocksjni/env.cc @@ -6,44 +6,46 @@ // This file implements the "bridge" between Java and C++ and enables // calling c++ rocksdb::Env methods from Java side. +#include "include/org_rocksdb_Env.h" #include "include/org_rocksdb_RocksEnv.h" +#include "include/org_rocksdb_RocksMemEnv.h" #include "rocksdb/env.h" /* - * Class: org_rocksdb_RocksEnv + * Class: org_rocksdb_Env * Method: getDefaultEnvInternal * Signature: ()J */ -jlong Java_org_rocksdb_RocksEnv_getDefaultEnvInternal( +jlong Java_org_rocksdb_Env_getDefaultEnvInternal( JNIEnv* env, jclass jclazz) { return reinterpret_cast(rocksdb::Env::Default()); } /* - * Class: org_rocksdb_RocksEnv + * Class: org_rocksdb_Env * Method: setBackgroundThreads * Signature: (JII)V */ -void Java_org_rocksdb_RocksEnv_setBackgroundThreads( +void Java_org_rocksdb_Env_setBackgroundThreads( JNIEnv* env, jobject jobj, jlong jhandle, jint num, jint priority) { auto* rocks_env = reinterpret_cast(jhandle); switch (priority) { - case org_rocksdb_RocksEnv_FLUSH_POOL: + case org_rocksdb_Env_FLUSH_POOL: rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::LOW); break; - case org_rocksdb_RocksEnv_COMPACTION_POOL: + case org_rocksdb_Env_COMPACTION_POOL: rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::HIGH); break; } } /* - * Class: org_rocksdb_RocksEnv + * Class: org_rocksdb_sEnv * Method: getThreadPoolQueueLen * Signature: (JI)I */ -jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen( +jint Java_org_rocksdb_Env_getThreadPoolQueueLen( JNIEnv* env, jobject jobj, jlong jhandle, jint pool_id) { auto* rocks_env = reinterpret_cast(jhandle); switch (pool_id) { @@ -56,11 +58,22 @@ jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen( } /* - * Class: org_rocksdb_RocksEnv + * Class: org_rocksdb_RocksMemEnv + * Method: createMemEnv + * Signature: ()J + */ +jlong Java_org_rocksdb_RocksMemEnv_createMemEnv( + JNIEnv* env, jclass jclazz) { + return reinterpret_cast(rocksdb::NewMemEnv( + rocksdb::Env::Default())); +} + +/* + * Class: org_rocksdb_RocksMemEnv * Method: disposeInternal * Signature: (J)V */ -void Java_org_rocksdb_RocksEnv_disposeInternal( +void Java_org_rocksdb_RocksMemEnv_disposeInternal( JNIEnv* env, jobject jobj, jlong jhandle) { delete reinterpret_cast(jhandle); } diff --git a/java/src/main/java/org/rocksdb/Env.java b/java/src/main/java/org/rocksdb/Env.java new file mode 100644 index 000000000..929a394c3 --- /dev/null +++ b/java/src/main/java/org/rocksdb/Env.java @@ -0,0 +1,92 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * Base class for all Env implementations in RocksDB. + */ +public abstract class Env extends RocksObject { + public static final int FLUSH_POOL = 0; + public static final int COMPACTION_POOL = 1; + + /** + *

Returns the default environment suitable for the current operating + * system.

+ * + *

The result of {@code getDefault()} is a singleton whose ownership + * belongs to rocksdb c++. As a result, the returned RocksEnv will not + * have the ownership of its c++ resource, and calling its dispose() + * will be no-op.

+ * + * @return the default {@link org.rocksdb.RocksEnv} instance. + */ + public static Env getDefault() { + return default_env_; + } + + /** + *

Sets the number of background worker threads of the flush pool + * for this environment.

+ *

Default number: 1

+ * + * @param num the number of threads + * + * @return current {@link RocksEnv} instance. + */ + public Env setBackgroundThreads(final int num) { + return setBackgroundThreads(num, FLUSH_POOL); + } + + /** + *

Sets the number of background worker threads of the specified thread + * pool for this environment.

+ * + * @param num the number of threads + * @param poolID the id to specified a thread pool. Should be either + * FLUSH_POOL or COMPACTION_POOL. + * + *

Default number: 1

+ * @return current {@link RocksEnv} instance. + */ + public Env setBackgroundThreads(final int num, final int poolID) { + setBackgroundThreads(nativeHandle_, num, poolID); + return this; + } + + /** + *

Returns the length of the queue associated with the specified + * thread pool.

+ * + * @param poolID the id to specified a thread pool. Should be either + * FLUSH_POOL or COMPACTION_POOL. + * + * @return the thread pool queue length. + */ + public int getThreadPoolQueueLen(final int poolID) { + return getThreadPoolQueueLen(nativeHandle_, poolID); + } + + + protected Env() { + super(); + } + + static { + default_env_ = new RocksEnv(getDefaultEnvInternal()); + } + + /** + *

The static default Env. The ownership of its native handle + * belongs to rocksdb c++ and is not able to be released on the Java + * side.

+ */ + static Env default_env_; + + private static native long getDefaultEnvInternal(); + private native void setBackgroundThreads( + long handle, int num, int priority); + private native int getThreadPoolQueueLen(long handle, int poolID); +} diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 6dda0e82f..ed4fb163d 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -29,7 +29,7 @@ public class Options extends RocksObject public Options() { super(); newOptions(); - env_ = RocksEnv.getDefault(); + env_ = Env.getDefault(); } /** @@ -44,7 +44,7 @@ public class Options extends RocksObject final ColumnFamilyOptions columnFamilyOptions) { super(); newOptions(dbOptions.nativeHandle_, columnFamilyOptions.nativeHandle_); - env_ = RocksEnv.getDefault(); + env_ = Env.getDefault(); } @Override @@ -71,12 +71,12 @@ public class Options extends RocksObject /** * Use the specified object to interact with the environment, * e.g. to read/write files, schedule background work, etc. - * Default: {@link RocksEnv#getDefault()} + * Default: {@link Env#getDefault()} * - * @param env {@link RocksEnv} instance. + * @param env {@link Env} instance. * @return the instance of the current Options. */ - public Options setEnv(final RocksEnv env) { + public Options setEnv(final Env env) { assert(isInitialized()); setEnv(nativeHandle_, env.nativeHandle_); env_ = env; @@ -88,7 +88,7 @@ public class Options extends RocksObject * * @return {@link RocksEnv} instance set in the Options. */ - public RocksEnv getEnv() { + public Env getEnv() { return env_; } @@ -1333,7 +1333,7 @@ public class Options extends RocksObject boolean optimizeFiltersForHits); private native boolean optimizeFiltersForHits(long handle); // instance variables - RocksEnv env_; + Env env_; MemTableConfig memTableConfig_; TableFormatConfig tableFormatConfig_; RateLimiterConfig rateLimiterConfig_; diff --git a/java/src/main/java/org/rocksdb/RocksEnv.java b/java/src/main/java/org/rocksdb/RocksEnv.java index f57713cb1..4c399eafa 100644 --- a/java/src/main/java/org/rocksdb/RocksEnv.java +++ b/java/src/main/java/org/rocksdb/RocksEnv.java @@ -12,75 +12,7 @@ package org.rocksdb; *

All Env implementations are safe for concurrent access from * multiple threads without any external synchronization.

*/ -public class RocksEnv extends RocksObject { - public static final int FLUSH_POOL = 0; - public static final int COMPACTION_POOL = 1; - - static { - default_env_ = new RocksEnv(getDefaultEnvInternal()); - - } - private static native long getDefaultEnvInternal(); - - /** - *

Returns the default environment suitable for the current operating - * system.

- * - *

The result of {@code getDefault()} is a singleton whose ownership - * belongs to rocksdb c++. As a result, the returned RocksEnv will not - * have the ownership of its c++ resource, and calling its dispose() - * will be no-op.

- * - * @return the default {@link org.rocksdb.RocksEnv} instance. - */ - public static RocksEnv getDefault() { - return default_env_; - } - - /** - *

Sets the number of background worker threads of the flush pool - * for this environment.

- *

Default number: 1

- * - * @param num the number of threads - * - * @return current {@link org.rocksdb.RocksEnv} instance. - */ - public RocksEnv setBackgroundThreads(final int num) { - return setBackgroundThreads(num, FLUSH_POOL); - } - - /** - *

Sets the number of background worker threads of the specified thread - * pool for this environment.

- * - * @param num the number of threads - * @param poolID the id to specified a thread pool. Should be either - * FLUSH_POOL or COMPACTION_POOL. - * - *

Default number: 1

- * @return current {@link org.rocksdb.RocksEnv} instance. - */ - public RocksEnv setBackgroundThreads(final int num, final int poolID) { - setBackgroundThreads(nativeHandle_, num, poolID); - return this; - } - private native void setBackgroundThreads( - long handle, int num, int priority); - - /** - *

Returns the length of the queue associated with the specified - * thread pool.

- * - * @param poolID the id to specified a thread pool. Should be either - * FLUSH_POOL or COMPACTION_POOL. - * - * @return the thread pool queue length. - */ - public int getThreadPoolQueueLen(final int poolID) { - return getThreadPoolQueueLen(nativeHandle_, poolID); - } - private native int getThreadPoolQueueLen(long handle, int poolID); +public class RocksEnv extends Env { /** *

Package-private constructor that uses the specified native handle @@ -98,19 +30,14 @@ public class RocksEnv extends RocksObject { } /** - * The helper function of {@link #dispose()} which all subclasses of + *

The helper function of {@link #dispose()} which all subclasses of * {@link RocksObject} must implement to release their associated C++ - * resource. + * resource.

+ * + *

Note: this class is used to use the default + * RocksEnv with RocksJava. The default env allocation is managed + * by C++.

*/ @Override protected void disposeInternal() { - disposeInternal(nativeHandle_); } - private native void disposeInternal(long handle); - - /** - *

The static default RocksEnv. The ownership of its native handle - * belongs to rocksdb c++ and is not able to be released on the Java - * side.

- */ - static RocksEnv default_env_; } diff --git a/java/src/main/java/org/rocksdb/RocksMemEnv.java b/java/src/main/java/org/rocksdb/RocksMemEnv.java new file mode 100644 index 000000000..54c9f9981 --- /dev/null +++ b/java/src/main/java/org/rocksdb/RocksMemEnv.java @@ -0,0 +1,33 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * RocksDB memory environment. + */ +public class RocksMemEnv extends Env { + + /** + *

Creates a new RocksDB environment that stores its data + * in memory and delegates all non-file-storage tasks to + * base_env. The caller must delete the result when it is + * no longer needed.

+ * + *

{@code *base_env} must remain live while the result is in use.

+ */ + public RocksMemEnv() { + super(); + nativeHandle_ = createMemEnv(); + } + + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + private static native long createMemEnv(); + private native void disposeInternal(long handle); +} diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index ea3bc62af..0b510f417 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -996,13 +996,13 @@ public class OptionsTest { } @Test - public void rocksEnv() { + public void env() { Options options = null; try { options = new Options(); - RocksEnv rocksEnv = RocksEnv.getDefault(); - options.setEnv(rocksEnv); - assertThat(options.getEnv()).isSameAs(rocksEnv); + Env env = Env.getDefault(); + options.setEnv(env); + assertThat(options.getEnv()).isSameAs(env); } finally { if (options != null) { options.dispose(); diff --git a/java/src/test/java/org/rocksdb/RocksEnvTest.java b/java/src/test/java/org/rocksdb/RocksEnvTest.java index 6b0b9becc..5914e6e29 100644 --- a/java/src/test/java/org/rocksdb/RocksEnvTest.java +++ b/java/src/test/java/org/rocksdb/RocksEnvTest.java @@ -18,7 +18,7 @@ public class RocksEnvTest { @Test public void rocksEnv(){ - RocksEnv rocksEnv = RocksEnv.getDefault(); + Env rocksEnv = RocksEnv.getDefault(); rocksEnv.setBackgroundThreads(5); // default rocksenv will always return zero for flush pool // no matter what was set via setBackgroundThreads diff --git a/java/src/test/java/org/rocksdb/RocksMemEnvTest.java b/java/src/test/java/org/rocksdb/RocksMemEnvTest.java new file mode 100644 index 000000000..d2791c93e --- /dev/null +++ b/java/src/test/java/org/rocksdb/RocksMemEnvTest.java @@ -0,0 +1,196 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RocksMemEnvTest { + + @ClassRule + public static final RocksMemoryResource rocksMemoryResource = + new RocksMemoryResource(); + + @Test + public void memEnvFillAndReopen() throws RocksDBException { + + final byte[][] keys = { + "aaa".getBytes(), + "bbb".getBytes(), + "ccc".getBytes() + }; + + final byte[][] values = { + "foo".getBytes(), + "bar".getBytes(), + "baz".getBytes() + }; + + Env env = null; + Options options = null; + RocksDB db = null; + FlushOptions flushOptions = null; + try { + env = new RocksMemEnv(); + options = new Options(). + setCreateIfMissing(true). + setEnv(env); + flushOptions = new FlushOptions(). + setWaitForFlush(true); + db = RocksDB.open(options, "dir/db"); + + // write key/value pairs using MemEnv + for (int i=0; i < keys.length; i++) { + db.put(keys[i], values[i]); + } + + // read key/value pairs using MemEnv + for (int i=0; i < keys.length; i++) { + assertThat(db.get(keys[i])).isEqualTo(values[i]); + } + + // Check iterator access + RocksIterator iterator = db.newIterator(); + iterator.seekToFirst(); + for (int i=0; i < keys.length; i++) { + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo(keys[i]); + assertThat(iterator.value()).isEqualTo(values[i]); + iterator.next(); + } + // reached end of database + assertThat(iterator.isValid()).isFalse(); + iterator.dispose(); + + // flush + db.flush(flushOptions); + + // read key/value pairs after flush using MemEnv + for (int i=0; i < keys.length; i++) { + assertThat(db.get(keys[i])).isEqualTo(values[i]); + } + + db.close(); + options.setCreateIfMissing(false); + + // After reopen the values shall still be in the mem env. + // as long as the env is not freed. + db = RocksDB.open(options, "dir/db"); + // read key/value pairs using MemEnv + for (int i=0; i < keys.length; i++) { + assertThat(db.get(keys[i])).isEqualTo(values[i]); + } + + } finally { + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + if (flushOptions != null) { + flushOptions.dispose(); + } + if (env != null) { + env.dispose(); + } + } + } + + @Test + public void multipleDatabaseInstances() throws RocksDBException { + // db - keys + final byte[][] keys = { + "aaa".getBytes(), + "bbb".getBytes(), + "ccc".getBytes() + }; + // otherDb - keys + final byte[][] otherKeys = { + "111".getBytes(), + "222".getBytes(), + "333".getBytes() + }; + // values + final byte[][] values = { + "foo".getBytes(), + "bar".getBytes(), + "baz".getBytes() + }; + + Env env = null; + Options options = null; + RocksDB db = null, otherDb = null; + + try { + env = new RocksMemEnv(); + options = new Options(). + setCreateIfMissing(true). + setEnv(env); + db = RocksDB.open(options, "dir/db"); + otherDb = RocksDB.open(options, "dir/otherDb"); + + // write key/value pairs using MemEnv + // to db and to otherDb. + for (int i=0; i < keys.length; i++) { + db.put(keys[i], values[i]); + otherDb.put(otherKeys[i], values[i]); + } + + // verify key/value pairs after flush using MemEnv + for (int i=0; i < keys.length; i++) { + // verify db + assertThat(db.get(otherKeys[i])).isNull(); + assertThat(db.get(keys[i])).isEqualTo(values[i]); + + // verify otherDb + assertThat(otherDb.get(keys[i])).isNull(); + assertThat(otherDb.get(otherKeys[i])).isEqualTo(values[i]); + } + } finally { + if (db != null) { + db.close(); + } + if (otherDb != null) { + otherDb.close(); + } + if (options != null) { + options.dispose(); + } + if (env != null) { + env.dispose(); + } + } + } + + @Test(expected = RocksDBException.class) + public void createIfMissingFalse() throws RocksDBException { + Env env = null; + Options options = null; + RocksDB db = null; + + try { + env = new RocksMemEnv(); + options = new Options(). + setCreateIfMissing(false). + setEnv(env); + // shall throw an exception because db dir does not + // exist. + db = RocksDB.open(options, "db/dir"); + } finally { + if (options != null) { + options.dispose(); + } + if (env != null) { + env.dispose(); + } + } + } +}