Merge pull request #290 from vladb38/master

Added merge operators to RocksJava
This commit is contained in:
Yueh-Hsuan Chiang 2014-10-27 16:41:33 -07:00
commit db52419cf0
9 changed files with 317 additions and 2 deletions

View File

@ -1,4 +1,4 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
@ -45,6 +45,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MemTableTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MergeTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.RocksIteratorTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.SnapshotTest

View File

@ -0,0 +1,17 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.util.*;
/**
* MergeOperator holds an operator to be applied when compacting
* two merge operands held under the same key in order to obtain a single
* value.
*/
public interface MergeOperator {
public long newMergeOperatorHandle();
}

View File

@ -2258,6 +2258,42 @@ public class Options extends RocksObject {
private native void setMinPartialMergeOperands(
long handle, int minPartialMergeOperands);
/**
* Set the merge operator to be used for merging two merge operands
* of the same key. The merge function is invoked during
* compaction and at lookup time, if multiple key/value pairs belonging
* to the same key are found in the database.
*
* @param name the name of the merge function, as defined by
* the MergeOperators factory (see utilities/MergeOperators.h)
* The merge function is specified by name and must be one of the
* standard merge operators provided by RocksDB. The available
* operators are "put", "uint64add", "stringappend" and "stringappendtest".
* @return the reference to the current option.
*/
public Options setMergeOperatorName(String name) {
setMergeOperatorName(nativeHandle_, name);
return this;
}
private native void setMergeOperatorName(
long handle, String name);
/**
* Set the merge operator to be used for merging two different key/value
* pairs that share the same key. The merge function is invoked during
* compaction and at lookup time, if multiple key/value pairs belonging
* to the same key are found in the database.
*
* @param a {@link MergeOperator} object
* @return the reference to the current option.
*/
public Options setMergeOperator(MergeOperator mergeOperator) {
setMergeOperator(nativeHandle_, mergeOperator.newMergeOperatorHandle());
return this;
}
private native void setMergeOperator(
long handle, long mergeOperatorHandle);
/**
* Release the memory allocated for the current instance
* in the c++ side.

View File

@ -473,8 +473,33 @@ public class RocksDB extends RocksObject {
}
/**
* Get the value associated with the specified key within column family
* Add merge operand for key/value pair.
*
* @param key the specified key to be merged.
* @param value the value to be nerged with the current value for
* the specified key.
*/
public void merge(byte[] key, byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length);
}
/**
* Add merge operand for key/value pair.
*
* @param writeOpts {@link WriteOptions} for this write.
* @param key the specified key to be merged.
* @param value the value to be merged with the current value for
* the specified key.
*/
public void merge(WriteOptions writeOpts, byte[] key, byte[] value)
throws RocksDBException {
merge(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length);
}
/**
* Get the value associated with the specified key within column family*
* @param key the key to retrieve the value.
* @param value the out-value to receive the retrieved value.
* @return The size of the actual value that matches the specified
@ -1035,6 +1060,13 @@ public class RocksDB extends RocksObject {
long cfHandle, StringBuffer stringBuffer);
protected native boolean keyMayExist(long optionsHandle, byte[] key, int keyLen,
long cfHandle, StringBuffer stringBuffer);
protected native void merge(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native int get(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;

View File

@ -0,0 +1,17 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* StringAppendOperator is a merge operator that concatenates
* two strings.
*/
public class StringAppendOperator implements MergeOperator {
@Override public long newMergeOperatorHandle() {
return newMergeOperatorHandleImpl();
}
private native long newMergeOperatorHandleImpl();
}

View File

