[RocksJava] Flush functionality

RocksJava now supports also flush functionality of
RocksDB.
This commit is contained in:
fyrz 2014-11-09 20:09:39 +01:00
parent 8e5547f64f
commit fc6fcbab9e
7 changed files with 277 additions and 16 deletions

View File

@ -10,6 +10,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.DBOptions\
org.rocksdb.DirectComparator\
org.rocksdb.DirectSlice\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.GenericRateLimiterConfig\
org.rocksdb.HashLinkedListMemTableConfig\

View File

@ -0,0 +1,51 @@
package org.rocksdb;
/**
* FlushOptions to be passed to flush operations of
* {@link org.rocksdb.RocksDB}.
*/
public class FlushOptions extends RocksObject {
/**
* Construct a new instance of FlushOptions.
*/
public FlushOptions(){
super();
newFlushOptions();
}
/**
* Set if the flush operation shall block until it terminates.
*
* @param waitForFlush boolean value indicating if the flush
* operations waits for termination of the flush process.
*
* @return instance of current FlushOptions.
*/
public FlushOptions setWaitForFlush(boolean waitForFlush) {
assert(isInitialized());
waitForFlush(nativeHandle_);
return this;
}
/**
* Wait for flush to finished.
*
* @return boolean value indicating if the flush operation
* waits for termination of the flush process.
*/
public boolean waitForFlush() {
assert(isInitialized());
return waitForFlush(nativeHandle_);
}
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void newFlushOptions();
private native void disposeInternal(long handle);
private native void setWaitForFlush(long handle,
boolean wait);
private native boolean waitForFlush(long handle);
}

View File

@ -1085,6 +1085,40 @@ public class RocksDB extends RocksObject {
dropColumnFamily(nativeHandle_, columnFamilyHandle.nativeHandle_);
}
/**
* <p>Flush all memory table data.</p>
*
* <p>Note: it must be ensured that the FlushOptions instance
* is not GC'ed before this method finishes. If the wait parameter is
* set to false, flush processing is asynchronous.</p>
*
* @param flushOptions {@link org.rocksdb.FlushOptions} instance.
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public void flush(FlushOptions flushOptions)
throws RocksDBException {
flush(nativeHandle_, flushOptions.nativeHandle_);
}
/**
* <p>Flush all memory table data.</p>
*
* <p>Note: it must be ensured that the FlushOptions instance
* is not GC'ed before this method finishes. If the wait parameter is
* set to false, flush processing is asynchronous.</p>
*
* @param flushOptions {@link org.rocksdb.FlushOptions} instance.
* @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} instance.
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public void flush(FlushOptions flushOptions,
ColumnFamilyHandle columnFamilyHandle) throws RocksDBException {
flush(nativeHandle_, flushOptions.nativeHandle_,
columnFamilyHandle.nativeHandle_);
}
/**
* Private constructor.
*/
@ -1197,10 +1231,13 @@ public class RocksDB extends RocksObject {
protected native void releaseSnapshot(
long nativeHandle, long snapshotHandle);
private native void disposeInternal(long handle);
private native long createColumnFamily(long handle, long opt_handle,
String name) throws RocksDBException;
private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException;
private native void flush(long handle, long flushOptHandle)
throws RocksDBException;
private native void flush(long handle, long flushOptHandle,
long cfHandle) throws RocksDBException;
protected Options options_;
}

View File

@ -0,0 +1,47 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.*;
public class FlushTest {
static final String db_path = "/tmp/rocksdbjni_flush_test";
static {
RocksDB.loadLibrary();
}
public static void main(String[] args) {
RocksDB db = null;
Options options = new Options();
WriteOptions wOpt = new WriteOptions();
FlushOptions flushOptions = new FlushOptions();
try {
// Setup options
options.setCreateIfMissing(true);
options.setMaxWriteBufferNumber(10);
options.setMinWriteBufferNumberToMerge(10);
flushOptions.setWaitForFlush(true);
wOpt.setDisableWAL(true);
db = RocksDB.open(options, db_path);
db.put(wOpt, "key1".getBytes(), "value1".getBytes());
db.put(wOpt, "key2".getBytes(), "value2".getBytes());
db.put(wOpt, "key3".getBytes(), "value3".getBytes());
db.put(wOpt, "key4".getBytes(), "value4".getBytes());
assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("4"));
db.flush(flushOptions);
assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("0"));
} catch (RocksDBException e) {
assert(false);
}
db.close();
options.dispose();
wOpt.dispose();
flushOptions.dispose();
}
}

View File

