Added CompactionFilterFactory support to RocksJava

Summary:
This PR also includes some cleanup, bugfixes and refactoring of the Java API. However these are really pre-cursors on the road to CompactionFilterFactory support.
Closes https://github.com/facebook/rocksdb/pull/1241

Differential Revision: D6012778

Pulled By: sagar0

fbshipit-source-id: 0774465940ee99001a78906e4fed4ef57068ad5c
This commit is contained in:
Adam Retter 2017-10-12 11:06:51 -07:00 committed by Facebook Github Bot
parent 8dd0a7e11a
commit 560e984995
34 changed files with 915 additions and 315 deletions

View File

@ -1,5 +1,5 @@
NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.AbstractComparator\ org.rocksdb.AbstractCompactionFilterFactory\
org.rocksdb.AbstractSlice\ org.rocksdb.AbstractSlice\
org.rocksdb.BackupEngine\ org.rocksdb.BackupEngine\
org.rocksdb.BackupableDBOptions\ org.rocksdb.BackupableDBOptions\
@ -36,6 +36,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.ReadOptions\ org.rocksdb.ReadOptions\
org.rocksdb.RemoveEmptyValueCompactionFilter\ org.rocksdb.RemoveEmptyValueCompactionFilter\
org.rocksdb.RestoreOptions\ org.rocksdb.RestoreOptions\
org.rocksdb.RocksCallbackObject\
org.rocksdb.RocksDB\ org.rocksdb.RocksDB\
org.rocksdb.RocksEnv\ org.rocksdb.RocksEnv\
org.rocksdb.RocksIterator\ org.rocksdb.RocksIterator\
@ -78,6 +79,7 @@ JAVA_TESTS = org.rocksdb.BackupableDBOptionsTest\
org.rocksdb.ClockCacheTest\ org.rocksdb.ClockCacheTest\
org.rocksdb.ColumnFamilyOptionsTest\ org.rocksdb.ColumnFamilyOptionsTest\
org.rocksdb.ColumnFamilyTest\ org.rocksdb.ColumnFamilyTest\
org.rocksdb.CompactionFilterFactoryTest\
org.rocksdb.CompactionOptionsFIFOTest\ org.rocksdb.CompactionOptionsFIFOTest\
org.rocksdb.CompactionOptionsUniversalTest\ org.rocksdb.CompactionOptionsUniversalTest\
org.rocksdb.CompactionPriorityTest\ org.rocksdb.CompactionPriorityTest\

View File

@ -0,0 +1,37 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ for
// rocksdb::CompactionFilterFactory.
#include <jni.h>
#include <memory>
#include "include/org_rocksdb_AbstractCompactionFilterFactory.h"
#include "rocksjni/compaction_filter_factory_jnicallback.h"
/*
* Class: org_rocksdb_AbstractCompactionFilterFactory
* Method: createNewCompactionFilterFactory0
* Signature: ()J
*/
jlong Java_org_rocksdb_AbstractCompactionFilterFactory_createNewCompactionFilterFactory0(
JNIEnv* env, jobject jobj) {
auto* cff = new rocksdb::CompactionFilterFactoryJniCallback(env, jobj);
auto* ptr_sptr_cff = new std::shared_ptr<rocksdb::CompactionFilterFactoryJniCallback>(cff);
return reinterpret_cast<jlong>(ptr_sptr_cff);
}
/*
* Class: org_rocksdb_AbstractCompactionFilterFactory
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_AbstractCompactionFilterFactory_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto* ptr_sptr_cff =
reinterpret_cast<std::shared_ptr<rocksdb::CompactionFilterFactoryJniCallback> *>(jhandle);
delete ptr_sptr_cff;
}

View File

@ -0,0 +1,76 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::CompactionFilterFactory.
#include "rocksjni/compaction_filter_factory_jnicallback.h"
#include "rocksjni/portal.h"
namespace rocksdb {
CompactionFilterFactoryJniCallback::CompactionFilterFactoryJniCallback(
JNIEnv* env, jobject jcompaction_filter_factory)
: JniCallback(env, jcompaction_filter_factory) {
// Note: The name of a CompactionFilterFactory will not change during
// it's lifetime, so we cache it in a global var
jmethodID jname_method_id =
AbstractCompactionFilterFactoryJni::getNameMethodId(env);
if(jname_method_id == nullptr) {
// exception thrown: NoSuchMethodException or OutOfMemoryError
return;
}
jstring jname =
(jstring)env->CallObjectMethod(m_jcallback_obj, jname_method_id);
if(env->ExceptionCheck()) {
// exception thrown
return;
}
jboolean has_exception = JNI_FALSE;
m_name = JniUtil::copyString(env, jname, &has_exception); // also releases jname
if (has_exception == JNI_TRUE) {
// exception thrown
return;
}
m_jcreate_compaction_filter_methodid =
AbstractCompactionFilterFactoryJni::getCreateCompactionFilterMethodId(env);
if(m_jcreate_compaction_filter_methodid == nullptr) {
// exception thrown: NoSuchMethodException or OutOfMemoryError
return;
}
}
const char* CompactionFilterFactoryJniCallback::Name() const {
return m_name.get();
}
std::unique_ptr<CompactionFilter> CompactionFilterFactoryJniCallback::CreateCompactionFilter(
const CompactionFilter::Context& context) {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr);
jlong addr_compaction_filter = env->CallLongMethod(m_jcallback_obj,
m_jcreate_compaction_filter_methodid,
static_cast<jboolean>(context.is_full_compaction),
static_cast<jboolean>(context.is_manual_compaction));
if(env->ExceptionCheck()) {
// exception thrown from CallLongMethod
env->ExceptionDescribe(); // print out exception to stderr
releaseJniEnv(attached_thread);
return nullptr;
}
auto* cff = reinterpret_cast<CompactionFilter*>(addr_compaction_filter);
releaseJniEnv(attached_thread);
return std::unique_ptr<CompactionFilter>(cff);
}
} // namespace rocksdb

View File

@ -0,0 +1,35 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::CompactionFilterFactory.
#ifndef JAVA_ROCKSJNI_COMPACTION_FILTER_FACTORY_JNICALLBACK_H_
#define JAVA_ROCKSJNI_COMPACTION_FILTER_FACTORY_JNICALLBACK_H_
#include <jni.h>
#include <memory>
#include "rocksdb/compaction_filter.h"
#include "rocksjni/jnicallback.h"
namespace rocksdb {
class CompactionFilterFactoryJniCallback : public JniCallback, public CompactionFilterFactory {
public:
CompactionFilterFactoryJniCallback(
JNIEnv* env, jobject jcompaction_filter_factory);
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context);
virtual const char* Name() const;
private:
std::unique_ptr<const char[]> m_name;
jmethodID m_jcreate_compaction_filter_methodid;
};
} //namespace rocksdb
#endif // JAVA_ROCKSJNI_COMPACTION_FILTER_FACTORY_JNICALLBACK_H_

View File

@ -12,27 +12,11 @@
#include <string> #include <string>
#include <functional> #include <functional>
#include "include/org_rocksdb_AbstractComparator.h"
#include "include/org_rocksdb_Comparator.h" #include "include/org_rocksdb_Comparator.h"
#include "include/org_rocksdb_DirectComparator.h" #include "include/org_rocksdb_DirectComparator.h"
#include "rocksjni/comparatorjnicallback.h" #include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h" #include "rocksjni/portal.h"
// <editor-fold desc="org.rocksdb.AbstractComparator>
/*
* Class: org_rocksdb_AbstractComparator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_AbstractComparator_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto* bcjc = reinterpret_cast<rocksdb::BaseComparatorJniCallback*>(handle);
assert(bcjc != nullptr);
delete bcjc;
}
// </editor-fold>
// <editor-fold desc="org.rocksdb.Comparator> // <editor-fold desc="org.rocksdb.Comparator>
/* /*
@ -42,10 +26,10 @@ void Java_org_rocksdb_AbstractComparator_disposeInternal(
*/ */
jlong Java_org_rocksdb_Comparator_createNewComparator0( jlong Java_org_rocksdb_Comparator_createNewComparator0(
JNIEnv* env, jobject jobj, jlong copt_handle) { JNIEnv* env, jobject jobj, jlong copt_handle) {
const rocksdb::ComparatorJniCallbackOptions* copt = auto* copt =
reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle); reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle);
const rocksdb::ComparatorJniCallback* c = auto* c =
new rocksdb::ComparatorJniCallback(env, jobj, copt); new rocksdb::ComparatorJniCallback(env, jobj, copt);
return reinterpret_cast<jlong>(c); return reinterpret_cast<jlong>(c);
} }
// </editor-fold> // </editor-fold>
@ -59,10 +43,10 @@ jlong Java_org_rocksdb_Comparator_createNewComparator0(
*/ */
jlong Java_org_rocksdb_DirectComparator_createNewDirectComparator0( jlong Java_org_rocksdb_DirectComparator_createNewDirectComparator0(
JNIEnv* env, jobject jobj, jlong copt_handle) { JNIEnv* env, jobject jobj, jlong copt_handle) {
const rocksdb::ComparatorJniCallbackOptions* copt = auto* copt =
reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle); reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle);
const rocksdb::DirectComparatorJniCallback* c = auto* c =
new rocksdb::DirectComparatorJniCallback(env, jobj, copt); new rocksdb::DirectComparatorJniCallback(env, jobj, copt);
return reinterpret_cast<jlong>(c); return reinterpret_cast<jlong>(c);
} }
// </editor-fold> // </editor-fold>

