Merge pull request #363 from adamretter/write_batch-iterate

Iterator support for Write Batches
Yueh-Hsuan Chiang 2014-11-10 22:12:11 -08:00
commit 26f0a78b03
7 changed files with 483 additions and 3 deletions

View File

@ -1,4 +1,4 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatch.Handler org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
@ -56,6 +56,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ComparatorOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ComparatorTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.DirectComparatorTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.WriteBatchHandlerTest
@rm -rf /tmp/rocksdbjni_*
db_bench: java

View File

@ -5,8 +5,6 @@
package org.rocksdb;
import java.util.*;
/**
* WriteBatch holds a collection of updates to apply atomically to a DB.
*
@ -105,6 +103,18 @@ public class WriteBatch extends RocksObject {
putLogData(blob, blob.length);
}
/**
* Support for iterating over the contents of a batch.
*
* @param handler A handler that is called back for each
* update present in the batch
*
* @throws RocksDBException If we cannot iterate over the batch
*/
public void iterate(Handler handler) throws RocksDBException {
iterate(handler.nativeHandle_);
}
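A rough usage sketch of the new iterate(Handler) API (illustration only; the anonymous handler and the pre-populated batch variable are assumptions, not part of this change):

// Illustrative sketch: replay an existing WriteBatch through a Handler.
// Assumes `batch` is an org.rocksdb.WriteBatch already populated via
// put/merge/remove/putLogData.
final WriteBatch.Handler printer = new WriteBatch.Handler() {
  @Override public void put(final byte[] key, final byte[] value) {
    System.out.println("PUT " + new String(key) + " = " + new String(value));
  }
  @Override public void merge(final byte[] key, final byte[] value) {
    System.out.println("MERGE " + new String(key) + " = " + new String(value));
  }
  @Override public void delete(final byte[] key) {
    System.out.println("DELETE " + new String(key));
  }
  @Override public void logData(final byte[] blob) {
    System.out.println("LOG " + new String(blob));
  }
};
try {
  batch.iterate(printer); // calls back into the handler once per update, in insertion order
} finally {
  printer.dispose(); // dispose() inherited from RocksObject frees the native callback created by the Handler constructor
}

The handler receives the updates in the order in which they were added to the batch.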
/**
* Clear all updates buffered in this batch
*/
@ -133,7 +143,46 @@ public class WriteBatch extends RocksObject {
private native void remove(byte[] key, int keyLen,
long cfHandle);
private native void putLogData(byte[] blob, int blobLen);
private native void iterate(long handlerHandle) throws RocksDBException;
private native void disposeInternal(long handle);
/**
* Handler callback for iterating over the contents of a batch.
*/
public static abstract class Handler extends RocksObject {
public Handler() {
super();
createNewHandler0();
}
public abstract void put(byte[] key, byte[] value);
public abstract void merge(byte[] key, byte[] value);
public abstract void delete(byte[] key);
public abstract void logData(byte[] blob);
/**
* shouldContinue is called by the underlying C++
* WriteBatch::Iterate method before each update is
* replayed. If it returns false, iteration is halted;
* otherwise iteration continues. The default
* implementation always returns true.
*/
public boolean shouldContinue() {
return true;
}
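For illustration only (an assumption layered on top of this diff, not code it adds), a subclass could override shouldContinue to halt iteration early, for example after a fixed number of updates:

// Hypothetical sketch: a handler that stops iteration after it has
// seen `limit` updates (puts, merges and deletes; log records ignored).
class BoundedHandler extends WriteBatch.Handler {
  private final int limit;
  private int seen = 0;

  BoundedHandler(final int limit) {
    this.limit = limit;
  }

  @Override public void put(final byte[] key, final byte[] value) { seen++; }
  @Override public void merge(final byte[] key, final byte[] value) { seen++; }
  @Override public void delete(final byte[] key) { seen++; }
  @Override public void logData(final byte[] blob) { /* not counted */ }

  @Override public boolean shouldContinue() {
    // Returning false here causes the C++ WriteBatch::Iterate loop to stop.
    return seen < limit;
  }
}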
/**
* Deletes the underlying C++ handler pointer.
*/
@Override
protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void createNewHandler0();
private native void disposeInternal(long handle);
}
}
/**

View File

@ -0,0 +1,162 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class WriteBatchHandlerTest {
static {
RocksDB.loadLibrary();
}
public static void main(final String[] args) throws IOException, RocksDBException {
// setup test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> testEvents = new ArrayList<>();
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k0".getBytes(), null)));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k1".getBytes(), "v1".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k2".getBytes(), "v2".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k3".getBytes(), "v3".getBytes())));
testEvents.add(new Tuple<>(Action.LOG,
new Tuple<byte[], byte[]>(null, "log1".getBytes())));
testEvents.add(new Tuple<>(Action.MERGE,
new Tuple<>("k2".getBytes(), "v22".getBytes())));
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k3".getBytes(), null)));
// load test data to the write batch
final WriteBatch batch = new WriteBatch();
for(final Tuple<Action, Tuple<byte[], byte[]>> testEvent : testEvents) {
final Tuple<byte[], byte[]> data = testEvent.value;
switch(testEvent.key) {
case PUT:
batch.put(data.key, data.value);
break;
case MERGE:
batch.merge(data.key, data.value);
break;
case DELETE:
batch.remove(data.key);
break;
case LOG:
batch.putLogData(data.value);
break;
}
}
// attempt to read test data back from the WriteBatch by iterating with a handler
final CapturingWriteBatchHandler handler = new CapturingWriteBatchHandler();
batch.iterate(handler);
// compare the results to the test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> actualEvents = handler.getEvents();
assert(testEvents.size() == actualEvents.size());
for(int i = 0; i < testEvents.size(); i++) {
assert(equals(testEvents.get(i), actualEvents.get(i)));
}
System.out.println("Passed WriteBatchHandler Test");
}
private static boolean equals(final Tuple<Action, Tuple<byte[], byte[]>> expected,
final Tuple<Action, Tuple<byte[], byte[]>> actual) {
if(!expected.key.equals(actual.key)) {
return false;
}
final Tuple<byte[], byte[]> expectedData = expected.value;
final Tuple<byte[], byte[]> actualData = actual.value;
if(equals(expectedData.key, actualData.key)) {
return equals(expectedData.value, actualData.value);
} else {
return false;
}
}
private static boolean equals(byte[] expected, byte[] actual) {
if(expected != null) {
return Arrays.equals(expected, actual);
} else {
return actual == null;
}
}
private static class Tuple<K, V> {
public final K key;
public final V value;
public Tuple(final K key, final V value) {
this.key = key;
this.value = value;
}
}
/**
* Enumeration of Write Batch
* event actions
*/
private enum Action {
PUT,
MERGE,
DELETE,
LOG
}
/**
* A simple WriteBatch Handler which adds a record
* of each event that it receives to a list
*/
private static class CapturingWriteBatchHandler extends WriteBatch.Handler {
private final List<Tuple<Action, Tuple<byte[], byte[]>>> events = new ArrayList<>();
/**
* Returns a copy of the current events list
*
* @return a list of the events which have happened up to now
*/
public List<Tuple<Action, Tuple<byte[], byte[]>>> getEvents() {
return new ArrayList<>(events);
}
@Override
public void put(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.PUT, new Tuple<>(key, value)));
}
@Override
public void merge(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.MERGE, new Tuple<>(key, value)));
}
@Override
public void delete(final byte[] key) {
events.add(new Tuple<>(Action.DELETE, new Tuple<byte[], byte[]>(key, null)));
}
@Override
public void logData(final byte[] blob) {
events.add(new Tuple<>(Action.LOG, new Tuple<byte[], byte[]>(null, blob)));
}
}
}

View File

@ -20,6 +20,7 @@
#include "rocksdb/status.h"
#include "rocksdb/utilities/backupable_db.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
namespace rocksdb {
@ -288,6 +289,79 @@ class WriteBatchJni {
}
};
class WriteBatchHandlerJni {
public:
static jclass getJClass(JNIEnv* env) {
jclass jclazz = env->FindClass("org/rocksdb/WriteBatch$Handler");
assert(jclazz != nullptr);
return jclazz;
}
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
// Get the java method `put` of org.rocksdb.WriteBatch.Handler.
static jmethodID getPutMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "put", "([B[B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `merge` of org.rocksdb.WriteBatch.Handler.
static jmethodID getMergeMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "merge", "([B[B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `delete` of org.rocksdb.WriteBatch.Handler.
static jmethodID getDeleteMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "delete", "([B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `logData` of org.rocksdb.WriteBatch.Handler.
static jmethodID getLogDataMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "logData", "([B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `shouldContinue` of org.rocksdb.WriteBatch.Handler.
static jmethodID getContinueMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "shouldContinue", "()Z");
assert(mid != nullptr);
return mid;
}
// Get the pointer to rocksdb::WriteBatchHandlerJniCallback of the specified
// org.rocksdb.WriteBatch.Handler.
static rocksdb::WriteBatchHandlerJniCallback* getHandle(
JNIEnv* env, jobject jobj) {
return reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(
env->GetLongField(jobj, getHandleFieldID(env)));
}
// Pass the rocksdb::WriteBatchHandlerJniCallback pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj,
const rocksdb::WriteBatchHandlerJniCallback* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));
}
};
class HistogramDataJni {
public:
static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) {

View File

@ -8,13 +8,16 @@
#include <memory>
#include "include/org_rocksdb_WriteBatch.h"
#include "include/org_rocksdb_WriteBatch_Handler.h"
#include "include/org_rocksdb_WriteBatchInternal.h"
#include "include/org_rocksdb_WriteBatchTest.h"
#include "rocksjni/portal.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "rocksdb/db.h"
#include "rocksdb/immutable_options.h"
#include "db/memtable.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/status.h"
#include "db/write_batch_internal.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
@ -224,6 +227,25 @@ void Java_org_rocksdb_WriteBatch_putLogData(
env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: iterate
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_iterate(
JNIEnv* env, jobject jobj, jlong handlerHandle) {
rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
assert(wb != nullptr);
rocksdb::Status s = wb->Iterate(
reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(handlerHandle));
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: disposeInternal
@ -276,6 +298,28 @@ void Java_org_rocksdb_WriteBatchInternal_append(
rocksdb::WriteBatchInternal::Append(wb1, wb2);
}
/*
* Class: org_rocksdb_WriteBatch_Handler
* Method: createNewHandler0
* Signature: ()V
*/
void Java_org_rocksdb_WriteBatch_00024Handler_createNewHandler0(
JNIEnv* env, jobject jobj) {
const rocksdb::WriteBatchHandlerJniCallback* h =
new rocksdb::WriteBatchHandlerJniCallback(env, jobj);
rocksdb::WriteBatchHandlerJni::setHandle(env, jobj, h);
}
/*
* Class: org_rocksdb_WriteBatch_Handler
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_00024Handler_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(handle);
}
/*
* Class: org_rocksdb_WriteBatchTest
* Method: getContents

View File

@ -0,0 +1,104 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::WriteBatch::Handler.
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "rocksjni/portal.h"
namespace rocksdb {
WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBatchHandler)
: m_env(env) {
// Note: we want to access the Java WriteBatchHandler instance
// across multiple method calls, so we create a global ref
m_jWriteBatchHandler = env->NewGlobalRef(jWriteBatchHandler);
m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env);
m_jMergeMethodId = WriteBatchHandlerJni::getMergeMethodId(env);
m_jDeleteMethodId = WriteBatchHandlerJni::getDeleteMethodId(env);
m_jLogDataMethodId = WriteBatchHandlerJni::getLogDataMethodId(env);
m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
}
void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
const jbyteArray j_key = sliceToJArray(key);
const jbyteArray j_value = sliceToJArray(value);
m_env->CallVoidMethod(
m_jWriteBatchHandler,
m_jPutMethodId,
j_key,
j_value);
m_env->DeleteLocalRef(j_value);
m_env->DeleteLocalRef(j_key);
}
void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
const jbyteArray j_key = sliceToJArray(key);
const jbyteArray j_value = sliceToJArray(value);
m_env->CallVoidMethod(
m_jWriteBatchHandler,
m_jMergeMethodId,
j_key,
j_value);
m_env->DeleteLocalRef(j_value);
m_env->DeleteLocalRef(j_key);
}
void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
const jbyteArray j_key = sliceToJArray(key);
m_env->CallVoidMethod(
m_jWriteBatchHandler,
m_jDeleteMethodId,
j_key);
m_env->DeleteLocalRef(j_key);
}
void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
const jbyteArray j_blob = sliceToJArray(blob);
m_env->CallVoidMethod(
m_jWriteBatchHandler,
m_jLogDataMethodId,
j_blob);
m_env->DeleteLocalRef(j_blob);
}
bool WriteBatchHandlerJniCallback::Continue() {
jboolean jContinue = m_env->CallBooleanMethod(
m_jWriteBatchHandler,
m_jContinueMethodId);
return static_cast<bool>(jContinue == JNI_TRUE);
}
/*
* Creates a Java byte array from the data in a Slice.
*
* The caller must call env->DeleteLocalRef on the
* returned array once they have finished with it.
*/
jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) {
jbyteArray ja = m_env->NewByteArray(s.size());
m_env->SetByteArrayRegion(
ja, 0, s.size(),
reinterpret_cast<const jbyte*>(s.data()));
return ja;
}
WriteBatchHandlerJniCallback::~WriteBatchHandlerJniCallback() {
m_env->DeleteGlobalRef(m_jWriteBatchHandler);
}
} // namespace rocksdb

View File

@ -0,0 +1,46 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::WriteBatch::Handler.
#ifndef JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#include <jni.h>
#include "rocksdb/write_batch.h"
namespace rocksdb {
/**
* This class acts as a bridge between C++
* and Java. The methods in this class are
* called back from the RocksDB storage engine (C++);
* each callback then invokes the corresponding Java
* method. This enables WriteBatch Handlers to be
* implemented in Java.
*/
class WriteBatchHandlerJniCallback : public WriteBatch::Handler {
public:
WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBatchHandler);
~WriteBatchHandlerJniCallback();
void Put(const Slice& key, const Slice& value);
void Merge(const Slice& key, const Slice& value);
void Delete(const Slice& key);
void LogData(const Slice& blob);
bool Continue();
private:
JNIEnv* m_env;
jobject m_jWriteBatchHandler;
jbyteArray sliceToJArray(const Slice& s);
jmethodID m_jPutMethodId;
jmethodID m_jMergeMethodId;
jmethodID m_jDeleteMethodId;
jmethodID m_jLogDataMethodId;
jmethodID m_jContinueMethodId;
};
} // namespace rocksdb
#endif // JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_