Adding merge functions to RocksDBJava

Summary:
Added support for the merge operation to RocksJava.
You can specify a merge function to be used on the current database.
The merge function can either be one of the functions defined in
utilities/merge_operators.h, which can be specified through its
corresponding name, or a user-created function that needs to be
encapsulated in a JNI object in order to be used. Examples are
provided for both use cases.

Test Plan: There are unit test in MergeTest.java

Reviewers: ankgup87

Subscribers: vladb38

Differential Revision: https://reviews.facebook.net/D24525
This commit is contained in:
Vlad Balan 2014-09-16 13:58:49 -07:00 committed by Vlad Balan
parent b5dd7eed68
commit a40ce219b9
9 changed files with 321 additions and 2 deletions

View File

@ -1,4 +1,4 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
@ -44,6 +44,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest
#java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest #java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MergeTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest
@rm -rf /tmp/rocksdbjni_* @rm -rf /tmp/rocksdbjni_*

View File

@ -0,0 +1,19 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.util.*;
/**
* MergeOperator holds an operator to be applied when compacting
* two values held under the same key in order to obtain a single
* value.
*/
public abstract class MergeOperator {
abstract protected long newMergeOperatorHandle();
}

View File

@ -2234,6 +2234,40 @@ public class Options extends RocksObject {
private native void setMinPartialMergeOperands( private native void setMinPartialMergeOperands(
long handle, int minPartialMergeOperands); long handle, int minPartialMergeOperands);
/**
* Set the merge operator to be used for merging two different key/value
* pairs that share the same key. The merge function is invoked during
* compaction and at lookup time, if multiple key/value pairs belonging
* to the same key are found in the database.
*
* @param name the name of the merge function, as defined by
* the MergeOperators factory (see utilities/MergeOperators.h)
* @return the reference to the current option.
*/
public Options setMergeOperatorName(String name) {
setMergeOperatorName(nativeHandle_, name);
return this;
}
private native void setMergeOperatorName(
long handle, String name);
/**
* Set the merge operator to be used for merging two different key/value
* pairs that share the same key. The merge function is invoked during
* compaction and at lookup time, if multiple key/value pairs belonging
* to the same key are found in the database.
*
* @param name the name of the merge function, as defined by
* the MergeOperators factory (see utilities/MergeOperators.h)
* @return the reference to the current option.
*/
public Options setMergeOperator(MergeOperator mergeOperator) {
setMergeOperator(nativeHandle_, mergeOperator.newMergeOperatorHandle());
return this;
}
private native void setMergeOperator(
long handle, long mergeOperatorHandle);
/** /**
* Release the memory allocated for the current instance * Release the memory allocated for the current instance
* in the c++ side. * in the c++ side.

View File

@ -473,8 +473,32 @@ public class RocksDB extends RocksObject {
} }
/** /**
* Get the value associated with the specified key within column family * Set the database entry for "key" to "value".
* *
* @param key the specified key to be merged.
* @param value the value to be nerged with the current value for
* the specified key.
*/
public void merge(byte[] key, byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length);
}
/**
* Merge the database entry for "key" with "value".
*
* @param key the specified key to be merged.
* @param value the value to be merged with the current value for
* the specified key.
*/
public void merge(WriteOptions writeOpts, byte[] key, byte[] value)
throws RocksDBException {
merge(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length);
}
/**
* Get the value associated with the specified key within column family*
* @param key the key to retrieve the value. * @param key the key to retrieve the value.
* @param value the out-value to receive the retrieved value. * @param value the out-value to receive the retrieved value.
* @return The size of the actual value that matches the specified * @return The size of the actual value that matches the specified
@ -1002,6 +1026,13 @@ public class RocksDB extends RocksObject {
long cfHandle, StringBuffer stringBuffer); long cfHandle, StringBuffer stringBuffer);
protected native boolean keyMayExist(long optionsHandle, byte[] key, int keyLen, protected native boolean keyMayExist(long optionsHandle, byte[] key, int keyLen,
long cfHandle, StringBuffer stringBuffer); long cfHandle, StringBuffer stringBuffer);
protected native void merge(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native int get( protected native int get(
long handle, byte[] key, int keyLen, long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException; byte[] value, int valueLen) throws RocksDBException;

View File

@ -0,0 +1,21 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* MergeOperator holds an operator to be applied when compacting
* two values held under the same key in order to obtain a single
* value.
*/
public class StringAppendOperator extends MergeOperator {
@Override protected long newMergeOperatorHandle() {
return newMergeOperatorHandleImpl();
}
private native long newMergeOperatorHandleImpl();
}