@ -18,9 +18,11 @@
#include "include/org_rocksdb_WriteOptions.h"
#include "include/org_rocksdb_ReadOptions.h"
#include "include/org_rocksdb_ComparatorOptions.h"
#include "include/org_rocksdb_FlushOptions.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
@ -3607,6 +3609,32 @@ jboolean Java_org_rocksdb_ReadOptions_tailing(
return reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing;
}
/*
* Class: org_rocksdb_ReadOptions
* Method: setSnapshot
* Signature: (JJ)V
*/
void Java_org_rocksdb_ReadOptions_setSnapshot(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) {
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot =
reinterpret_cast<rocksdb::Snapshot*>(jsnapshot);
}
/*
* Class: org_rocksdb_ReadOptions
* Method: snapshot
* Signature: (J)J
*/
jlong Java_org_rocksdb_ReadOptions_snapshot(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto& snapshot =
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot;
return reinterpret_cast<jlong>(snapshot);
}
/////////////////////////////////////////////////////////////////////
// rocksdb::ComparatorOptions
/*
* Class: org_rocksdb_ComparatorOptions
* Method: newComparatorOptions
@ -3651,25 +3679,49 @@ void Java_org_rocksdb_ComparatorOptions_disposeInternal(
rocksdb::ComparatorOptionsJni::setHandle(env, jobj, nullptr);
}
/////////////////////////////////////////////////////////////////////
// rocksdb::FlushOptions
/*
* Class: org_rocksdb_ReadOptions
* Method: setSnapshot
* Signature: (JJ)V
* Class: org_rocksdb_FlushOptions
* Method: newFlushOptions
* Signature: ()V
*/
void Java_org_rocksdb_ReadOptions_setSnapshot(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) {
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot =
reinterpret_cast<rocksdb::Snapshot*>(jsnapshot);
void Java_org_rocksdb_FlushOptions_newFlushOptions(
JNIEnv* env, jobject jobj) {
auto flush_opt = new rocksdb::FlushOptions();
rocksdb::FlushOptionsJni::setHandle(env, jobj, flush_opt);
}
/*
* Class: org_rocksdb_ReadOptions
* Method: snapshot
* Signature: (J)J
* Class: org_rocksdb_FlushOptions
* Method: setWaitForFlush
* Signature: (JZ)V
*/
jlong Java_org_rocksdb_ReadOptions_snapshot(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto& snapshot =
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot;
return reinterpret_cast<jlong>(snapshot);
void Java_org_rocksdb_FlushOptions_setWaitForFlush(
JNIEnv * env, jobject jobj, jlong jhandle, jboolean jwait) {
reinterpret_cast<rocksdb::FlushOptions*>(jhandle)
->wait = static_cast<bool>(jwait);
}
/*
* Class: org_rocksdb_FlushOptions
* Method: waitForFlush
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_FlushOptions_waitForFlush(
JNIEnv * env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::FlushOptions*>(jhandle)
->wait;
}
/*
* Class: org_rocksdb_FlushOptions
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_FlushOptions_disposeInternal(
JNIEnv * env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::FlushOptions*>(jhandle);
rocksdb::FlushOptionsJni::setHandle(env, jobj, nullptr);
}

View File

@ -505,6 +505,34 @@ class ColumnFamilyHandleJni {
}
};
class FlushOptionsJni {
public:
// Get the java class id of org.rocksdb.FlushOptions.
static jclass getJClass(JNIEnv* env) {
jclass jclazz = env->FindClass("org/rocksdb/FlushOptions");
assert(jclazz != nullptr);
return jclazz;
}
// Get the field id of the member variable of org.rocksdb.FlushOptions
// that stores the pointer to rocksdb::FlushOptions.
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
// Pass the FlushOptions pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj,
const rocksdb::FlushOptions* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));
}
};
class ComparatorOptionsJni {
public:
// Get the java class id of org.rocksdb.ComparatorOptions.

View File

@ -1255,3 +1255,48 @@ jstring Java_org_rocksdb_RocksDB_getProperty0__JJLjava_lang_String_2I(
return env->NewStringUTF(property_value.data());
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Flush
void rocksdb_flush_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::FlushOptions& flush_options,
rocksdb::ColumnFamilyHandle* column_family_handle) {
rocksdb::Status s;
if (column_family_handle != nullptr) {
s = db->Flush(flush_options, column_family_handle);
} else {
s = db->Flush(flush_options);
}
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_RocksDB
* Method: flush
* Signature: (JJ)V
*/
void Java_org_rocksdb_RocksDB_flush__JJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jflush_options) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto flush_options = reinterpret_cast<rocksdb::FlushOptions*>(jflush_options);
rocksdb_flush_helper(env, db, *flush_options, nullptr);
}
/*
* Class: org_rocksdb_RocksDB
* Method: flush
* Signature: (JJJ)V
*/
void Java_org_rocksdb_RocksDB_flush__JJJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jflush_options, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto flush_options = reinterpret_cast<rocksdb::FlushOptions*>(jflush_options);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
rocksdb_flush_helper(env, db, *flush_options, cf_handle);
}