diff --git a/Makefile b/Makefile index bf0700177..e1e982f15 100644 --- a/Makefile +++ b/Makefile @@ -402,7 +402,7 @@ ldb: tools/ldb.o $(LIBOBJECTS) # --------------------------------------------------------------------------- # Jni stuff # --------------------------------------------------------------------------- -JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc ./java/rocksjni/options.cc +JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc ./java/rocksjni/options.cc ./java/rocksjni/write_batch.cc JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux ROCKSDBJNILIB = ./java/librocksdbjni.so @@ -415,6 +415,7 @@ endif jni: clean OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 cd java;$(MAKE) java; + rm -f $(ROCKSDBJNILIB) $(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o $(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(LDFLAGS) $(COVERAGEFLAGS) jclean: @@ -422,7 +423,7 @@ jclean: rm -f $(ROCKSDBJNILIB) jtest: - cd java;$(MAKE) sample; + cd java;$(MAKE) sample;$(MAKE) test; # --------------------------------------------------------------------------- # Platform-specific compilation diff --git a/java/Makefile b/java/Makefile index 8168d3418..10dd4f110 100644 --- a/java/Makefile +++ b/java/Makefile @@ -1,4 +1,4 @@ -NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions NATIVE_INCLUDE = ./include ROCKSDB_JAR = rocksdbjni.jar @@ -19,3 +19,6 @@ sample: java -ea -Djava.library.path=.:../ -cp ".:./*" RocksDBSample /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni_not_found + +test: + java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.WriteBatchTest diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java index 4e06bb29d..e6421778c 100644 --- a/java/RocksDBSample.java +++ b/java/RocksDBSample.java @@ -93,6 +93,21 @@ public class RocksDBSample { assert(len == RocksDB.NOT_FOUND); len = db.get(testKey, enoughArray); assert(len == testValue.length); + + db.remove(testKey); + len = db.get(testKey, enoughArray); + assert(len == RocksDB.NOT_FOUND); + + // repeat the test with WriteOptions + WriteOptions writeOpts = new WriteOptions(); + writeOpts.setSync(true); + writeOpts.setDisableWAL(true); + db.put(writeOpts, testKey, testValue); + len = db.get(testKey, enoughArray); + assert(len == testValue.length); + assert(new String(testValue).equals( + new String(enoughArray, 0, len))); + writeOpts.dispose(); } catch (RocksDBException e) { System.err.println(e); } diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index b08694081..bdab8be1b 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1,4 +1,4 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// Copyright (c) 2014, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. @@ -61,7 +61,18 @@ public class RocksDB { * @param value the value associated with the specified key. */ public void put(byte[] key, byte[] value) throws RocksDBException { - put(key, key.length, value, value.length); + put(nativeHandle_, key, key.length, value, value.length); + } + + /** + * Set the database entry for "key" to "value". + * + * @param key the specified key to be inserted. + * @param value the value associated with the specified key. + */ + public void put(WriteOptions writeOpts, byte[] key, byte[] value) + throws RocksDBException { + put(nativeHandle_, writeOpts.nativeHandle_, key, key.length, value, value.length); } /** @@ -77,7 +88,7 @@ public class RocksDB { * found. */ public int get(byte[] key, byte[] value) throws RocksDBException { - return get(key, key.length, value, value.length); + return get(nativeHandle_, key, key.length, value, value.length); } /** @@ -92,7 +103,26 @@ public class RocksDB { * @see RocksDBException */ public byte[] get(byte[] key) throws RocksDBException { - return get(key, key.length); + return get(nativeHandle_, key, key.length); + } + + /** + * Remove the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + */ + public void remove(byte[] key) throws RocksDBException { + remove(nativeHandle_, key, key.length); + } + + /** + * Remove the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + */ + public void remove(WriteOptions writeOpt, byte[] key) + throws RocksDBException { + remove(nativeHandle_, writeOpt.nativeHandle_, key, key.length); } @Override protected void finalize() { @@ -108,14 +138,24 @@ public class RocksDB { // native methods private native void open0(String path) throws RocksDBException; - private native void open(long optionsHandle, String path) throws RocksDBException; + private native void open( + long optionsHandle, String path) throws RocksDBException; private native void put( + long handle, byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native void put( + long handle, long writeOptHandle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; private native int get( - byte[] key, int keyLen, + long handle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; private native byte[] get( + long handle, byte[] key, int keyLen) throws RocksDBException; + private native void remove( + long handle, byte[] key, int keyLen) throws RocksDBException; + private native void remove( + long handle, long writeOptHandle, byte[] key, int keyLen) throws RocksDBException; private native void close0(); diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java new file mode 100644 index 000000000..acacee3f0 --- /dev/null +++ b/java/org/rocksdb/WriteBatch.java @@ -0,0 +1,121 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; + +/** + * WriteBatch holds a collection of updates to apply atomically to a DB. + * + * The updates are applied in the order in which they are added + * to the WriteBatch. For example, the value of "key" will be "v3" + * after the following batch is written: + * + * batch.put("key", "v1"); + * batch.remove("key"); + * batch.put("key", "v2"); + * batch.put("key", "v3"); + * + * Multiple threads can invoke const methods on a WriteBatch without + * external synchronization, but if any of the threads may call a + * non-const method, all threads accessing the same WriteBatch must use + * external synchronization. + */ +public class WriteBatch { + public WriteBatch() { + nativeHandle_ = 0; + newWriteBatch(0); + } + + public WriteBatch(int reserved_bytes) { + nativeHandle_ = 0; + newWriteBatch(reserved_bytes); + } + + /** + * Returns the number of updates in the batch. + */ + public native int count(); + + /** + * Store the mapping "key->value" in the database. + */ + public void put(byte[] key, byte[] value) { + put(key, key.length, value, value.length); + } + + /** + * Merge "value" with the existing value of "key" in the database. + * "key->merge(existing, value)" + */ + public void merge(byte[] key, byte[] value) { + merge(key, key.length, value, value.length); + } + + /** + * If the database contains a mapping for "key", erase it. Else do nothing. + */ + public void remove(byte[] key) { + remove(key, key.length); + } + + /** + * Append a blob of arbitrary size to the records in this batch. The blob will + * be stored in the transaction log but not in any other file. In particular, + * it will not be persisted to the SST files. When iterating over this + * WriteBatch, WriteBatch::Handler::LogData will be called with the contents + * of the blob as it is encountered. Blobs, puts, deletes, and merges will be + * encountered in the same order in thich they were inserted. The blob will + * NOT consume sequence number(s) and will NOT increase the count of the batch + * + * Example application: add timestamps to the transaction log for use in + * replication. + */ + public void putLogData(byte[] blob) { + putLogData(blob, blob.length); + } + + /** + * Clear all updates buffered in this batch + */ + public native void clear(); + + /** + * Delete the c++ side pointer. + */ + public synchronized void dispose() { + if (nativeHandle_ != 0) { + dispose0(); + } + } + + @Override protected void finalize() { + dispose(); + } + + private native void newWriteBatch(int reserved_bytes); + private native void put(byte[] key, int keyLen, + byte[] value, int valueLen); + private native void merge(byte[] key, int keyLen, + byte[] value, int valueLen); + private native void remove(byte[] key, int keyLen); + private native void putLogData(byte[] blob, int blobLen); + private native void dispose0(); + + private long nativeHandle_; +} + +/** + * Package-private class which provides java api to access + * c++ WriteBatchInternal. + */ +class WriteBatchInternal { + static native void setSequence(WriteBatch batch, long sn); + static native long sequence(WriteBatch batch); + static native void append(WriteBatch b1, WriteBatch b2); +} + diff --git a/java/org/rocksdb/WriteBatchTest.java b/java/org/rocksdb/WriteBatchTest.java new file mode 100644 index 000000000..283caca65 --- /dev/null +++ b/java/org/rocksdb/WriteBatchTest.java @@ -0,0 +1,125 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +package org.rocksdb; + +import java.util.*; +import java.lang.*; +import java.io.UnsupportedEncodingException; + +/** + * This class mimics the db/write_batch_test.cc in the c++ rocksdb library. + */ +public class WriteBatchTest { + static { + System.loadLibrary("rocksdbjni"); + } + + public static void main(String args[]) { + System.out.println("Testing WriteBatchTest.Empty ==="); + Empty(); + + System.out.println("Testing WriteBatchTest.Multiple ==="); + Multiple(); + + System.out.println("Testing WriteBatchTest.Append ==="); + Append(); + + System.out.println("Testing WriteBatchTest.Blob ==="); + Blob(); + + // The following tests have not yet ported. + // Continue(); + // PutGatherSlices(); + + System.out.println("Passed all WriteBatchTest!"); + } + + static void Empty() { + WriteBatch batch = new WriteBatch(); + assert(batch.count() == 0); + } + + static void Multiple() { + try { + WriteBatch batch = new WriteBatch(); + batch.put("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII")); + batch.remove("box".getBytes("US-ASCII")); + batch.put("baz".getBytes("US-ASCII"), "boo".getBytes("US-ASCII")); + WriteBatchInternal.setSequence(batch, 100); + assert(100 == WriteBatchInternal.sequence(batch)); + assert(3 == batch.count()); + assert(new String("Put(baz, boo)@102" + + "Delete(box)@101" + + "Put(foo, bar)@100") + .equals(new String(getContents(batch), "US-ASCII"))); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static void Append() { + WriteBatch b1 = new WriteBatch(); + WriteBatch b2 = new WriteBatch(); + WriteBatchInternal.setSequence(b1, 200); + WriteBatchInternal.setSequence(b2, 300); + WriteBatchInternal.append(b1, b2); + assert(getContents(b1).length == 0); + assert(b1.count() == 0); + try { + b2.put("a".getBytes("US-ASCII"), "va".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert("Put(a, va)@200".equals(new String(getContents(b1), "US-ASCII"))); + assert(1 == b1.count()); + b2.clear(); + b2.put("b".getBytes("US-ASCII"), "vb".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert(new String("Put(a, va)@200" + + "Put(b, vb)@201") + .equals(new String(getContents(b1), "US-ASCII"))); + assert(2 == b1.count()); + b2.remove("foo".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert(new String("Put(a, va)@200" + + "Put(b, vb)@202" + + "Put(b, vb)@201" + + "Delete(foo)@203") + .equals(new String(getContents(b1), "US-ASCII"))); + assert(4 == b1.count()); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static void Blob() { + WriteBatch batch = new WriteBatch(); + try { + batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII")); + batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII")); + batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII")); + batch.putLogData("blob1".getBytes("US-ASCII")); + batch.remove("k2".getBytes("US-ASCII")); + batch.putLogData("blob2".getBytes("US-ASCII")); + batch.merge("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII")); + assert(5 == batch.count()); + assert(new String("Merge(foo, bar)@4" + + "Put(k1, v1)@0" + + "Delete(k2)@3" + + "Put(k2, v2)@1" + + "Put(k3, v3)@2") + .equals(new String(getContents(batch), "US-ASCII"))); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static native byte[] getContents(WriteBatch batch); +} diff --git a/java/org/rocksdb/WriteOptions.java b/java/org/rocksdb/WriteOptions.java new file mode 100644 index 000000000..26f0e2b7c --- /dev/null +++ b/java/org/rocksdb/WriteOptions.java @@ -0,0 +1,96 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * Options that control write operations. + * + * Note that developers should call WriteOptions.dispose() to release the + * c++ side memory before a WriteOptions instance runs out of scope. + */ +public class WriteOptions { + public WriteOptions() { + nativeHandle_ = 0; + newWriteOptions(); + } + + public synchronized void dispose() { + if (nativeHandle_ != 0) { + dispose0(nativeHandle_); + } + } + + /** + * If true, the write will be flushed from the operating system + * buffer cache (by calling WritableFile::Sync()) before the write + * is considered complete. If this flag is true, writes will be + * slower. + * + * If this flag is false, and the machine crashes, some recent + * writes may be lost. Note that if it is just the process that + * crashes (i.e., the machine does not reboot), no writes will be + * lost even if sync==false. + * + * In other words, a DB write with sync==false has similar + * crash semantics as the "write()" system call. A DB write + * with sync==true has similar crash semantics to a "write()" + * system call followed by "fdatasync()". + * + * Default: false + */ + public void setSync(boolean flag) { + setSync(nativeHandle_, flag); + } + + /** + * If true, the write will be flushed from the operating system + * buffer cache (by calling WritableFile::Sync()) before the write + * is considered complete. If this flag is true, writes will be + * slower. + * + * If this flag is false, and the machine crashes, some recent + * writes may be lost. Note that if it is just the process that + * crashes (i.e., the machine does not reboot), no writes will be + * lost even if sync==false. + * + * In other words, a DB write with sync==false has similar + * crash semantics as the "write()" system call. A DB write + * with sync==true has similar crash semantics to a "write()" + * system call followed by "fdatasync()". + */ + public boolean sync() { + return sync(nativeHandle_); + } + + /** + * If true, writes will not first go to the write ahead log, + * and the write may got lost after a crash. + */ + public void setDisableWAL(boolean flag) { + setDisableWAL(nativeHandle_, flag); + } + + /** + * If true, writes will not first go to the write ahead log, + * and the write may got lost after a crash. + */ + public boolean disableWAL() { + return disableWAL(nativeHandle_); + } + + @Override protected void finalize() { + dispose(); + } + + private native void newWriteOptions(); + private native void setSync(long handle, boolean flag); + private native boolean sync(long handle); + private native void setDisableWAL(long handle, boolean flag); + private native boolean disableWAL(long handle); + private native void dispose0(long handle); + + protected long nativeHandle_; +} diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index ef308bc4d..69224f6d0 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -11,8 +11,10 @@ #include #include "include/org_rocksdb_Options.h" +#include "include/org_rocksdb_WriteOptions.h" #include "rocksjni/portal.h" #include "rocksdb/db.h" +#include "rocksdb/options.h" /* * Class: org_rocksdb_Options @@ -55,3 +57,72 @@ jboolean Java_org_rocksdb_Options_createIfMissing( JNIEnv* env, jobject jobj, jlong jhandle) { return reinterpret_cast(jhandle)->create_if_missing; } + +////////////////////////////////////////////////////////////////////////////// +// WriteOptions + +/* + * Class: org_rocksdb_WriteOptions + * Method: newWriteOptions + * Signature: ()V + */ +void Java_org_rocksdb_WriteOptions_newWriteOptions( + JNIEnv* env, jobject jwrite_options) { + rocksdb::WriteOptions* op = new rocksdb::WriteOptions(); + rocksdb::WriteOptionsJni::setHandle(env, jwrite_options, op); +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: dispose0 + * Signature: ()V + */ +void Java_org_rocksdb_WriteOptions_dispose0( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + auto write_options = reinterpret_cast(jhandle); + delete write_options; + + rocksdb::WriteOptionsJni::setHandle(env, jwrite_options, nullptr); +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: setSync + * Signature: (JZ)V + */ +void Java_org_rocksdb_WriteOptions_setSync( + JNIEnv* env, jobject jwrite_options, jlong jhandle, jboolean jflag) { + reinterpret_cast(jhandle)->sync = jflag; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: sync + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_sync( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + return reinterpret_cast(jhandle)->sync; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: setDisableWAL + * Signature: (JZ)V + */ +void Java_org_rocksdb_WriteOptions_setDisableWAL( + JNIEnv* env, jobject jwrite_options, jlong jhandle, jboolean jflag) { + reinterpret_cast(jhandle)->disableWAL = jflag; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: disableWAL + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_disableWAL( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + return reinterpret_cast(jhandle)->disableWAL; +} + + diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index a90b82514..5b0524aec 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -109,5 +109,66 @@ class OptionsJni { } }; +class WriteOptionsJni { + public: + // Get the java class id of org.rocksdb.WriteOptions. + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/WriteOptions"); + assert(jclazz != nullptr); + return jclazz; + } + + // Get the field id of the member variable of org.rocksdb.WriteOptions + // that stores the pointer to rocksdb::WriteOptions + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::WriteOptions + static rocksdb::WriteOptions* getHandle(JNIEnv* env, jobject jobj) { + return reinterpret_cast( + env->GetLongField(jobj, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteOptions pointer to the java side. + static void setHandle(JNIEnv* env, jobject jobj, rocksdb::WriteOptions* op) { + env->SetLongField( + jobj, getHandleFieldID(env), + reinterpret_cast(op)); + } +}; + +class WriteBatchJni { + public: + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/WriteBatch"); + assert(jclazz != nullptr); + return jclazz; + } + + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::WriteBatch of the specified + // org.rocksdb.WriteBatch. + static rocksdb::WriteBatch* getHandle(JNIEnv* env, jobject jwb) { + return reinterpret_cast( + env->GetLongField(jwb, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteBatch pointer to the java side. + static void setHandle(JNIEnv* env, jobject jwb, rocksdb::WriteBatch* wb) { + env->SetLongField( + jwb, getHandleFieldID(env), + reinterpret_cast(wb)); + } +}; } // namespace rocksdb #endif // JAVA_ROCKSJNI_PORTAL_H_ diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index b5d42c0c7..ccd87105d 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -15,6 +15,9 @@ #include "rocksjni/portal.h" #include "rocksdb/db.h" +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Open + void rocksdb_open_helper( JNIEnv* env, jobject java_db, jstring jdb_path, const rocksdb::Options& opt) { rocksdb::DB* db; @@ -54,27 +57,20 @@ void Java_org_rocksdb_RocksDB_open( rocksdb_open_helper(env, jdb, jdb_path, *options); } -/* - * Class: org_rocksdb_RocksDB - * Method: put - * Signature: ([BI[BI)V - */ -void Java_org_rocksdb_RocksDB_put( - JNIEnv* env, jobject jdb, +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Put + +void rocksdb_put_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); - jboolean isCopy; - jbyte* key = env->GetByteArrayElements(jkey, &isCopy); - jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); - rocksdb::Slice key_slice( - reinterpret_cast(key), jkey_len); - rocksdb::Slice value_slice( - reinterpret_cast(value), jvalue_len); + jbyte* key = env->GetByteArrayElements(jkey, 0); + jbyte* value = env->GetByteArrayElements(jvalue, 0); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); - rocksdb::Status s = db->Put( - rocksdb::WriteOptions(), key_slice, value_slice); + rocksdb::Status s = db->Put(write_options, key_slice, value_slice); // trigger java unref on key and value. // by passing JNI_ABORT, it will simply release the reference without @@ -90,12 +86,53 @@ void Java_org_rocksdb_RocksDB_put( /* * Class: org_rocksdb_RocksDB - * Method: get - * Signature: ([BI)[B + * Method: put + * Signature: (J[BI[BI)V */ -jbyteArray Java_org_rocksdb_RocksDB_get___3BI( - JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len) { - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); +void Java_org_rocksdb_RocksDB_put__J_3BI_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + + rocksdb_put_helper(env, db, default_write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: put + * Signature: (JJ[BI[BI)V + */ +void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BI( + JNIEnv* env, jobject jdb, + jlong jdb_handle, jlong jwrite_options_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast( + jwrite_options_handle); + + rocksdb_put_helper(env, db, *write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Get + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: (J[BI)[B + */ +jbyteArray Java_org_rocksdb_RocksDB_get__J_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); jboolean isCopy; jbyte* key = env->GetByteArrayElements(jkey, &isCopy); @@ -131,20 +168,17 @@ jbyteArray Java_org_rocksdb_RocksDB_get___3BI( /* * Class: org_rocksdb_RocksDB * Method: get - * Signature: ([BI[BI)I + * Signature: (J[BI[BI)I */ -jint Java_org_rocksdb_RocksDB_get___3BI_3BI( - JNIEnv* env, jobject jdb, +jint Java_org_rocksdb_RocksDB_get__J_3BI_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { static const int kNotFound = -1; static const int kStatusError = -2; + auto db = reinterpret_cast(jdb_handle); - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); - - jboolean isCopy; - jbyte* key = env->GetByteArrayElements(jkey, &isCopy); - jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + jbyte* key = env->GetByteArrayElements(jkey, 0); rocksdb::Slice key_slice( reinterpret_cast(key), jkey_len); @@ -160,10 +194,8 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); if (s.IsNotFound()) { - env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); return kNotFound; } else if (!s.ok()) { - env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); // Here since we are throwing a Java exception from c++ side. // As a result, c++ does not know calling this function will in fact // throwing an exception. As a result, the execution flow will @@ -179,11 +211,65 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( int cvalue_len = static_cast(cvalue.size()); int length = std::min(jvalue_len, cvalue_len); - memcpy(value, cvalue.c_str(), length); - env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); - return static_cast(cvalue_len); + env->SetByteArrayRegion( + jvalue, 0, length, + reinterpret_cast(cvalue.c_str())); + return cvalue_len; } +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Delete() +void rocksdb_remove_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, + jbyteArray jkey, jint jkey_len) { + jbyte* key = env->GetByteArrayElements(jkey, 0); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + + rocksdb::Status s = db->Delete(write_options, key_slice); + + // trigger java unref on key and value. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } + return; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: remove + * Signature: (J[BI)V + */ +void Java_org_rocksdb_RocksDB_remove__J_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + + rocksdb_remove_helper(env, db, default_write_options, jkey, jkey_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: remove + * Signature: (JJ[BI)V + */ +void Java_org_rocksdb_RocksDB_remove__JJ_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jlong jwrite_options, jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast(jwrite_options); + + rocksdb_remove_helper(env, db, *write_options, jkey, jkey_len); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::~DB() + /* * Class: org_rocksdb_RocksDB * Method: close0 @@ -192,8 +278,8 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( void Java_org_rocksdb_RocksDB_close0( JNIEnv* env, jobject java_db) { rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, java_db); + assert(db != nullptr); delete db; - db = nullptr; - rocksdb::RocksDBJni::setHandle(env, java_db, db); + rocksdb::RocksDBJni::setHandle(env, java_db, nullptr); } diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc new file mode 100644 index 000000000..f72c3ba6d --- /dev/null +++ b/java/rocksjni/write_batch.cc @@ -0,0 +1,263 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::WriteBatch methods from Java side. +#include + +#include "include/org_rocksdb_WriteBatch.h" +#include "include/org_rocksdb_WriteBatchInternal.h" +#include "include/org_rocksdb_WriteBatchTest.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" +#include "db/memtable.h" +#include "rocksdb/write_batch.h" +#include "db/write_batch_internal.h" +#include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" +#include "util/logging.h" +#include "util/testharness.h" + +/* + * Class: org_rocksdb_WriteBatch + * Method: newWriteBatch + * Signature: (I)V + */ +void Java_org_rocksdb_WriteBatch_newWriteBatch( + JNIEnv* env, jobject jobj, jint jreserved_bytes) { + rocksdb::WriteBatch* wb = new rocksdb::WriteBatch( + static_cast(jreserved_bytes)); + + rocksdb::WriteBatchJni::setHandle(env, jobj, wb); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: count + * Signature: ()I + */ +jint Java_org_rocksdb_WriteBatch_count(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + return static_cast(wb->Count()); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: clear + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatch_clear(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + wb->Clear(); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: put + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_WriteBatch_put( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jvalue, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); + wb->Put(key_slice, value_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: merge + * Signature: ([BI[BI)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_WriteBatch_merge( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jvalue, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); + wb->Merge(key_slice, value_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: remove + * Signature: ([BI)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_WriteBatch_remove( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + wb->Delete(key_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: putLogData + * Signature: ([BI)V + */ +void Java_org_rocksdb_WriteBatch_putLogData( + JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* blob = env->GetByteArrayElements(jblob, nullptr); + rocksdb::Slice blob_slice(reinterpret_cast(blob), jblob_len); + wb->PutLogData(blob_slice); + env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: dispose0 + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatch_dispose0(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + delete wb; + + rocksdb::WriteBatchJni::setHandle(env, jobj, nullptr); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: setSequence + * Signature: (Lorg/rocksdb/WriteBatch;J)V + */ +void Java_org_rocksdb_WriteBatchInternal_setSequence( + JNIEnv* env, jclass jclazz, jobject jobj, jlong jsn) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + rocksdb::WriteBatchInternal::SetSequence( + wb, static_cast(jsn)); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: sequence + * Signature: (Lorg/rocksdb/WriteBatch;)J + */ +jlong Java_org_rocksdb_WriteBatchInternal_sequence( + JNIEnv* env, jclass jclazz, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + return static_cast(rocksdb::WriteBatchInternal::Sequence(wb)); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: append + * Signature: (Lorg/rocksdb/WriteBatch;Lorg/rocksdb/WriteBatch;)V + */ +void Java_org_rocksdb_WriteBatchInternal_append( + JNIEnv* env, jclass jclazz, jobject jwb1, jobject jwb2) { + rocksdb::WriteBatch* wb1 = rocksdb::WriteBatchJni::getHandle(env, jwb1); + assert(wb1 != nullptr); + rocksdb::WriteBatch* wb2 = rocksdb::WriteBatchJni::getHandle(env, jwb2); + assert(wb2 != nullptr); + + rocksdb::WriteBatchInternal::Append(wb1, wb2); +} + +/* + * Class: org_rocksdb_WriteBatchTest + * Method: getContents + * Signature: (Lorg/rocksdb/WriteBatch;)[B + */ +jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( + JNIEnv* env, jclass jclazz, jobject jobj) { + rocksdb::WriteBatch* b = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(b != nullptr); + + // todo: Currently the following code is directly copied from + // db/write_bench_test.cc. It could be implemented in java once + // all the necessary components can be accessed via jni api. + + rocksdb::InternalKeyComparator cmp(rocksdb::BytewiseComparator()); + auto factory = std::make_shared(); + rocksdb::Options options; + options.memtable_factory = factory; + rocksdb::MemTable* mem = new rocksdb::MemTable(cmp, options); + mem->Ref(); + std::string state; + rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto(b, mem, &options); + int count = 0; + rocksdb::Iterator* iter = mem->NewIterator(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + rocksdb::ParsedInternalKey ikey; + memset(reinterpret_cast(&ikey), 0, sizeof(ikey)); + ASSERT_TRUE(rocksdb::ParseInternalKey(iter->key(), &ikey)); + switch (ikey.type) { + case rocksdb::kTypeValue: + state.append("Put("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; + case rocksdb::kTypeMerge: + state.append("Merge("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; + case rocksdb::kTypeDeletion: + state.append("Delete("); + state.append(ikey.user_key.ToString()); + state.append(")"); + count++; + break; + default: + assert(false); + break; + } + state.append("@"); + state.append(rocksdb::NumberToString(ikey.sequence)); + } + delete iter; + if (!s.ok()) { + state.append(s.ToString()); + } else if (count != rocksdb::WriteBatchInternal::Count(b)) { + state.append("CountMismatch()"); + } + delete mem->Unref(); + + jbyteArray jstate = env->NewByteArray(state.size()); + env->SetByteArrayRegion( + jstate, 0, state.size(), + reinterpret_cast(state.c_str())); + + return jstate; +} +