diff --git a/java/Makefile b/java/Makefile index 33a1008d4..c8f443f7b 100644 --- a/java/Makefile +++ b/java/Makefile @@ -36,6 +36,8 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.test.WriteBatchInternal\ org.rocksdb.test.WriteBatchTest\ org.rocksdb.WriteOptions\ + org.rocksdb.WriteBatchWithIndex\ + org.rocksdb.WBWIRocksIterator ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) diff --git a/java/org/rocksdb/WBWIRocksIterator.java b/java/org/rocksdb/WBWIRocksIterator.java new file mode 100644 index 000000000..aafe3aca6 --- /dev/null +++ b/java/org/rocksdb/WBWIRocksIterator.java @@ -0,0 +1,125 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +public class WBWIRocksIterator extends RocksObject implements RocksIteratorInterface { + + //TODO(AR) abstract common code from WBWIRocksIterator and RocksIterator into AbstractRocksIterator + + final WriteBatchWithIndex wbwi_; + + protected WBWIRocksIterator(WriteBatchWithIndex wbwi, long nativeHandle) { + super(); + nativeHandle_ = nativeHandle; + // rocksDB must point to a valid RocksDB instance. + assert (wbwi != null); + // WBWIRocksIterator must hold a reference to the related WriteBatchWithIndex instance + // to guarantee that while a GC cycle starts WBWIRocksIterator instances + // are freed prior to WriteBatchWithIndex instances. + wbwi_ = wbwi; + } + + @Override + public boolean isValid() { + return false; + } + + @Override + public void seekToFirst() { + + } + + @Override + public void seekToLast() { + + } + + @Override + public void seek(byte[] target) { + + } + + @Override + public void next() { + + } + + @Override + public void prev() { + + } + + /** + * Get the current entry + */ + public WriteEntry entry() { + throw new UnsupportedOperationException("NOT YET IMPLEMENTED"); //TODO(AR) implement + } + + @Override + public void status() throws RocksDBException { + + } + + /** + *

Deletes underlying C++ iterator pointer.

+ *

+ *

Note: the underlying handle can only be safely deleted if the WriteBatchWithIndex + * instance related to a certain WBWIRocksIterator is still valid and initialized. + * Therefore {@code disposeInternal()} checks if the WriteBatchWithIndex is initialized + * before freeing the native handle.

+ */ + @Override + protected void disposeInternal() { + synchronized (wbwi_) { + assert (isInitialized()); + if (wbwi_.isInitialized()) { + disposeInternal(nativeHandle_); + } + } + } + + private native void disposeInternal(long handle); + + /** + * Enumeration of the Write operation + * that created the record in the Write Batch + */ + public enum WriteType { + PutRecord, + MergeRecord, + DeleteRecord, + LogDataRecord + } + + /** + * Represents the entry returned by a + * WBWIRocksIterator + */ + public static class WriteEntry { + final WriteType type; + final Slice key; + final Slice value; + + public WriteEntry(final WriteType type, final Slice key, final Slice value) { + this.type = type; + this.key = key; + this.value = value; + } + + public WriteType getType() { + return type; + } + + public Slice getKey() { + return key; + } + + public Slice getValue() { + return value; + } + } +} diff --git a/java/org/rocksdb/WriteBatchWithIndex.java b/java/org/rocksdb/WriteBatchWithIndex.java new file mode 100644 index 000000000..bb42dc3d7 --- /dev/null +++ b/java/org/rocksdb/WriteBatchWithIndex.java @@ -0,0 +1,153 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * Similar to {@link org.rocksdb.WriteBatch} but with a binary searchable + * index built for all the keys inserted. + * + * Calling put, merge, remove or putLogData calls the same function + * as with {@link org.rocksdb.WriteBatch} whilst also building an index. + * + * A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator() }to create an iterator + * over the write batch or + * {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)} to + * get an iterator for the database with Read-Your-Own-Writes like capability + */ +public class WriteBatchWithIndex extends AbstractWriteBatch { + + //TODO(AR) need to cover directly passing WriteBatchWithIndex to {@see org.rocksdb.RocksDB#write(WriteBatch) + //this simplifies the Java API beyond the C++ API as you don't need to call + //GetWriteBatch on the WriteBatchWithIndex + + /** + * Creates a WriteBatchWithIndex where no bytes + * are reserved up-front, bytewise comparison is + * used for fallback key comparisons, + * and duplicate keys operations are retained + */ + public WriteBatchWithIndex() { + super(); + newWriteBatchWithIndex(); + } + + + /** + * Creates a WriteBatchWithIndex where no bytes + * are reserved up-front, bytewise comparison is + * used for fallback key comparisons, and duplicate key + * assignment is determined by the constructor argument + * + * @param overwriteKey if true, overwrite the key in the index when + * inserting a duplicate key, in this way an iterator will never + * show two entries with the same key. + */ + public WriteBatchWithIndex(boolean overwriteKey) { + super(); + newWriteBatchWithIndex(overwriteKey); + } + + /** + * Creates a WriteBatchWithIndex + * + * @param fallbackIndexComparator We fallback to this comparator + * to compare keys within a column family if we cannot determine + * the column family and so look up it's comparator. + * @param reservedBytes reserved bytes in underlying WriteBatch + * @param overwriteKey if true, overwrite the key in the index when + * inserting a duplicate key, in this way an iterator will never + * show two entries with the same key. + */ + public WriteBatchWithIndex(AbstractComparator fallbackIndexComparator, int reservedBytes, + boolean overwriteKey) { + super(); + newWriteBatchWithIndex(fallbackIndexComparator.nativeHandle_, reservedBytes, overwriteKey); + } + + /** + * Create an iterator of a column family. User can call + * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to + * search to the next entry of or after a key. Keys will be iterated in the + * order given by index_comparator. For multiple updates on the same key, + * each update will be returned as a separate entry, in the order of update + * time. + * + * @param columnFamilyHandle The column family to iterate over + * @return An iterator for the Write Batch contents, restricted to the column family + */ + public WBWIRocksIterator newIterator(ColumnFamilyHandle columnFamilyHandle) { + return new WBWIRocksIterator(this, iterator1(columnFamilyHandle.nativeHandle_)); + } + + /** + * Create an iterator of the default column family. User can call + * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to + * search to the next entry of or after a key. Keys will be iterated in the + * order given by index_comparator. For multiple updates on the same key, + * each update will be returned as a separate entry, in the order of update + * time. + * + * @return An iterator for the Write Batch contents + */ + public WBWIRocksIterator newIterator() { + return new WBWIRocksIterator(this, iterator0()); + } + + /** + * Provides Read-Your-Own-Writes like functionality by + * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} + * as a delta and baseIterator as a base + * + * @param columnFamilyHandle The column family to iterate over + * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()} + * @return An iterator which shows a view comprised of both the database point-in-time + * from baseIterator and modifications made in this write batch. + */ + public RocksIterator newIteratorWithBase(ColumnFamilyHandle columnFamilyHandle, + RocksIterator baseIterator) { + RocksIterator iterator = new RocksIterator( + baseIterator.rocksDB_, + iteratorWithBase(columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_)); + + //when the iterator is deleted it will also delete the baseIterator + baseIterator.disOwnNativeHandle(); + return iterator; + } + + /** + * Provides Read-Your-Own-Writes like functionality by + * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} + * as a delta and baseIterator as a base. Operates on the default column family. + * + * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()} + * @return An iterator which shows a view comprised of both the database point-in-time + * from baseIterator and modifications made in this write batch. + */ + public RocksIterator newIteratorWithBase(RocksIterator baseIterator) { + return newIteratorWithBase(baseIterator.rocksDB_.getDefaultColumnFamily(), baseIterator); + } + + @Override final native void disposeInternal(long handle); + @Override final native int count0(); + @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen); + @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen, + long cfHandle); + @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen); + @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen, + long cfHandle); + @Override final native void remove(byte[] key, int keyLen); + @Override final native void remove(byte[] key, int keyLen, long cfHandle); + @Override final native void putLogData(byte[] blob, int blobLen); + @Override final native void clear0(); + + private native void newWriteBatchWithIndex(); + private native void newWriteBatchWithIndex(boolean overwriteKey); + private native void newWriteBatchWithIndex(long fallbackIndexComparatorHandle, int reservedBytes, + boolean overwriteKey); + private native long iterator0(); + private native long iterator1(long cfHandle); + private native long iteratorWithBase(long baseIteratorHandle, long cfHandle); +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 539e824e5..746dde539 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -19,6 +19,7 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/status.h" #include "rocksdb/utilities/backupable_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksjni/comparatorjnicallback.h" #include "rocksjni/writebatchhandlerjnicallback.h" @@ -390,6 +391,37 @@ class WriteBatchHandlerJni { } }; +class WriteBatchWithIndexJni { + public: + static jclass getJClass(JNIEnv* env) { + jclass jclazz = env->FindClass("org/rocksdb/WriteBatchWithIndex"); + assert(jclazz != nullptr); + return jclazz; + } + + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::WriteBatchWithIndex of the specified + // org.rocksdb.WriteBatchWithIndex. + static rocksdb::WriteBatchWithIndex* getHandle(JNIEnv* env, jobject jwbwi) { + return reinterpret_cast( + env->GetLongField(jwbwi, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteBatchWithIndex pointer to the java side. + static void setHandle(JNIEnv* env, jobject jwbwi, + rocksdb::WriteBatchWithIndex* wbwi) { + env->SetLongField( + jwbwi, getHandleFieldID(env), + reinterpret_cast(wbwi)); + } +}; + class HistogramDataJni { public: static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) { diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc new file mode 100644 index 000000000..3d04b4ddd --- /dev/null +++ b/java/rocksjni/write_batch_with_index.cc @@ -0,0 +1,299 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::WriteBatchWithIndex methods from Java side. + +#include "include/org_rocksdb_WriteBatchWithIndex.h" +#include "rocksjni/portal.h" +#include "rocksdb/comparator.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: newWriteBatchWithIndex + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__( + JNIEnv* env, jobject jobj) { + rocksdb::WriteBatchWithIndex* wbwi = new rocksdb::WriteBatchWithIndex(); + rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: newWriteBatchWithIndex + * Signature: (Z)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__Z( + JNIEnv* env, jobject jobj, jboolean joverwrite_key) { + rocksdb::WriteBatchWithIndex* wbwi = + new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator(), 0, + static_cast(joverwrite_key)); + rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: newWriteBatchWithIndex + * Signature: (JIZ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__JIZ( + JNIEnv* env, jobject jobj, jlong jfallback_index_comparator_handle, + jint jreserved_bytes, jboolean joverwrite_key) { + rocksdb::WriteBatchWithIndex* wbwi = + new rocksdb::WriteBatchWithIndex( + reinterpret_cast(jfallback_index_comparator_handle), + static_cast(jreserved_bytes), static_cast(joverwrite_key)); + rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: count + * Signature: ()I + */ +jint Java_org_rocksdb_WriteBatchWithIndex_count0( + JNIEnv* env, jobject jobj) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + return static_cast(wbwi->GetWriteBatch()->Count()); +} + +//TODO(AR) make generic with WriteBatch equivalent +/* + * Helper for WriteBatchWithIndex put operations + */ +void write_batch_with_index_put_helper( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len, + rocksdb::ColumnFamilyHandle* cf_handle) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jentry_value, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), + jentry_value_len); + if (cf_handle != nullptr) { + wbwi->Put(cf_handle, key_slice, value_slice); + } else { + // backwards compatibility + wbwi->Put(key_slice, value_slice); + } + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: put + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BI( + JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len) { + write_batch_with_index_put_helper(env, jobj, jkey, jkey_len, jentry_value, + jentry_value_len, nullptr); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: put + * Signature: ([BI[BIJ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BIJ( + JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) { + auto* cf_handle = reinterpret_cast(jcf_handle); + write_batch_with_index_put_helper(env, jobj, jkey, jkey_len, jentry_value, + jentry_value_len, cf_handle); +} + +//TODO(AR) make generic with WriteBatch equivalent +/* + * Helper for WriteBatchWithIndex merge operations + */ +void write_batch_with_index_merge_helper( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len, + rocksdb::ColumnFamilyHandle* cf_handle) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jentry_value, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), + jentry_value_len); + if (cf_handle != nullptr) { + wbwi->Merge(cf_handle, key_slice, value_slice); + } else { + // backwards compatibility + wbwi->Merge(key_slice, value_slice); + } + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: merge + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BI( + JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len) { + write_batch_with_index_merge_helper(env, jobj, jkey, jkey_len, + jentry_value, jentry_value_len, nullptr); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: merge + * Signature: ([BI[BIJ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BIJ( + JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len, + jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) { + auto* cf_handle = reinterpret_cast(jcf_handle); + write_batch_with_index_merge_helper(env, jobj, jkey, jkey_len, + jentry_value, jentry_value_len, cf_handle); +} + +//TODO(AR) make generic with WriteBatch equivalent +/* + * Helper for write batch remove operations + */ +void write_batch_with_index_remove_helper( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + rocksdb::ColumnFamilyHandle* cf_handle) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + if (cf_handle != nullptr) { + wbwi->Delete(cf_handle, key_slice); + } else { + wbwi->Delete(key_slice); + } + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: remove + * Signature: ([BI)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_remove___3BI( + JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len) { + write_batch_with_index_remove_helper(env, jobj, jkey, jkey_len, nullptr); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: remove + * Signature: ([BIJ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_remove___3BIJ( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, jlong jcf_handle) { + auto* cf_handle = reinterpret_cast(jcf_handle); + write_batch_with_index_remove_helper(env, jobj, jkey, jkey_len, cf_handle); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: putLogData + * Signature: ([BI)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_putLogData( + JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + jbyte* blob = env->GetByteArrayElements(jblob, nullptr); + rocksdb::Slice blob_slice(reinterpret_cast(blob), jblob_len); + wbwi->PutLogData(blob_slice); + env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: clear + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatchWithIndex_clear0( + JNIEnv* env, jobject jobj) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + assert(wbwi != nullptr); + + wbwi->GetWriteBatch()->Clear(); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: iterator0 + * Signature: ()J + */ +jlong Java_org_rocksdb_WriteBatchWithIndex_iterator0( + JNIEnv* env, jobject jobj) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator(); + return reinterpret_cast(wbwi_iterator); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: iterator1 + * Signature: (J)J + */ +jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1( + JNIEnv* env, jobject jobj, jlong jcf_handle) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + auto* cf_handle = reinterpret_cast(jcf_handle); + rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator(cf_handle); + return reinterpret_cast(wbwi_iterator); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: iteratorWithBase + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase( + JNIEnv* env, jobject jobj, jlong jcf_handle, jlong jbi_handle) { + rocksdb::WriteBatchWithIndex* wbwi = + rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto* base_iterator = reinterpret_cast(jbi_handle); + auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator); + return reinterpret_cast(iterator); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_disposeInternal( + JNIEnv* env, jobject jobj, jlong handle) { + auto* wbwi = reinterpret_cast(handle); + delete wbwi; +}