diff --git a/java/org/rocksdb/ColumnFamilyHandle.java b/java/org/rocksdb/ColumnFamilyHandle.java index 334abd96d..92a4d7cef 100644 --- a/java/org/rocksdb/ColumnFamilyHandle.java +++ b/java/org/rocksdb/ColumnFamilyHandle.java @@ -10,23 +10,33 @@ package org.rocksdb; * ColumnFamily Pointers. */ public class ColumnFamilyHandle extends RocksObject { - ColumnFamilyHandle(long nativeHandle) { + ColumnFamilyHandle(RocksDB rocksDB, long nativeHandle) { super(); nativeHandle_ = nativeHandle; + // rocksDB must point to a valid RocksDB instance; + assert(rocksDB != null); + // ColumnFamilyHandle must hold a reference to the related RocksDB instance + // to guarantee that while a GC cycle starts ColumnFamilyHandle instances + // are freed prior to RocksDB instances. + rocksDB_ = rocksDB; } /** - * Deletes underlying C++ filter pointer. + *

Deletes underlying C++ iterator pointer.

* - * Note that this function should be called only after all - * RocksDB instances referencing the filter are closed. - * Otherwise an undefined behavior will occur. + *

Note: the underlying handle can only be safely deleted if the RocksDB + * instance related to a certain ColumnFamilyHandle is still valid and initialized. + * Therefore {@code disposeInternal()} checks if the RocksDB is initialized + * before freeing the native handle.

*/ @Override protected void disposeInternal() { assert(isInitialized()); - disposeInternal(nativeHandle_); + if (rocksDB_.isInitialized()) { + disposeInternal(nativeHandle_); + } } private native void disposeInternal(long handle); + private RocksDB rocksDB_; } diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 291c505c7..2a90c7370 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -214,7 +214,7 @@ public class RocksDB extends RocksObject { List cfReferences = db.open(options.nativeHandle_, path, columnFamilyNames, columnFamilyNames.size()); for (int i=0; i cfReferences = db.openROnly(options.nativeHandle_, path, columnFamilyNames, columnFamilyNames.size()); for (int i=0; i cfNames = new ArrayList(); + List columnFamilyHandleList = + new ArrayList(); + cfNames.add("default"); + cfNames.add("new_cf"); + RocksDB db = RocksDB.open(opt, db_cf_path_string, cfNames, columnFamilyHandleList); - System.out.println("Retrieved value: " + strValue); + // writing aa under key + db.put(columnFamilyHandleList.get(1), "cfkey".getBytes(), "aa".getBytes()); + // merge bb under key + db.merge(columnFamilyHandleList.get(1), "cfkey".getBytes(), "bb".getBytes()); - db.close(); - opt.dispose(); + byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes()); + String strValue = new String(value); - assert(strValue.equals("aa,bb")); + for (ColumnFamilyHandle handle : columnFamilyHandleList) { + handle.dispose(); + } + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + } - System.out.println("Merge function string option passed!"); - } + public static void testOperatorOption() + throws InterruptedException, RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); - public static void testOperatorOption() - throws InterruptedException, RocksDBException { + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); - System.out.println("Testing merge function operator option ==="); + RocksDB db = RocksDB.open(opt, db_path_string); + // Writing aa under key + db.put("key".getBytes(), "aa".getBytes()); - Options opt = new Options(); - opt.setCreateIfMissing(true); + // Writing bb under key + db.merge("key".getBytes(), "bb".getBytes()); - StringAppendOperator stringAppendOperator = new StringAppendOperator(); - opt.setMergeOperator(stringAppendOperator); + byte[] value = db.get("key".getBytes()); + String strValue = new String(value); - RocksDB db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + } - System.out.println("Writing aa under key..."); - db.put("key".getBytes(), "aa".getBytes()); + public static void testCFOperatorOption() + throws InterruptedException, RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); + opt.setCreateMissingColumnFamilies(true); + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); - System.out.println("Writing bb under key..."); - db.merge("key".getBytes(), "bb".getBytes()); + List cfNames = new ArrayList(); + List columnFamilyHandleList = + new ArrayList(); + cfNames.add("default"); + cfNames.add("new_cf"); + RocksDB db = RocksDB.open(opt, db_path_operator, cfNames, columnFamilyHandleList); - byte[] value = db.get("key".getBytes()); - String strValue = new String(value); + // writing aa under key + db.put(columnFamilyHandleList.get(1), "cfkey".getBytes(), "aa".getBytes()); + // merge bb under key + db.merge(columnFamilyHandleList.get(1), "cfkey".getBytes(), "bb".getBytes()); + byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes()); + String strValue = new String(value); - System.out.println("Retrieved value: " + strValue); + // Test also with createColumnFamily + ColumnFamilyHandle columnFamilyHandle = db.createColumnFamily("new_cf2"); + // writing xx under cfkey2 + db.put(columnFamilyHandle, "cfkey2".getBytes(), "xx".getBytes()); + // merge yy under cfkey2 + db.merge(columnFamilyHandle, "cfkey2".getBytes(), "yy".getBytes()); + value = db.get(columnFamilyHandle, "cfkey2".getBytes()); + String strValueTmpCf = new String(value); - db.close(); - opt.dispose(); + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + assert(strValueTmpCf.equals("xx,yy")); + } - assert(strValue.equals("aa,bb")); + public static void testOperatorGcBehaviour() + throws RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); + RocksDB db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test reuse + opt = new Options(); + opt.setMergeOperator(stringAppendOperator); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test param init + opt = new Options(); + opt.setMergeOperator(new StringAppendOperator()); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test replace one with another merge operator instance + opt = new Options(); + opt.setMergeOperator(stringAppendOperator); + StringAppendOperator newStringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(newStringAppendOperator); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + stringAppendOperator = null; + newStringAppendOperator = null; + System.gc(); + System.runFinalization(); + } - System.out.println("Merge function operator option passed!"); - } - - public static void main(String[] args) - throws InterruptedException, RocksDBException { - testStringOption(); - testOperatorOption(); - } + public static void main(String[] args) + throws InterruptedException, RocksDBException { + testStringOption(); + testCFStringOption(); + testOperatorOption(); + testCFOperatorOption(); + testOperatorGcBehaviour(); + System.out.println("Passed MergeTest."); + } } diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1e886e2e2..50cd8a359 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -95,7 +95,7 @@ jobject cfnames_to_free.push_back(cfname); jcfnames_for_free.push_back(jstr); column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, - rocksdb::ColumnFamilyOptions())); + *static_cast(opt))); } rocksdb::Status s = rocksdb::DB::OpenForReadOnly(*opt, @@ -167,7 +167,7 @@ jobject Java_org_rocksdb_RocksDB_open__JLjava_lang_String_2Ljava_util_List_2I( cfnames_to_free.push_back(cfname); jcfnames_for_free.push_back(jstr); column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, - rocksdb::ColumnFamilyOptions())); + *static_cast(opt))); } rocksdb::Status s = rocksdb::DB::Open(*opt, db_path, column_families, @@ -919,7 +919,7 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ( void rocksdb_merge_helper( JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, - jbyteArray jkey, jint jkey_len, + rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { jbyte* key = env->GetByteArrayElements(jkey, 0); @@ -927,7 +927,12 @@ void rocksdb_merge_helper( rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); - rocksdb::Status s = db->Merge(write_options, key_slice, value_slice); + rocksdb::Status s; + if (cf_handle != nullptr) { + s = db->Merge(write_options, cf_handle, key_slice, value_slice); + } else { + s = db->Merge(write_options, key_slice, value_slice); + } // trigger java unref on key and value. // by passing JNI_ABORT, it will simply release the reference without @@ -955,8 +960,29 @@ void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI( rocksdb::WriteOptions(); rocksdb_merge_helper(env, db, default_write_options, - jkey, jkey_len, - jvalue, jvalue_len); + nullptr, jkey, jkey_len, jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (J[BI[BIJ)V + */ +void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + auto cf_handle = reinterpret_cast(jcf_handle); + if (cf_handle != nullptr) { + rocksdb_merge_helper(env, db, default_write_options, + cf_handle, jkey, jkey_len, jvalue, jvalue_len); + } else { + rocksdb::RocksDBExceptionJni::ThrowNew(env, + rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); + } } /* @@ -974,8 +1000,30 @@ void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI( jwrite_options_handle); rocksdb_merge_helper(env, db, *write_options, - jkey, jkey_len, - jvalue, jvalue_len); + nullptr, jkey, jkey_len, jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (JJ[BI[BIJ)V + */ +void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BIJ( + JNIEnv* env, jobject jdb, + jlong jdb_handle, jlong jwrite_options_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast( + jwrite_options_handle); + auto cf_handle = reinterpret_cast(jcf_handle); + if (cf_handle != nullptr) { + rocksdb_merge_helper(env, db, *write_options, + cf_handle, jkey, jkey_len, jvalue, jvalue_len); + } else { + rocksdb::RocksDBExceptionJni::ThrowNew(env, + rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); + } } ////////////////////////////////////////////////////////////////////////////// @@ -1062,15 +1110,17 @@ jlongArray Java_org_rocksdb_RocksDB_iterators( /* * Class: org_rocksdb_RocksDB * Method: createColumnFamily - * Signature: (JLjava/lang/String;)J; + * Signature: (JJLjava/lang/String;)J; */ jlong Java_org_rocksdb_RocksDB_createColumnFamily( - JNIEnv* env, jobject jdb, jlong jdb_handle, jstring jcfname) { + JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jopt_handle, + jstring jcfname) { rocksdb::ColumnFamilyHandle* handle; const char* cfname = env->GetStringUTFChars(jcfname, 0); auto db_handle = reinterpret_cast(jdb_handle); + auto opt = reinterpret_cast(jopt_handle); rocksdb::Status s = db_handle->CreateColumnFamily( - rocksdb::ColumnFamilyOptions(), cfname, &handle); + *static_cast(opt), cfname, &handle); env->ReleaseStringUTFChars(jcfname, cfname); if (s.ok()) {