View File

@ -0,0 +1,88 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import java.util.Collections;
import org.rocksdb.*;
public class MergeTest {
static final String db_path_string = "/tmp/mergestringjni_db";
static final String db_path_function = "/tmp/mergefunctionjni_db";
static {
RocksDB.loadLibrary();
}
public static void testStringOption()
throws InterruptedException, RocksDBException {
System.out.println("Testing merge function string option ===");
Options opt = new Options();
opt.setCreateIfMissing(true);
opt.setMergeOperatorName("stringappend");
RocksDB db = RocksDB.open(opt, db_path_string);
System.out.println("Writing aa under key...");
db.put("key".getBytes(), "aa".getBytes());
System.out.println("Writing bb under key...");
db.merge("key".getBytes(), "bb".getBytes());
byte[] value = db.get("key".getBytes());
String strValue = new String(value);
System.out.println("Retrieved value: " + strValue);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
System.out.println("Merge function string option passed!");
}
public static void testOperatorOption()
throws InterruptedException, RocksDBException {
System.out.println("Testing merge function operator option ===");
Options opt = new Options();
opt.setCreateIfMissing(true);
StringAppendOperator stringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(stringAppendOperator);
RocksDB db = RocksDB.open(opt, db_path_string);
System.out.println("Writing aa under key...");
db.put("key".getBytes(), "aa".getBytes());
System.out.println("Writing bb under key...");
db.merge("key".getBytes(), "bb".getBytes());
byte[] value = db.get("key".getBytes());
String strValue = new String(value);
System.out.println("Retrieved value: " + strValue);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
System.out.println("Merge function operator option passed!");
}
public static void main(String[] args)
throws InterruptedException, RocksDBException {
testStringOption();
testOperatorOption();
}
}

View File

@ -0,0 +1,35 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ for rocksdb::MergeOperator.
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <string>
#include <memory>
#include "include/org_rocksdb_StringAppendOperator.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
/*
* Class: org_rocksdb_StringAppendOperator
* Method: newMergeOperatorHandle
* Signature: ()J
*/
jlong Java_org_rocksdb_StringAppendOperator_newMergeOperatorHandleImpl(JNIEnv* env, jobject jobj) {
std::shared_ptr<rocksdb::MergeOperator> *op = new std::shared_ptr<rocksdb::MergeOperator>();
*op = rocksdb::MergeOperators::CreateFromStringId("stringappend");
return reinterpret_cast<jlong>(op);
}

View File

@ -23,6 +23,8 @@
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
@ -1603,6 +1605,29 @@ void Java_org_rocksdb_Options_setMinPartialMergeOperands(
static_cast<int32_t>(jmin_partial_merge_operands); static_cast<int32_t>(jmin_partial_merge_operands);
} }
/*
* Class: org_rocksdb_Options
* Method: setMergeOperatorName
* Signature: (JJjava/lang/String)V
*/
void Java_org_rocksdb_Options_setMergeOperatorName(
JNIEnv* env, jobject jobj, jlong jhandle, jstring name) {
const char* op_name = env->GetStringUTFChars(name, 0);
reinterpret_cast<rocksdb::Options*>(jhandle)->merge_operator =
rocksdb::MergeOperators::CreateFromStringId(op_name);
}
/*
* Class: org_rocksdb_Options
* Method: setMergeOperator
* Signature: (JJjava/lang/String)V
*/
void Java_org_rocksdb_Options_setMergeOperator(
JNIEnv* env, jobject jobj, jlong jhandle, jlong mergeOperatorHandle) {
reinterpret_cast<rocksdb::Options*>(jhandle)->merge_operator =
*(reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*> (mergeOperatorHandle));
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// WriteOptions // WriteOptions
@ -1759,3 +1784,4 @@ void Java_org_rocksdb_ReadOptions_setTailing(
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing = reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing =
static_cast<bool>(jtailing); static_cast<bool>(jtailing);
} }

View File

@ -914,6 +914,70 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ(
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
} }
} }
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Merge
void rocksdb_merge_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* value = env->GetByteArrayElements(jvalue, 0);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jvalue_len);
rocksdb::Status s = db->Merge(write_options, key_slice, value_slice);
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT);
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (J[BI[BI)V
*/
void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
rocksdb_merge_helper(env, db, default_write_options,
jkey, jkey_len,
jvalue, jvalue_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (JJ[BI[BI)V
*/
void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
rocksdb_merge_helper(env, db, *write_options,
jkey, jkey_len,
jvalue, jvalue_len);
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::~DB() // rocksdb::DB::~DB()