@ -0,0 +1,85 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import java.util.Collections;
import org.rocksdb.*;
public class MergeTest {
static final String db_path_string = "/tmp/mergestringjni_db";
static final String db_path_function = "/tmp/mergefunctionjni_db";
static {
RocksDB.loadLibrary();
}
public static void testStringOption()
throws InterruptedException, RocksDBException {
System.out.println("Testing merge function string option ===");
Options opt = new Options();
opt.setCreateIfMissing(true);
opt.setMergeOperatorName("stringappend");
RocksDB db = RocksDB.open(opt, db_path_string);
System.out.println("Writing aa under key...");
db.put("key".getBytes(), "aa".getBytes());
System.out.println("Writing bb under key...");
db.merge("key".getBytes(), "bb".getBytes());
byte[] value = db.get("key".getBytes());
String strValue = new String(value);
System.out.println("Retrieved value: " + strValue);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
System.out.println("Merge function string option passed!");
}
public static void testOperatorOption()
throws InterruptedException, RocksDBException {
System.out.println("Testing merge function operator option ===");
Options opt = new Options();
opt.setCreateIfMissing(true);
StringAppendOperator stringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(stringAppendOperator);
RocksDB db = RocksDB.open(opt, db_path_string);
System.out.println("Writing aa under key...");
db.put("key".getBytes(), "aa".getBytes());
System.out.println("Writing bb under key...");
db.merge("key".getBytes(), "bb".getBytes());
byte[] value = db.get("key".getBytes());
String strValue = new String(value);
System.out.println("Retrieved value: " + strValue);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
System.out.println("Merge function operator option passed!");
}
public static void main(String[] args)
throws InterruptedException, RocksDBException {
testStringOption();
testOperatorOption();
}
}

View File

@ -0,0 +1,37 @@
// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++
// for rocksdb::MergeOperator.
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <string>
#include <memory>
#include "include/org_rocksdb_StringAppendOperator.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
/*
* Class: org_rocksdb_StringAppendOperator
* Method: newMergeOperatorHandle
* Signature: ()J
*/
jlong Java_org_rocksdb_StringAppendOperator_newMergeOperatorHandleImpl
(JNIEnv* env, jobject jobj) {
std::shared_ptr<rocksdb::MergeOperator> *op =
new std::shared_ptr<rocksdb::MergeOperator>();
*op = rocksdb::MergeOperators::CreateFromStringId("stringappend");
return reinterpret_cast<jlong>(op);
}

View File

@ -26,6 +26,8 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/comparator.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
/*
* Class: org_rocksdb_Options
@ -1617,6 +1619,30 @@ void Java_org_rocksdb_Options_setMinPartialMergeOperands(
static_cast<int32_t>(jmin_partial_merge_operands);
}
/*
* Class: org_rocksdb_Options
* Method: setMergeOperatorName
* Signature: (JJjava/lang/String)V
*/
void Java_org_rocksdb_Options_setMergeOperatorName(
JNIEnv* env, jobject jobj, jlong jhandle, jstring name) {
const char* op_name = env->GetStringUTFChars(name, 0);
reinterpret_cast<rocksdb::Options*>(jhandle)->merge_operator =
rocksdb::MergeOperators::CreateFromStringId(op_name);
}
/*
* Class: org_rocksdb_Options
* Method: setMergeOperator
* Signature: (JJjava/lang/String)V
*/
void Java_org_rocksdb_Options_setMergeOperator(
JNIEnv* env, jobject jobj, jlong jhandle, jlong mergeOperatorHandle) {
reinterpret_cast<rocksdb::Options*>(jhandle)->merge_operator =
*(reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>
(mergeOperatorHandle));
}
//////////////////////////////////////////////////////////////////////////////
// WriteOptions

View File

@ -914,6 +914,70 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ(
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
}
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Merge
void rocksdb_merge_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* value = env->GetByteArrayElements(jvalue, 0);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jvalue_len);
rocksdb::Status s = db->Merge(write_options, key_slice, value_slice);
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT);
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (J[BI[BI)V
*/
void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
rocksdb_merge_helper(env, db, default_write_options,
jkey, jkey_len,
jvalue, jvalue_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (JJ[BI[BI)V
*/
void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
rocksdb_merge_helper(env, db, *write_options,
jkey, jkey_len,
jvalue, jvalue_len);
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::~DB()