View File

@ -13,24 +13,9 @@ namespace rocksdb {
BaseComparatorJniCallback::BaseComparatorJniCallback( BaseComparatorJniCallback::BaseComparatorJniCallback(
JNIEnv* env, jobject jComparator, JNIEnv* env, jobject jComparator,
const ComparatorJniCallbackOptions* copt) const ComparatorJniCallbackOptions* copt)
: mtx_compare(new port::Mutex(copt->use_adaptive_mutex)), : JniCallback(env, jComparator),
mtx_compare(new port::Mutex(copt->use_adaptive_mutex)),
mtx_findShortestSeparator(new port::Mutex(copt->use_adaptive_mutex)) { mtx_findShortestSeparator(new port::Mutex(copt->use_adaptive_mutex)) {
// Note: Comparator methods may be accessed by multiple threads,
// so we ref the jvm not the env
const jint rs = env->GetJavaVM(&m_jvm);
if(rs != JNI_OK) {
// exception thrown
return;
}
// Note: we want to access the Java Comparator instance
// across multiple method calls, so we create a global ref
assert(jComparator != nullptr);
m_jComparator = env->NewGlobalRef(jComparator);
if(m_jComparator == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
// Note: The name of a Comparator will not change during it's lifetime, // Note: The name of a Comparator will not change during it's lifetime,
// so we cache it in a global var // so we cache it in a global var
@ -39,7 +24,7 @@ BaseComparatorJniCallback::BaseComparatorJniCallback(
// exception thrown: NoSuchMethodException or OutOfMemoryError // exception thrown: NoSuchMethodException or OutOfMemoryError
return; return;
} }
jstring jsName = (jstring)env->CallObjectMethod(m_jComparator, jNameMethodId); jstring jsName = (jstring)env->CallObjectMethod(m_jcallback_obj, jNameMethodId);
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown // exception thrown
return; return;
@ -74,18 +59,18 @@ BaseComparatorJniCallback::BaseComparatorJniCallback(
} }
const char* BaseComparatorJniCallback::Name() const { const char* BaseComparatorJniCallback::Name() const {
return m_name.c_str(); return m_name.get();
} }
int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const { int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
// TODO(adamretter): slice objects can potentially be cached using thread // TODO(adamretter): slice objects can potentially be cached using thread
// local variables to avoid locking. Could make this configurable depending on // local variables to avoid locking. Could make this configurable depending on
// performance. // performance.
mtx_compare->Lock(); mtx_compare.get()->Lock();
bool pending_exception = bool pending_exception =
AbstractSliceJni::setHandle(env, m_jSliceA, &a, JNI_FALSE); AbstractSliceJni::setHandle(env, m_jSliceA, &a, JNI_FALSE);
@ -94,7 +79,7 @@ int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
// exception thrown from setHandle or descendant // exception thrown from setHandle or descendant
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return 0; return 0;
} }
@ -105,15 +90,15 @@ int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
// exception thrown from setHandle or descendant // exception thrown from setHandle or descendant
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return 0; return 0;
} }
jint result = jint result =
env->CallIntMethod(m_jComparator, m_jCompareMethodId, m_jSliceA, env->CallIntMethod(m_jcallback_obj, m_jCompareMethodId, m_jSliceA,
m_jSliceB); m_jSliceB);
mtx_compare->Unlock(); mtx_compare.get()->Unlock();
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown from CallIntMethod // exception thrown from CallIntMethod
@ -121,19 +106,19 @@ int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
result = 0; // we could not get a result from java callback so use 0 result = 0; // we could not get a result from java callback so use 0
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return result; return result;
} }
void BaseComparatorJniCallback::FindShortestSeparator( void BaseComparatorJniCallback::FindShortestSeparator(
std::string* start, const Slice& limit) const { std::string* start, const Slice& limit) const {
if (start == nullptr) { if (start == nullptr) {
return; return;
} }
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
const char* startUtf = start->c_str(); const char* startUtf = start->c_str();
@ -143,21 +128,21 @@ void BaseComparatorJniCallback::FindShortestSeparator(
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jsStart); env->DeleteLocalRef(jsStart);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
// TODO(adamretter): slice object can potentially be cached using thread local // TODO(adamretter): slice object can potentially be cached using thread local
// variable to avoid locking. Could make this configurable depending on // variable to avoid locking. Could make this configurable depending on
// performance. // performance.
mtx_findShortestSeparator->Lock(); mtx_findShortestSeparator.get()->Lock();
bool pending_exception = bool pending_exception =
AbstractSliceJni::setHandle(env, m_jSliceLimit, &limit, JNI_FALSE); AbstractSliceJni::setHandle(env, m_jSliceLimit, &limit, JNI_FALSE);
@ -169,21 +154,21 @@ void BaseComparatorJniCallback::FindShortestSeparator(
if(jsStart != nullptr) { if(jsStart != nullptr) {
env->DeleteLocalRef(jsStart); env->DeleteLocalRef(jsStart);
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
jstring jsResultStart = jstring jsResultStart =
(jstring)env->CallObjectMethod(m_jComparator, (jstring)env->CallObjectMethod(m_jcallback_obj,
m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit); m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit);
mtx_findShortestSeparator->Unlock(); mtx_findShortestSeparator.get()->Unlock();
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown from CallObjectMethod // exception thrown from CallObjectMethod
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jsStart); env->DeleteLocalRef(jsStart);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
@ -192,29 +177,29 @@ void BaseComparatorJniCallback::FindShortestSeparator(
if (jsResultStart != nullptr) { if (jsResultStart != nullptr) {
// update start with result // update start with result
jboolean has_exception = JNI_FALSE; jboolean has_exception = JNI_FALSE;
std::string result = JniUtil::copyString(env, jsResultStart, std::unique_ptr<const char[]> result_start = JniUtil::copyString(env, jsResultStart,
&has_exception); // also releases jsResultStart &has_exception); // also releases jsResultStart
if (has_exception == JNI_TRUE) { if (has_exception == JNI_TRUE) {
if (env->ExceptionCheck()) { if (env->ExceptionCheck()) {
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
*start = result; start->assign(result_start.get());
} }
releaseJniEnv(attached_thread);
JniUtil::releaseJniEnv(m_jvm, attached_thread);
} }
void BaseComparatorJniCallback::FindShortSuccessor(std::string* key) const { void BaseComparatorJniCallback::FindShortSuccessor(
std::string* key) const {
if (key == nullptr) { if (key == nullptr) {
return; return;
} }
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
const char* keyUtf = key->c_str(); const char* keyUtf = key->c_str();
@ -224,25 +209,25 @@ void BaseComparatorJniCallback::FindShortSuccessor(std::string* key) const {
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} else if(env->ExceptionCheck()) { } else if(env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jsKey); env->DeleteLocalRef(jsKey);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
jstring jsResultKey = jstring jsResultKey =
(jstring)env->CallObjectMethod(m_jComparator, (jstring)env->CallObjectMethod(m_jcallback_obj,
m_jFindShortSuccessorMethodId, jsKey); m_jFindShortSuccessorMethodId, jsKey);
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown from CallObjectMethod // exception thrown from CallObjectMethod
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jsKey); env->DeleteLocalRef(jsKey);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
@ -251,31 +236,20 @@ void BaseComparatorJniCallback::FindShortSuccessor(std::string* key) const {
if (jsResultKey != nullptr) { if (jsResultKey != nullptr) {
// updates key with result, also releases jsResultKey. // updates key with result, also releases jsResultKey.
jboolean has_exception = JNI_FALSE; jboolean has_exception = JNI_FALSE;
std::string result = JniUtil::copyString(env, jsResultKey, &has_exception); std::unique_ptr<const char[]> result_key = JniUtil::copyString(env, jsResultKey,
&has_exception); // also releases jsResultKey
if (has_exception == JNI_TRUE) { if (has_exception == JNI_TRUE) {
if (env->ExceptionCheck()) { if (env->ExceptionCheck()) {
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
*key = result; key->assign(result_key.get());
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
}
BaseComparatorJniCallback::~BaseComparatorJniCallback() {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread);
assert(env != nullptr);
if(m_jComparator != nullptr) {
env->DeleteGlobalRef(m_jComparator);
}
JniUtil::releaseJniEnv(m_jvm, attached_thread);
} }
ComparatorJniCallback::ComparatorJniCallback( ComparatorJniCallback::ComparatorJniCallback(
@ -303,7 +277,7 @@ ComparatorJniCallback::ComparatorJniCallback(
ComparatorJniCallback::~ComparatorJniCallback() { ComparatorJniCallback::~ComparatorJniCallback() {
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
if(m_jSliceA != nullptr) { if(m_jSliceA != nullptr) {
@ -318,7 +292,7 @@ ComparatorJniCallback::~ComparatorJniCallback() {
env->DeleteGlobalRef(m_jSliceLimit); env->DeleteGlobalRef(m_jSliceLimit);
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
} }
DirectComparatorJniCallback::DirectComparatorJniCallback( DirectComparatorJniCallback::DirectComparatorJniCallback(
@ -346,7 +320,7 @@ DirectComparatorJniCallback::DirectComparatorJniCallback(
DirectComparatorJniCallback::~DirectComparatorJniCallback() { DirectComparatorJniCallback::~DirectComparatorJniCallback() {
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
if(m_jSliceA != nullptr) { if(m_jSliceA != nullptr) {
@ -361,6 +335,6 @@ DirectComparatorJniCallback::~DirectComparatorJniCallback() {
env->DeleteGlobalRef(m_jSliceLimit); env->DeleteGlobalRef(m_jSliceLimit);
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
} }
} // namespace rocksdb } // namespace rocksdb

View File

@ -10,7 +10,9 @@
#define JAVA_ROCKSJNI_COMPARATORJNICALLBACK_H_ #define JAVA_ROCKSJNI_COMPARATORJNICALLBACK_H_
#include <jni.h> #include <jni.h>
#include <memory>
#include <string> #include <string>
#include "rocksjni/jnicallback.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "port/port.h" #include "port/port.h"
@ -44,12 +46,11 @@ struct ComparatorJniCallbackOptions {
* introduce independent locking in regions of each of those methods * introduce independent locking in regions of each of those methods
* via the mutexs mtx_compare and mtx_findShortestSeparator respectively * via the mutexs mtx_compare and mtx_findShortestSeparator respectively
*/ */
class BaseComparatorJniCallback : public Comparator { class BaseComparatorJniCallback : public JniCallback, public Comparator {
public: public:
BaseComparatorJniCallback( BaseComparatorJniCallback(
JNIEnv* env, jobject jComparator, JNIEnv* env, jobject jComparator,
const ComparatorJniCallbackOptions* copt); const ComparatorJniCallbackOptions* copt);
virtual ~BaseComparatorJniCallback();
virtual const char* Name() const; virtual const char* Name() const;
virtual int Compare(const Slice& a, const Slice& b) const; virtual int Compare(const Slice& a, const Slice& b) const;
virtual void FindShortestSeparator( virtual void FindShortestSeparator(
@ -58,17 +59,15 @@ class BaseComparatorJniCallback : public Comparator {
private: private:
// used for synchronisation in compare method // used for synchronisation in compare method
port::Mutex* mtx_compare; std::unique_ptr<port::Mutex> mtx_compare;
// used for synchronisation in findShortestSeparator method // used for synchronisation in findShortestSeparator method
port::Mutex* mtx_findShortestSeparator; std::unique_ptr<port::Mutex> mtx_findShortestSeparator;
jobject m_jComparator; std::unique_ptr<const char[]> m_name;
std::string m_name;
jmethodID m_jCompareMethodId; jmethodID m_jCompareMethodId;
jmethodID m_jFindShortestSeparatorMethodId; jmethodID m_jFindShortestSeparatorMethodId;
jmethodID m_jFindShortSuccessorMethodId; jmethodID m_jFindShortSuccessorMethodId;
protected: protected:
JavaVM* m_jvm;
jobject m_jSliceA; jobject m_jSliceA;
jobject m_jSliceB; jobject m_jSliceB;
jobject m_jSliceLimit; jobject m_jSliceLimit;

View File

@ -0,0 +1,52 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the callback "bridge" between Java and C++ for
// JNI Callbacks from C++ to sub-classes or org.rocksdb.RocksCallbackObject
#include <assert.h>
#include "rocksjni/jnicallback.h"
#include "rocksjni/portal.h"
namespace rocksdb {
JniCallback::JniCallback(JNIEnv* env, jobject jcallback_obj) {
// Note: jcallback_obj may be accessed by multiple threads,
// so we ref the jvm not the env
const jint rs = env->GetJavaVM(&m_jvm);
if(rs != JNI_OK) {
// exception thrown
return;
}
// Note: we may want to access the Java callback object instance
// across multiple method calls, so we create a global ref
assert(jcallback_obj != nullptr);
m_jcallback_obj = env->NewGlobalRef(jcallback_obj);
if(jcallback_obj == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
}
JNIEnv* JniCallback::getJniEnv(jboolean* attached) const {
return JniUtil::getJniEnv(m_jvm, attached);
}
void JniCallback::releaseJniEnv(jboolean& attached) const {
JniUtil::releaseJniEnv(m_jvm, attached);
}
JniCallback::~JniCallback() {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr);
if(m_jcallback_obj != nullptr) {
env->DeleteGlobalRef(m_jcallback_obj);
}
releaseJniEnv(attached_thread);
}
} // namespace rocksdb

View File

@ -0,0 +1,28 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the callback "bridge" between Java and C++ for
// JNI Callbacks from C++ to sub-classes or org.rocksdb.RocksCallbackObject
#ifndef JAVA_ROCKSJNI_JNICALLBACK_H_
#define JAVA_ROCKSJNI_JNICALLBACK_H_
#include <jni.h>
namespace rocksdb {
class JniCallback {
public:
JniCallback(JNIEnv* env, jobject jcallback_obj);
virtual ~JniCallback();
protected:
JavaVM* m_jvm;
jobject m_jcallback_obj;
JNIEnv* getJniEnv(jboolean* attached) const;
void releaseJniEnv(jboolean& attached) const;
};
}
#endif // JAVA_ROCKSJNI_JNICALLBACK_H_

View File

@ -16,23 +16,8 @@
namespace rocksdb { namespace rocksdb {
LoggerJniCallback::LoggerJniCallback( LoggerJniCallback::LoggerJniCallback(
JNIEnv* env, jobject jlogger) { JNIEnv* env, jobject jlogger) : JniCallback(env, jlogger) {
// Note: Logger methods may be accessed by multiple threads,
// so we ref the jvm not the env
const jint rs = env->GetJavaVM(&m_jvm);
if(rs != JNI_OK) {
// exception thrown
return;
}
// Note: we want to access the Java Logger instance
// across multiple method calls, so we create a global ref
assert(jlogger != nullptr);
m_jLogger = env->NewGlobalRef(jlogger);
if(m_jLogger == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
m_jLogMethodId = LoggerJni::getLogMethodId(env); m_jLogMethodId = LoggerJni::getLogMethodId(env);
if(m_jLogMethodId == nullptr) { if(m_jLogMethodId == nullptr) {
// exception thrown: NoSuchMethodException or OutOfMemoryError // exception thrown: NoSuchMethodException or OutOfMemoryError
@ -153,7 +138,7 @@ void LoggerJniCallback::Logv(const InfoLogLevel log_level,
// pass msg to java callback handler // pass msg to java callback handler
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
jstring jmsg = env->NewStringUTF(msg.get()); jstring jmsg = env->NewStringUTF(msg.get());
@ -162,28 +147,28 @@ void LoggerJniCallback::Logv(const InfoLogLevel log_level,
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jmsg); env->DeleteLocalRef(jmsg);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
env->CallVoidMethod(m_jLogger, m_jLogMethodId, jlog_level, jmsg); env->CallVoidMethod(m_jcallback_obj, m_jLogMethodId, jlog_level, jmsg);
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
// exception thrown // exception thrown
env->ExceptionDescribe(); // print out exception to stderr env->ExceptionDescribe(); // print out exception to stderr
env->DeleteLocalRef(jmsg); env->DeleteLocalRef(jmsg);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
return; return;
} }
env->DeleteLocalRef(jmsg); env->DeleteLocalRef(jmsg);
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
} }
} }
@ -202,16 +187,11 @@ std::unique_ptr<char[]> LoggerJniCallback::format_str(const char* format, va_lis
return buf; return buf;
} }
LoggerJniCallback::~LoggerJniCallback() { LoggerJniCallback::~LoggerJniCallback() {
jboolean attached_thread = JNI_FALSE; jboolean attached_thread = JNI_FALSE;
JNIEnv* env = JniUtil::getJniEnv(m_jvm, &attached_thread); JNIEnv* env = getJniEnv(&attached_thread);
assert(env != nullptr); assert(env != nullptr);
if(m_jLogger != nullptr) {
env->DeleteGlobalRef(m_jLogger);
}
if(m_jdebug_level != nullptr) { if(m_jdebug_level != nullptr) {
env->DeleteGlobalRef(m_jdebug_level); env->DeleteGlobalRef(m_jdebug_level);
} }
@ -236,7 +216,7 @@ LoggerJniCallback::~LoggerJniCallback() {
env->DeleteGlobalRef(m_jheader_level); env->DeleteGlobalRef(m_jheader_level);
} }
JniUtil::releaseJniEnv(m_jvm, attached_thread); releaseJniEnv(attached_thread);
} }
} // namespace rocksdb } // namespace rocksdb

View File

@ -12,15 +12,16 @@
#include <jni.h> #include <jni.h>
#include <memory> #include <memory>
#include <string> #include <string>
#include "rocksjni/jnicallback.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
namespace rocksdb { namespace rocksdb {
class LoggerJniCallback : public Logger { class LoggerJniCallback : public JniCallback, public Logger {
public: public:
LoggerJniCallback(JNIEnv* env, jobject jLogger); LoggerJniCallback(JNIEnv* env, jobject jLogger);
virtual ~LoggerJniCallback(); ~LoggerJniCallback();
using Logger::SetInfoLogLevel; using Logger::SetInfoLogLevel;
using Logger::GetInfoLogLevel; using Logger::GetInfoLogLevel;
@ -34,8 +35,6 @@ namespace rocksdb {
const char* format, va_list ap); const char* format, va_list ap);
private: private:
JavaVM* m_jvm;
jobject m_jLogger;
jmethodID m_jLogMethodId; jmethodID m_jLogMethodId;
jobject m_jdebug_level; jobject m_jdebug_level;
jobject m_jinfo_level; jobject m_jinfo_level;

View File

@ -146,12 +146,19 @@ void Java_org_rocksdb_Options_setComparatorHandle__JI(
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
* Method: setComparatorHandle * Method: setComparatorHandle
* Signature: (JJ)V * Signature: (JJZ)V
*/ */
void Java_org_rocksdb_Options_setComparatorHandle__JJ( void Java_org_rocksdb_Options_setComparatorHandle__JJZ(
JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jcomparator_handle) { JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jcomparator_handle,
reinterpret_cast<rocksdb::Options*>(jopt_handle)->comparator = jboolean is_direct) {
reinterpret_cast<rocksdb::Comparator*>(jcomparator_handle); auto* opt = reinterpret_cast<rocksdb::Options*>(jopt_handle);
if(is_direct) {
opt->comparator =
reinterpret_cast<rocksdb::DirectComparatorJniCallback*>(jcomparator_handle);
} else {
opt->comparator =
reinterpret_cast<rocksdb::ComparatorJniCallback*>(jcomparator_handle);
}
} }
/* /*
@ -431,7 +438,7 @@ void Java_org_rocksdb_Options_setDbPaths(
jtarget_sizes, ptr_jtarget_size, JNI_ABORT); jtarget_sizes, ptr_jtarget_size, JNI_ABORT);
return; return;
} }
std::string path = rocksdb::JniUtil::copyString( std::string path = rocksdb::JniUtil::copyStdString(
env, static_cast<jstring>(jpath), &has_exception); env, static_cast<jstring>(jpath), &has_exception);
env->DeleteLocalRef(jpath); env->DeleteLocalRef(jpath);
@ -2953,12 +2960,19 @@ void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JI(
/* /*
* Class: org_rocksdb_ColumnFamilyOptions * Class: org_rocksdb_ColumnFamilyOptions
* Method: setComparatorHandle * Method: setComparatorHandle
* Signature: (JJ)V * Signature: (JJZ)V
*/ */
void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JJ( void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JJZ(
JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jcomparator_handle) { JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jcomparator_handle,
reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jopt_handle)->comparator = jboolean is_direct) {
reinterpret_cast<rocksdb::Comparator*>(jcomparator_handle); auto* opt = reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jopt_handle);
if(is_direct) {
opt->comparator =
reinterpret_cast<rocksdb::DirectComparatorJniCallback*>(jcomparator_handle);
} else {
opt->comparator =
reinterpret_cast<rocksdb::ComparatorJniCallback*>(jcomparator_handle);
}
} }
/* /*
@ -3005,6 +3019,21 @@ void Java_org_rocksdb_ColumnFamilyOptions_setCompactionFilterHandle(
(jcompactionfilter_handle); (jcompactionfilter_handle);
} }
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setCompactionFilterFactoryHandle
* Signature: (JJ)V
*/
void JNICALL Java_org_rocksdb_ColumnFamilyOptions_setCompactionFilterFactoryHandle(
JNIEnv* env , jobject jobj, jlong jopt_handle,
jlong jcompactionfilterfactory_handle) {
auto* cff_factory =
reinterpret_cast<std::shared_ptr<rocksdb::CompactionFilterFactory> *>(
jcompactionfilterfactory_handle);
reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jopt_handle)->
compaction_filter_factory = *cff_factory;
}
/* /*
* Class: org_rocksdb_ColumnFamilyOptions * Class: org_rocksdb_ColumnFamilyOptions
* Method: setWriteBufferSize * Method: setWriteBufferSize
@ -4486,7 +4515,7 @@ void Java_org_rocksdb_DBOptions_setDbPaths(
jtarget_sizes, ptr_jtarget_size, JNI_ABORT); jtarget_sizes, ptr_jtarget_size, JNI_ABORT);
return; return;
} }
std::string path = rocksdb::JniUtil::copyString( std::string path = rocksdb::JniUtil::copyStdString(
env, static_cast<jstring>(jpath), &has_exception); env, static_cast<jstring>(jpath), &has_exception);
env->DeleteLocalRef(jpath); env->DeleteLocalRef(jpath);

View File

@ -10,10 +10,12 @@
#ifndef JAVA_ROCKSJNI_PORTAL_H_ #ifndef JAVA_ROCKSJNI_PORTAL_H_
#define JAVA_ROCKSJNI_PORTAL_H_ #define JAVA_ROCKSJNI_PORTAL_H_
#include <cstring>
#include <jni.h> #include <jni.h>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <limits> #include <limits>
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -22,6 +24,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksjni/compaction_filter_factory_jnicallback.h"
#include "rocksjni/comparatorjnicallback.h" #include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/loggerjnicallback.h" #include "rocksjni/loggerjnicallback.h"
#include "rocksjni/writebatchhandlerjnicallback.h" #include "rocksjni/writebatchhandlerjnicallback.h"
@ -1014,6 +1017,69 @@ class ComparatorOptionsJni : public RocksDBNativeClass<
} }
}; };
// The portal class for org.rocksdb.AbstractCompactionFilterFactory
class AbstractCompactionFilterFactoryJni : public RocksDBNativeClass<
const rocksdb::CompactionFilterFactoryJniCallback*,
AbstractCompactionFilterFactoryJni> {
public:
/**
* Get the Java Class org.rocksdb.AbstractCompactionFilterFactory
*
* @param env A pointer to the Java environment
*
* @return The Java Class or nullptr if one of the
* ClassFormatError, ClassCircularityError, NoClassDefFoundError,
* OutOfMemoryError or ExceptionInInitializerError exceptions is thrown
*/
static jclass getJClass(JNIEnv* env) {
return RocksDBNativeClass::getJClass(env,
"org/rocksdb/AbstractCompactionFilterFactory");
}
/**
* Get the Java Method: AbstractCompactionFilterFactory#name
*
* @param env A pointer to the Java environment
*
* @return The Java Method ID or nullptr if the class or method id could not
* be retieved
*/
static jmethodID getNameMethodId(JNIEnv* env) {
jclass jclazz = getJClass(env);
if(jclazz == nullptr) {
// exception occurred accessing class
return nullptr;
}
static jmethodID mid = env->GetMethodID(
jclazz, "name", "()Ljava/lang/String;");
assert(mid != nullptr);
return mid;
}
/**
* Get the Java Method: AbstractCompactionFilterFactory#createCompactionFilter
*
* @param env A pointer to the Java environment
*
* @return The Java Method ID or nullptr if the class or method id could not
* be retieved
*/
static jmethodID getCreateCompactionFilterMethodId(JNIEnv* env) {
jclass jclazz = getJClass(env);
if(jclazz == nullptr) {
// exception occurred accessing class
return nullptr;
}
static jmethodID mid = env->GetMethodID(jclazz,
"createCompactionFilter",
"(ZZ)J");
assert(mid != nullptr);
return mid;
}
};
// The portal class for org.rocksdb.AbstractComparator // The portal class for org.rocksdb.AbstractComparator
class AbstractComparatorJni : public RocksDBNativeClass< class AbstractComparatorJni : public RocksDBNativeClass<
const rocksdb::BaseComparatorJniCallback*, const rocksdb::BaseComparatorJniCallback*,
@ -2988,6 +3054,46 @@ class JniUtil {
return strs; return strs;
} }
/**
* Copies a jstring to a C-style null-terminated byte string
* and releases the original jstring
*
* The jstring is copied as UTF-8
*
* If an exception occurs, then JNIEnv::ExceptionCheck()
* will have been called
*
* @param env (IN) A pointer to the java environment
* @param js (IN) The java string to copy
* @param has_exception (OUT) will be set to JNI_TRUE
* if an OutOfMemoryError exception occurs
*
* @return A pointer to the copied string, or a
* nullptr if has_exception == JNI_TRUE
*/
static std::unique_ptr<char[]> copyString(JNIEnv* env, jstring js,
jboolean* has_exception) {
const char *utf = env->GetStringUTFChars(js, nullptr);
if(utf == nullptr) {
// exception thrown: OutOfMemoryError
env->ExceptionCheck();
*has_exception = JNI_TRUE;
return nullptr;
} else if(env->ExceptionCheck()) {
// exception thrown
env->ReleaseStringUTFChars(js, utf);
*has_exception = JNI_TRUE;
return nullptr;
}
const jsize utf_len = env->GetStringUTFLength(js);
std::unique_ptr<char[]> str(new char[utf_len + 1]); // Note: + 1 is needed for the c_str null terminator
std::strcpy(str.get(), utf);
env->ReleaseStringUTFChars(js, utf);
*has_exception = JNI_FALSE;
return str;
}
/** /**
* Copies a jstring to a std::string * Copies a jstring to a std::string
* and releases the original jstring * and releases the original jstring
@ -3003,8 +3109,8 @@ class JniUtil {
* @return A std:string copy of the jstring, or an * @return A std:string copy of the jstring, or an
* empty std::string if has_exception == JNI_TRUE * empty std::string if has_exception == JNI_TRUE
*/ */
static std::string copyString(JNIEnv* env, jstring js, static std::string copyStdString(JNIEnv* env, jstring js,
jboolean* has_exception) { jboolean* has_exception) {
const char *utf = env->GetStringUTFChars(js, nullptr); const char *utf = env->GetStringUTFChars(js, nullptr);
if(utf == nullptr) { if(utf == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError

View File

@ -0,0 +1,27 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ for
// JNI Callbacks from C++ to sub-classes or org.rocksdb.RocksCallbackObject
#include <jni.h>
#include "include/org_rocksdb_RocksCallbackObject.h"
#include "jnicallback.h"
/*
* Class: org_rocksdb_RocksCallbackObject
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RocksCallbackObject_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
// TODO(AR) is deleting from the super class JniCallback OK, or must we delete the subclass?
// Example hierarchies:
// 1) Comparator -> BaseComparatorJniCallback + JniCallback -> DirectComparatorJniCallback
// 2) Comparator -> BaseComparatorJniCallback + JniCallback -> ComparatorJniCallback
// I think this is okay, as Comparator and JniCallback both have virtual destructors...
delete reinterpret_cast<rocksdb::JniCallback*>(handle);
}

View File

@ -20,16 +20,24 @@
/* /*
* Class: org_rocksdb_SstFileWriter * Class: org_rocksdb_SstFileWriter
* Method: newSstFileWriter * Method: newSstFileWriter
* Signature: (JJJ)J * Signature: (JJJZ)J
*/ */
jlong Java_org_rocksdb_SstFileWriter_newSstFileWriter__JJJ(JNIEnv *env, jclass jcls, jlong Java_org_rocksdb_SstFileWriter_newSstFileWriter__JJJZ(JNIEnv *env,
jlong jenvoptions, jclass jcls, jlong jenvoptions, jlong joptions, jlong jcomparator,
jlong joptions, jboolean is_direct) {
jlong jcomparator) {
auto *env_options = auto *env_options =
reinterpret_cast<const rocksdb::EnvOptions *>(jenvoptions); reinterpret_cast<const rocksdb::EnvOptions *>(jenvoptions);
auto *options = reinterpret_cast<const rocksdb::Options *>(joptions); auto *options = reinterpret_cast<const rocksdb::Options *>(joptions);
auto *comparator = reinterpret_cast<const rocksdb::Comparator *>(jcomparator);
rocksdb::Comparator *comparator = nullptr;
if(is_direct) {
comparator =
reinterpret_cast<rocksdb::DirectComparatorJniCallback*>(jcomparator);
} else {
comparator =
reinterpret_cast<rocksdb::ComparatorJniCallback*>(jcomparator);
}
rocksdb::SstFileWriter *sst_file_writer = rocksdb::SstFileWriter *sst_file_writer =
new rocksdb::SstFileWriter(*env_options, *options, comparator); new rocksdb::SstFileWriter(*env_options, *options, comparator);
return reinterpret_cast<jlong>(sst_file_writer); return reinterpret_cast<jlong>(sst_file_writer);

View File

@ -298,16 +298,3 @@ jlong Java_org_rocksdb_WriteBatch_00024Handler_createNewHandler0(
auto* wbjnic = new rocksdb::WriteBatchHandlerJniCallback(env, jobj); auto* wbjnic = new rocksdb::WriteBatchHandlerJniCallback(env, jobj);
return reinterpret_cast<jlong>(wbjnic); return reinterpret_cast<jlong>(wbjnic);
} }
/*
* Class: org_rocksdb_WriteBatch_Handler
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_00024Handler_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto* wbjnic =
reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(handle);
assert(wbjnic != nullptr);
delete wbjnic;
}

View File

@ -39,14 +39,22 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__Z(
/* /*
* Class: org_rocksdb_WriteBatchWithIndex * Class: org_rocksdb_WriteBatchWithIndex
* Method: newWriteBatchWithIndex * Method: newWriteBatchWithIndex
* Signature: (JIZ)J * Signature: (JZIZ)J
*/ */
jlong Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__JIZ( jlong Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__JZIZ(
JNIEnv* env, jclass jcls, jlong jfallback_index_comparator_handle, JNIEnv* env, jclass jcls, jlong jfallback_index_comparator_handle,
jint jreserved_bytes, jboolean joverwrite_key) { jboolean is_direct, jint jreserved_bytes, jboolean joverwrite_key) {
rocksdb::Comparator *fallback_comparator = nullptr;
if(is_direct) {
fallback_comparator =
reinterpret_cast<rocksdb::DirectComparatorJniCallback*>(jfallback_index_comparator_handle);
} else {
fallback_comparator =
reinterpret_cast<rocksdb::ComparatorJniCallback*>(jfallback_index_comparator_handle);
}
auto* wbwi = auto* wbwi =
new rocksdb::WriteBatchWithIndex( new rocksdb::WriteBatchWithIndex(
reinterpret_cast<rocksdb::Comparator*>(jfallback_index_comparator_handle), fallback_comparator,
static_cast<size_t>(jreserved_bytes), static_cast<bool>(joverwrite_key)); static_cast<size_t>(jreserved_bytes), static_cast<bool>(joverwrite_key));
return reinterpret_cast<jlong>(wbwi); return reinterpret_cast<jlong>(wbwi);
} }

View File

@ -12,16 +12,7 @@
namespace rocksdb { namespace rocksdb {
WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback( WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBatchHandler) JNIEnv* env, jobject jWriteBatchHandler)
: m_env(env) { : JniCallback(env, jWriteBatchHandler), m_env(env) {
// Note: we want to access the Java WriteBatchHandler instance
// across multiple method calls, so we create a global ref
assert(jWriteBatchHandler != nullptr);
m_jWriteBatchHandler = env->NewGlobalRef(jWriteBatchHandler);
if(m_jWriteBatchHandler == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env); m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env);
if(m_jPutMethodId == nullptr) { if(m_jPutMethodId == nullptr) {
@ -83,7 +74,7 @@ void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
} }
m_env->CallVoidMethod( m_env->CallVoidMethod(
m_jWriteBatchHandler, m_jcallback_obj,
m_jPutMethodId, m_jPutMethodId,
j_key, j_key,
j_value); j_value);
@ -130,7 +121,7 @@ void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
} }
m_env->CallVoidMethod( m_env->CallVoidMethod(
m_jWriteBatchHandler, m_jcallback_obj,
m_jMergeMethodId, m_jMergeMethodId,
j_key, j_key,
j_value); j_value);
@ -165,7 +156,7 @@ void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
} }
m_env->CallVoidMethod( m_env->CallVoidMethod(
m_jWriteBatchHandler, m_jcallback_obj,
m_jDeleteMethodId, m_jDeleteMethodId,
j_key); j_key);
if(m_env->ExceptionCheck()) { if(m_env->ExceptionCheck()) {
@ -202,7 +193,7 @@ void WriteBatchHandlerJniCallback::DeleteRange(const Slice& beginKey,
return; return;
} }
m_env->CallVoidMethod(m_jWriteBatchHandler, m_jDeleteRangeMethodId, m_env->CallVoidMethod(m_jcallback_obj, m_jDeleteRangeMethodId,
j_beginKey, j_endKey); j_beginKey, j_endKey);
if (m_env->ExceptionCheck()) { if (m_env->ExceptionCheck()) {
// exception thrown // exception thrown
@ -236,7 +227,7 @@ void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
} }
m_env->CallVoidMethod( m_env->CallVoidMethod(
m_jWriteBatchHandler, m_jcallback_obj,
m_jLogDataMethodId, m_jLogDataMethodId,
j_blob); j_blob);
if(m_env->ExceptionCheck()) { if(m_env->ExceptionCheck()) {
@ -255,7 +246,7 @@ void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
bool WriteBatchHandlerJniCallback::Continue() { bool WriteBatchHandlerJniCallback::Continue() {
jboolean jContinue = m_env->CallBooleanMethod( jboolean jContinue = m_env->CallBooleanMethod(
m_jWriteBatchHandler, m_jcallback_obj,
m_jContinueMethodId); m_jContinueMethodId);
if(m_env->ExceptionCheck()) { if(m_env->ExceptionCheck()) {
// exception thrown // exception thrown
@ -278,6 +269,9 @@ bool WriteBatchHandlerJniCallback::Continue() {
* exception occurs * exception occurs
*/ */
jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) { jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) {
// TODO(AR) move to JniUtil
jbyteArray ja = m_env->NewByteArray(static_cast<jsize>(s.size())); jbyteArray ja = m_env->NewByteArray(static_cast<jsize>(s.size()));
if(ja == nullptr) { if(ja == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
@ -297,10 +291,4 @@ jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) {
return ja; return ja;
} }
WriteBatchHandlerJniCallback::~WriteBatchHandlerJniCallback() {
if(m_jWriteBatchHandler != nullptr) {
m_env->DeleteGlobalRef(m_jWriteBatchHandler);
}
}
} // namespace rocksdb } // namespace rocksdb

View File

@ -10,6 +10,7 @@
#define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_ #define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#include <jni.h> #include <jni.h>
#include "rocksjni/jnicallback.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
namespace rocksdb { namespace rocksdb {
@ -20,11 +21,10 @@ namespace rocksdb {
* which calls the appropriate Java method. * which calls the appropriate Java method.
* This enables Write Batch Handlers to be implemented in Java. * This enables Write Batch Handlers to be implemented in Java.
*/ */
class WriteBatchHandlerJniCallback : public WriteBatch::Handler { class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Handler {
public: public:
WriteBatchHandlerJniCallback( WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBackHandler); JNIEnv* env, jobject jWriteBackHandler);
~WriteBatchHandlerJniCallback();
void Put(const Slice& key, const Slice& value); void Put(const Slice& key, const Slice& value);
void Merge(const Slice& key, const Slice& value); void Merge(const Slice& key, const Slice& value);
void Delete(const Slice& key); void Delete(const Slice& key);
@ -34,7 +34,6 @@ class WriteBatchHandlerJniCallback : public WriteBatch::Handler {
private: private:
JNIEnv* m_env; JNIEnv* m_env;
jobject m_jWriteBatchHandler;
jbyteArray sliceToJArray(const Slice& s); jbyteArray sliceToJArray(const Slice& s);
jmethodID m_jPutMethodId; jmethodID m_jPutMethodId;
jmethodID m_jMergeMethodId; jmethodID m_jMergeMethodId;

View File

@ -14,6 +14,35 @@ package org.rocksdb;
public abstract class AbstractCompactionFilter<T extends AbstractSlice<?>> public abstract class AbstractCompactionFilter<T extends AbstractSlice<?>>
extends RocksObject { extends RocksObject {
public static class Context {
private final boolean fullCompaction;
private final boolean manualCompaction;
public Context(final boolean fullCompaction, final boolean manualCompaction) {
this.fullCompaction = fullCompaction;
this.manualCompaction = manualCompaction;
}
/**
* Does this compaction run include all data files
*
* @return true if this is a full compaction run
*/
public boolean isFullCompaction() {
return fullCompaction;
}
/**
* Is this compaction requested by the client,
* or is it occurring as an automatic compaction process
*
* @return true if the compaction was initiated by the client
*/
public boolean isManualCompaction() {
return manualCompaction;
}
}
protected AbstractCompactionFilter(final long nativeHandle) { protected AbstractCompactionFilter(final long nativeHandle) {
super(nativeHandle); super(nativeHandle);
} }

View File

@ -0,0 +1,75 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Each compaction will create a new {@link AbstractCompactionFilter}
* allowing the application to know about different compactions
*
* @param <T> The concrete type of the compaction filter
*/
public abstract class AbstractCompactionFilterFactory<T extends AbstractCompactionFilter<?>>
extends RocksCallbackObject {
public AbstractCompactionFilterFactory() {
super(null);
}
@Override
protected long initializeNative(final long... nativeParameterHandles) {
return createNewCompactionFilterFactory0();
}
/**
* Called from JNI, see compaction_filter_factory_jnicallback.cc
*
* @param fullCompaction {@link AbstractCompactionFilter.Context#fullCompaction}
* @param manualCompaction {@link AbstractCompactionFilter.Context#manualCompaction}
*
* @return native handle of the CompactionFilter
*/
private long createCompactionFilter(final boolean fullCompaction,
final boolean manualCompaction) {
final T filter = createCompactionFilter(
new AbstractCompactionFilter.Context(fullCompaction, manualCompaction));
// CompactionFilterFactory::CreateCompactionFilter returns a std::unique_ptr
// which therefore has ownership of the underlying native object
filter.disOwnNativeHandle();
return filter.nativeHandle_;
}
/**
* Create a new compaction filter
*
* @param context The context describing the need for a new compaction filter
*
* @return A new instance of {@link AbstractCompactionFilter}
*/
public abstract T createCompactionFilter(
final AbstractCompactionFilter.Context context);
/**
* A name which identifies this compaction filter
*
* The name will be printed to the LOG file on start up for diagnosis
*/
public abstract String name();
/**
* We override {@link RocksCallbackObject#disposeInternal()}
* as disposing of a rocksdb::AbstractCompactionFilterFactory requires
* a slightly different approach as it is a std::shared_ptr
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native long createNewCompactionFilterFactory0();
private native void disposeInternal(final long handle);
}

View File

@ -15,10 +15,10 @@ package org.rocksdb;
* @see org.rocksdb.DirectComparator * @see org.rocksdb.DirectComparator
*/ */
public abstract class AbstractComparator<T extends AbstractSlice<?>> public abstract class AbstractComparator<T extends AbstractSlice<?>>
extends AbstractImmutableNativeReference { extends RocksCallbackObject {
protected AbstractComparator() { protected AbstractComparator(final ComparatorOptions copt) {
super(true); super(copt.nativeHandle_);
} }
/** /**
@ -87,20 +87,4 @@ public abstract class AbstractComparator<T extends AbstractSlice<?>>
public String findShortSuccessor(final String key) { public String findShortSuccessor(final String key) {
return null; return null;
} }
/**
* Deletes underlying C++ comparator pointer.
*
* Note that this function should be called only after all
* RocksDB instances referencing the comparator are closed.
* Otherwise an undefined behavior will occur.
*/
@Override
protected void disposeInternal() {
disposeInternal(getNativeHandle());
}
protected abstract long getNativeHandle();
private native void disposeInternal(final long handle);
} }

View File

@ -130,7 +130,8 @@ public class ColumnFamilyOptions extends RocksObject
public ColumnFamilyOptions setComparator( public ColumnFamilyOptions setComparator(
final AbstractComparator<? extends AbstractSlice<?>> comparator) { final AbstractComparator<? extends AbstractSlice<?>> comparator) {
assert (isOwningHandle()); assert (isOwningHandle());
setComparatorHandle(nativeHandle_, comparator.getNativeHandle()); setComparatorHandle(nativeHandle_, comparator.nativeHandle_,
comparator instanceof DirectComparator);
comparator_ = comparator; comparator_ = comparator;
return this; return this;
} }
@ -153,6 +154,22 @@ public class ColumnFamilyOptions extends RocksObject
return this; return this;
} }
/**
* A single CompactionFilter instance to call into during compaction.
* Allows an application to modify/delete a key-value during background
* compaction.
*
* If the client requires a new compaction filter to be used for different
* compaction runs, it can specify call
* {@link #setCompactionFilterFactory(AbstractCompactionFilterFactory)}
* instead.
*
* The client should specify only set one of the two.
* {@link #setCompactionFilter(AbstractCompactionFilter)} takes precedence
* over {@link #setCompactionFilterFactory(AbstractCompactionFilterFactory)}
* if the client specifies both.
*/
//TODO(AR) need to set a note on the concurrency of the compaction filter used from this method
public ColumnFamilyOptions setCompactionFilter( public ColumnFamilyOptions setCompactionFilter(
final AbstractCompactionFilter<? extends AbstractSlice<?>> final AbstractCompactionFilter<? extends AbstractSlice<?>>
compactionFilter) { compactionFilter) {
@ -161,6 +178,22 @@ public class ColumnFamilyOptions extends RocksObject
return this; return this;
} }
/**
* This is a factory that provides {@link AbstractCompactionFilter} objects
* which allow an application to modify/delete a key-value during background
* compaction.
*
* A new filter will be created on each compaction run. If multithreaded
* compaction is being used, each created CompactionFilter will only be used
* from a single thread and so does not need to be thread-safe.
*/
public ColumnFamilyOptions setCompactionFilterFactory(final AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory) {
assert (isOwningHandle());
setCompactionFilterFactoryHandle(nativeHandle_, compactionFilterFactory.nativeHandle_);
compactionFilterFactory_ = compactionFilterFactory;
return this;
}
@Override @Override
public ColumnFamilyOptions setWriteBufferSize(final long writeBufferSize) { public ColumnFamilyOptions setWriteBufferSize(final long writeBufferSize) {
assert(isOwningHandle()); assert(isOwningHandle());
@ -761,11 +794,13 @@ public class ColumnFamilyOptions extends RocksObject
long memtableMemoryBudget); long memtableMemoryBudget);
private native void setComparatorHandle(long handle, int builtinComparator); private native void setComparatorHandle(long handle, int builtinComparator);
private native void setComparatorHandle(long optHandle, private native void setComparatorHandle(long optHandle,
long comparatorHandle); long comparatorHandle, boolean isDirect);
private native void setMergeOperatorName(long handle, String name); private native void setMergeOperatorName(long handle, String name);
private native void setMergeOperator(long handle, long mergeOperatorHandle); private native void setMergeOperator(long handle, long mergeOperatorHandle);
private native void setCompactionFilterHandle(long handle, private native void setCompactionFilterHandle(long handle,
long compactionFilterHandle); long compactionFilterHandle);
private native void setCompactionFilterFactoryHandle(long handle,
long compactionFilterFactoryHandle);
private native void setWriteBufferSize(long handle, long writeBufferSize) private native void setWriteBufferSize(long handle, long writeBufferSize)
throws IllegalArgumentException; throws IllegalArgumentException;
private native long writeBufferSize(long handle); private native long writeBufferSize(long handle);
@ -903,6 +938,8 @@ public class ColumnFamilyOptions extends RocksObject
private TableFormatConfig tableFormatConfig_; private TableFormatConfig tableFormatConfig_;
private AbstractComparator<? extends AbstractSlice<?>> comparator_; private AbstractComparator<? extends AbstractSlice<?>> comparator_;
private AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter_; private AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter_;
AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>>
compactionFilterFactory_;
private CompactionOptionsUniversal compactionOptionsUniversal_; private CompactionOptionsUniversal compactionOptionsUniversal_;
private CompactionOptionsFIFO compactionOptionsFIFO_; private CompactionOptionsFIFO compactionOptionsFIFO_;
private CompressionOptions compressionOptions_; private CompressionOptions compressionOptions_;

View File

@ -16,16 +16,13 @@ package org.rocksdb;
*/ */
public abstract class Comparator extends AbstractComparator<Slice> { public abstract class Comparator extends AbstractComparator<Slice> {
private final long nativeHandle_;
public Comparator(final ComparatorOptions copt) { public Comparator(final ComparatorOptions copt) {
super(); super(copt);
this.nativeHandle_ = createNewComparator0(copt.nativeHandle_);
} }
@Override @Override
protected final long getNativeHandle() { protected long initializeNative(final long... nativeParameterHandles) {
return nativeHandle_; return createNewComparator0(nativeParameterHandles[0]);
} }
private native long createNewComparator0(final long comparatorOptionsHandle); private native long createNewComparator0(final long comparatorOptionsHandle);

View File

@ -16,16 +16,13 @@ package org.rocksdb;
*/ */
public abstract class DirectComparator extends AbstractComparator<DirectSlice> { public abstract class DirectComparator extends AbstractComparator<DirectSlice> {
private final long nativeHandle_;
public DirectComparator(final ComparatorOptions copt) { public DirectComparator(final ComparatorOptions copt) {
super(); super(copt);
this.nativeHandle_ = createNewDirectComparator0(copt.nativeHandle_);
} }
@Override @Override
protected final long getNativeHandle() { protected long initializeNative(final long... nativeParameterHandles) {
return nativeHandle_; return createNewDirectComparator0(nativeParameterHandles[0]);
} }
private native long createNewDirectComparator0( private native long createNewDirectComparator0(

View File

@ -35,9 +35,10 @@ package org.rocksdb;
* {@link org.rocksdb.InfoLogLevel#FATAL_LEVEL}. * {@link org.rocksdb.InfoLogLevel#FATAL_LEVEL}.
* </p> * </p>
*/ */
public abstract class Logger extends AbstractImmutableNativeReference { public abstract class Logger extends RocksCallbackObject {
final long nativeHandle_; private final static long WITH_OPTIONS = 0;
private final static long WITH_DBOPTIONS = 1;
/** /**
* <p>AbstractLogger constructor.</p> * <p>AbstractLogger constructor.</p>
@ -49,8 +50,8 @@ public abstract class Logger extends AbstractImmutableNativeReference {
* @param options {@link org.rocksdb.Options} instance. * @param options {@link org.rocksdb.Options} instance.
*/ */
public Logger(final Options options) { public Logger(final Options options) {
super(true); super(options.nativeHandle_, WITH_OPTIONS);
this.nativeHandle_ = createNewLoggerOptions(options.nativeHandle_);
} }
/** /**
@ -63,8 +64,18 @@ public abstract class Logger extends AbstractImmutableNativeReference {
* @param dboptions {@link org.rocksdb.DBOptions} instance. * @param dboptions {@link org.rocksdb.DBOptions} instance.
*/ */
public Logger(final DBOptions dboptions) { public Logger(final DBOptions dboptions) {
super(true); super(dboptions.nativeHandle_, WITH_DBOPTIONS);
this.nativeHandle_ = createNewLoggerDbOptions(dboptions.nativeHandle_); }
@Override
protected long initializeNative(long... nativeParameterHandles) {
if(nativeParameterHandles[1] == WITH_OPTIONS) {
return createNewLoggerOptions(nativeParameterHandles[0]);
} else if(nativeParameterHandles[1] == WITH_DBOPTIONS) {
return createNewLoggerDbOptions(nativeParameterHandles[0]);
} else {
throw new IllegalArgumentException();
}
} }
/** /**
@ -89,17 +100,6 @@ public abstract class Logger extends AbstractImmutableNativeReference {
protected abstract void log(InfoLogLevel infoLogLevel, protected abstract void log(InfoLogLevel infoLogLevel,
String logMsg); String logMsg);
/**
* Deletes underlying C++ slice pointer.
* Note that this function should be called only after all
* RocksDB instances referencing the slice are closed.
* Otherwise an undefined behavior will occur.
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
protected native long createNewLoggerOptions( protected native long createNewLoggerOptions(
long options); long options);
protected native long createNewLoggerDbOptions( protected native long createNewLoggerDbOptions(
@ -107,5 +107,16 @@ public abstract class Logger extends AbstractImmutableNativeReference {
protected native void setInfoLogLevel(long handle, protected native void setInfoLogLevel(long handle,
byte infoLogLevel); byte infoLogLevel);
protected native byte infoLogLevel(long handle); protected native byte infoLogLevel(long handle);
/**
* We override {@link RocksCallbackObject#disposeInternal()}
* as disposing of a rocksdb::LoggerJniCallback requires
* a slightly different approach as it is a std::shared_ptr
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);
} }

View File

@ -169,7 +169,8 @@ public class Options extends RocksObject
public Options setComparator( public Options setComparator(
final AbstractComparator<? extends AbstractSlice<?>> comparator) { final AbstractComparator<? extends AbstractSlice<?>> comparator) {
assert(isOwningHandle()); assert(isOwningHandle());
setComparatorHandle(nativeHandle_, comparator.getNativeHandle()); setComparatorHandle(nativeHandle_, comparator.nativeHandle_,
comparator instanceof DirectComparator);
comparator_ = comparator; comparator_ = comparator;
return this; return this;
} }
@ -1733,7 +1734,7 @@ public class Options extends RocksObject
long memtableMemoryBudget); long memtableMemoryBudget);
private native void setComparatorHandle(long handle, int builtinComparator); private native void setComparatorHandle(long handle, int builtinComparator);
private native void setComparatorHandle(long optHandle, private native void setComparatorHandle(long optHandle,
long comparatorHandle); long comparatorHandle, boolean isDirect);
private native void setMergeOperatorName( private native void setMergeOperatorName(
long handle, String name); long handle, String name);
private native void setMergeOperator( private native void setMergeOperator(

View File

@ -0,0 +1,50 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* RocksCallbackObject is similar to {@link RocksObject} but varies
* in its construction as it is designed for Java objects which have functions
* which are called from C++ via JNI.
*
* RocksCallbackObject is the base-class any RocksDB classes that acts as a
* callback from some underlying underlying native C++ {@code rocksdb} object.
*
* The use of {@code RocksObject} should always be preferred over
* {@link RocksCallbackObject} if callbacks are not required.
*/
public abstract class RocksCallbackObject extends
AbstractImmutableNativeReference {
protected final long nativeHandle_;
protected RocksCallbackObject(final long... nativeParameterHandles) {
super(true);
this.nativeHandle_ = initializeNative(nativeParameterHandles);
}
/**
* Construct the Native C++ object which will callback
* to our object methods
*
* @param nativeParameterHandles An array of native handles for any parameter
* objects that are needed during construction
*
* @return The native handle of the C++ object which will callback to us
*/
protected abstract long initializeNative(
final long... nativeParameterHandles);
/**
* Deletes underlying C++ native callback object pointer
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void disposeInternal(final long handle);
}

View File

@ -30,7 +30,8 @@ public class SstFileWriter extends RocksObject {
public SstFileWriter(final EnvOptions envOptions, final Options options, public SstFileWriter(final EnvOptions envOptions, final Options options,
final AbstractComparator<? extends AbstractSlice<?>> comparator) { final AbstractComparator<? extends AbstractSlice<?>> comparator) {
super(newSstFileWriter( super(newSstFileWriter(
envOptions.nativeHandle_, options.nativeHandle_, comparator.getNativeHandle())); envOptions.nativeHandle_, options.nativeHandle_, comparator.nativeHandle_,
comparator instanceof DirectComparator));
} }
/** /**
@ -224,7 +225,7 @@ public void put(final byte[] key, final byte[] value)
private native static long newSstFileWriter( private native static long newSstFileWriter(
final long envOptionsHandle, final long optionsHandle, final long envOptionsHandle, final long optionsHandle,
final long userComparatorHandle); final long userComparatorHandle, final boolean isDirect);
private native static long newSstFileWriter(final long envOptionsHandle, private native static long newSstFileWriter(final long envOptionsHandle,
final long optionsHandle); final long optionsHandle);

View File

@ -112,11 +112,14 @@ public class WriteBatch extends AbstractWriteBatch {
* Handler callback for iterating over the contents of a batch. * Handler callback for iterating over the contents of a batch.
*/ */
public static abstract class Handler public static abstract class Handler
extends AbstractImmutableNativeReference { extends RocksCallbackObject {
private final long nativeHandle_;
public Handler() { public Handler() {
super(true); super(null);
this.nativeHandle_ = createNewHandler0(); }
@Override
protected long initializeNative(final long... nativeParameterHandles) {
return createNewHandler0();
} }
public abstract void put(byte[] key, byte[] value); public abstract void put(byte[] key, byte[] value);
@ -139,15 +142,6 @@ public class WriteBatch extends AbstractWriteBatch {
return true; return true;
} }
/**
* Deletes underlying C++ handler pointer.
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native long createNewHandler0(); private native long createNewHandler0();
private native void disposeInternal(final long handle);
} }
} }

View File

@ -60,8 +60,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
final AbstractComparator<? extends AbstractSlice<?>> final AbstractComparator<? extends AbstractSlice<?>>
fallbackIndexComparator, final int reservedBytes, fallbackIndexComparator, final int reservedBytes,
final boolean overwriteKey) { final boolean overwriteKey) {
super(newWriteBatchWithIndex(fallbackIndexComparator.getNativeHandle(), super(newWriteBatchWithIndex(fallbackIndexComparator.nativeHandle_,
reservedBytes, overwriteKey)); fallbackIndexComparator instanceof DirectComparator, reservedBytes, overwriteKey));
} }
/** /**
@ -263,7 +263,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
private native static long newWriteBatchWithIndex(); private native static long newWriteBatchWithIndex();
private native static long newWriteBatchWithIndex(final boolean overwriteKey); private native static long newWriteBatchWithIndex(final boolean overwriteKey);
private native static long newWriteBatchWithIndex( private native static long newWriteBatchWithIndex(
final long fallbackIndexComparatorHandle, final int reservedBytes, final long fallbackIndexComparatorHandle, final boolean isDirect, final int reservedBytes,
final boolean overwriteKey); final boolean overwriteKey);
private native long iterator0(final long handle); private native long iterator0(final long handle);
private native long iterator1(final long handle, final long cfHandle); private native long iterator1(final long handle, final long cfHandle);

View File

@ -0,0 +1,78 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class CompactionFilterFactoryTest {
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void columnFamilyOptions_setCompactionFilterFactory()
throws RocksDBException {
try(final DBOptions options = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RemoveEmptyValueCompactionFilterFactory compactionFilterFactory
= new RemoveEmptyValueCompactionFilterFactory();
final ColumnFamilyOptions new_cf_opts
= new ColumnFamilyOptions()
.setCompactionFilterFactory(compactionFilterFactory)) {
final List<ColumnFamilyDescriptor> cfNames = Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
new ColumnFamilyDescriptor("new_cf".getBytes(), new_cf_opts));
final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
try (final RocksDB rocksDb = RocksDB.open(options,
dbFolder.getRoot().getAbsolutePath(), cfNames, cfHandles);
) {
try {
final byte[] key1 = "key1".getBytes();
final byte[] key2 = "key2".getBytes();
final byte[] value1 = "value1".getBytes();
final byte[] value2 = new byte[0];
rocksDb.put(cfHandles.get(1), key1, value1);
rocksDb.put(cfHandles.get(1), key2, value2);
rocksDb.compactRange(cfHandles.get(1));
assertThat(rocksDb.get(cfHandles.get(1), key1)).isEqualTo(value1);
assertThat(rocksDb.keyMayExist(cfHandles.get(1), key2, new StringBuilder())).isFalse();
} finally {
for (final ColumnFamilyHandle cfHandle : cfHandles) {
cfHandle.close();
}
}
}
}
}
private static class RemoveEmptyValueCompactionFilterFactory extends AbstractCompactionFilterFactory<RemoveEmptyValueCompactionFilter> {
@Override
public RemoveEmptyValueCompactionFilter createCompactionFilter(final AbstractCompactionFilter.Context context) {
return new RemoveEmptyValueCompactionFilter();
}
@Override
public String name() {
return "RemoveEmptyValueCompactionFilterFactory";
}
}
}

View File

@ -41,13 +41,18 @@ public class BytewiseComparatorTest {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, try(final RocksDB db = openDatabase(dbDir,
BuiltinComparator.BYTEWISE_COMPARATOR)) { BuiltinComparator.BYTEWISE_COMPARATOR)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final Comparator comparator2 = new BytewiseComparator(copt2)) {
toJavaComparator(new BytewiseComparator(new ComparatorOptions())), final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
rnd, doRandomIterationTest(
8, 100, 3 db,
); jComparator,
rnd,
8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }
@ -63,15 +68,21 @@ public class BytewiseComparatorTest {
throws IOException, RocksDBException { throws IOException, RocksDBException {
for(int rand_seed = 301; rand_seed < 306; rand_seed++) { for(int rand_seed = 301; rand_seed < 306; rand_seed++) {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, new BytewiseComparator( try(final ComparatorOptions copt = new ComparatorOptions();
new ComparatorOptions()))) { final Comparator comparator = new BytewiseComparator(copt);
final RocksDB db = openDatabase(dbDir, comparator)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final Comparator comparator2 = new BytewiseComparator(copt2)) {
toJavaComparator(new BytewiseComparator(new ComparatorOptions())), final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
rnd, doRandomIterationTest(
8, 100, 3 db,
); jComparator,
rnd,
8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }
@ -89,15 +100,18 @@ public class BytewiseComparatorTest {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, try(final RocksDB db = openDatabase(dbDir,
BuiltinComparator.BYTEWISE_COMPARATOR)) { BuiltinComparator.BYTEWISE_COMPARATOR)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final DirectComparator comparator2 = new DirectBytewiseComparator(copt2)) {
toJavaComparator(new DirectBytewiseComparator( final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
new ComparatorOptions()) doRandomIterationTest(
), db,
rnd, jComparator,
8, 100, 3 rnd,
); 8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }
@ -113,17 +127,21 @@ public class BytewiseComparatorTest {
throws IOException, RocksDBException { throws IOException, RocksDBException {
for(int rand_seed = 301; rand_seed < 306; rand_seed++) { for(int rand_seed = 301; rand_seed < 306; rand_seed++) {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, new DirectBytewiseComparator( try (final ComparatorOptions copt = new ComparatorOptions();
new ComparatorOptions()))) { final DirectComparator comparator = new DirectBytewiseComparator(copt);
final RocksDB db = openDatabase(dbDir, comparator)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final DirectComparator comparator2 = new DirectBytewiseComparator(copt2)) {
toJavaComparator(new DirectBytewiseComparator( final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
new ComparatorOptions()) doRandomIterationTest(
), db,
rnd, jComparator,
8, 100, 3 rnd,
); 8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }
@ -141,15 +159,18 @@ public class BytewiseComparatorTest {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, try(final RocksDB db = openDatabase(dbDir,
BuiltinComparator.REVERSE_BYTEWISE_COMPARATOR)) { BuiltinComparator.REVERSE_BYTEWISE_COMPARATOR)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final Comparator comparator2 = new ReverseBytewiseComparator(copt2)) {
toJavaComparator( final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
new ReverseBytewiseComparator(new ComparatorOptions()) doRandomIterationTest(
), db,
rnd, jComparator,
8, 100, 3 rnd,
); 8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }
@ -163,20 +184,23 @@ public class BytewiseComparatorTest {
@Test @Test
public void java_vs_java_reverseBytewiseComparator() public void java_vs_java_reverseBytewiseComparator()
throws IOException, RocksDBException { throws IOException, RocksDBException {
for(int rand_seed = 301; rand_seed < 306; rand_seed++) { for(int rand_seed = 301; rand_seed < 306; rand_seed++) {
final Path dbDir = Files.createTempDirectory("comparator_db_test"); final Path dbDir = Files.createTempDirectory("comparator_db_test");
try(final RocksDB db = openDatabase(dbDir, new ReverseBytewiseComparator( try (final ComparatorOptions copt = new ComparatorOptions();
new ComparatorOptions()))) { final Comparator comparator = new ReverseBytewiseComparator(copt);
final RocksDB db = openDatabase(dbDir, comparator)) {
final Random rnd = new Random(rand_seed); final Random rnd = new Random(rand_seed);
doRandomIterationTest( try(final ComparatorOptions copt2 = new ComparatorOptions();
db, final Comparator comparator2 = new ReverseBytewiseComparator(copt2)) {
toJavaComparator( final java.util.Comparator<String> jComparator = toJavaComparator(comparator2);
new ReverseBytewiseComparator(new ComparatorOptions()) doRandomIterationTest(
), db,
rnd, jComparator,
8, 100, 3 rnd,
); 8, 100, 3
);
}
} finally { } finally {
removeData(dbDir); removeData(dbDir);
} }

4
src.mk
View File

@ -376,6 +376,8 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/clock_cache.cc \ java/rocksjni/clock_cache.cc \
java/rocksjni/columnfamilyhandle.cc \ java/rocksjni/columnfamilyhandle.cc \
java/rocksjni/compaction_filter.cc \ java/rocksjni/compaction_filter.cc \
java/rocksjni/compaction_filter_factory.cc \
java/rocksjni/compaction_filter_factory_jnicallback.cc \
java/rocksjni/compaction_options_fifo.cc \ java/rocksjni/compaction_options_fifo.cc \
java/rocksjni/compaction_options_universal.cc \ java/rocksjni/compaction_options_universal.cc \
java/rocksjni/comparator.cc \ java/rocksjni/comparator.cc \
@ -386,6 +388,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/ingest_external_file_options.cc \ java/rocksjni/ingest_external_file_options.cc \
java/rocksjni/filter.cc \ java/rocksjni/filter.cc \
java/rocksjni/iterator.cc \ java/rocksjni/iterator.cc \
java/rocksjni/jnicallback.cc \
java/rocksjni/loggerjnicallback.cc \ java/rocksjni/loggerjnicallback.cc \
java/rocksjni/lru_cache.cc \ java/rocksjni/lru_cache.cc \
java/rocksjni/memtablejni.cc \ java/rocksjni/memtablejni.cc \
@ -397,6 +400,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/cassandra_compactionfilterjni.cc \ java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/cassandra_value_operator.cc \ java/rocksjni/cassandra_value_operator.cc \
java/rocksjni/restorejni.cc \ java/rocksjni/restorejni.cc \
java/rocksjni/rocks_callback_object.cc \
java/rocksjni/rocksjni.cc \ java/rocksjni/rocksjni.cc \
java/rocksjni/rocksdb_exception_test.cc \ java/rocksjni/rocksdb_exception_test.cc \
java/rocksjni/slice.cc \ java/rocksjni/slice.cc \