From 9a456fba2088f2bd66a5b319c630300ea6da2aba Mon Sep 17 00:00:00 2001 From: fyrz Date: Sun, 25 Jan 2015 22:05:29 +0100 Subject: [PATCH 1/5] [RocksJava] GetUpdatesSince support --- java/Makefile | 2 + java/org/rocksdb/RocksDB.java | 25 ++++ java/org/rocksdb/TransactionLogIterator.java | 116 ++++++++++++++++++ java/org/rocksdb/WriteBatch.java | 6 + .../test/TransactionLogIteratorTest.java | 79 ++++++++++++ java/rocksjni/rocksjni.cc | 24 ++++ java/rocksjni/transaction_log.cc | 79 ++++++++++++ 7 files changed, 331 insertions(+) create mode 100644 java/org/rocksdb/TransactionLogIterator.java create mode 100644 java/org/rocksdb/test/TransactionLogIteratorTest.java create mode 100644 java/rocksjni/transaction_log.cc diff --git a/java/Makefile b/java/Makefile index 42f465e10..97f0b0244 100644 --- a/java/Makefile +++ b/java/Makefile @@ -29,6 +29,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.SkipListMemTableConfig\ org.rocksdb.Slice\ org.rocksdb.Statistics\ + org.rocksdb.TransactionLogIterator\ org.rocksdb.TtlDB\ org.rocksdb.VectorMemTableConfig\ org.rocksdb.StringAppendOperator\ @@ -81,6 +82,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\ org.rocksdb.test.SizeUnitTest\ org.rocksdb.test.SliceTest\ org.rocksdb.test.SnapshotTest\ + org.rocksdb.test.TransactionLogIteratorTest\ org.rocksdb.test.TtlDBTest\ org.rocksdb.test.StatisticsCollectorTest\ org.rocksdb.test.WriteBatchHandlerTest\ diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index ac02860e8..96032165e 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1588,6 +1588,29 @@ public class RocksDB extends RocksObject { columnFamilyHandle.nativeHandle_); } + /** + *

Returns an iterator that is positioned at a write-batch containing + * seq_number. If the sequence number is non existent, it returns an iterator + * at the first available seq_no after the requested seq_no.

+ * + *

Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to + * use this api, else the WAL files will get + * cleared aggressively and the iterator might keep getting invalid before + * an update is read.

+ * + * @param sequenceNumber sequence number offset + * + * @return {@link org.rocksdb.TransactionLogIterator} instance. + * + * @throws org.rocksdb.RocksDBException if iterator cannot be retrieved + * from native-side. + */ + public TransactionLogIterator getUpdatesSince(long sequenceNumber) + throws RocksDBException { + return new TransactionLogIterator( + getUpdatesSince(nativeHandle_, sequenceNumber)); + } + /** * Private constructor. */ @@ -1730,6 +1753,8 @@ public class RocksDB extends RocksObject { private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end, int endLen, boolean reduce_level, int target_level, int target_path_id, long cfHandle) throws RocksDBException; + private native long getUpdatesSince(long handle, long sequenceNumber) + throws RocksDBException; protected DBOptionsInterface options_; } diff --git a/java/org/rocksdb/TransactionLogIterator.java b/java/org/rocksdb/TransactionLogIterator.java new file mode 100644 index 000000000..8de61aa00 --- /dev/null +++ b/java/org/rocksdb/TransactionLogIterator.java @@ -0,0 +1,116 @@ +package org.rocksdb; + +/** + *

A TransactionLogIterator is used to iterate over the transactions in a db. + * One run of the iterator is continuous, i.e. the iterator will stop at the + * beginning of any gap in sequences.

+ */ +public class TransactionLogIterator extends RocksObject { + + /** + *

An iterator is either positioned at a WriteBatch + * or not valid. This method returns true if the iterator + * is valid. Can read data from a valid iterator.

+ * + * @return true if iterator position is valid. + */ + public boolean isValid() { + return isValid(nativeHandle_); + } + + /** + *

Moves the iterator to the next WriteBatch. + * REQUIRES: Valid() to be true.

+ */ + public void next() { + assert(isValid()); + next(nativeHandle_); + } + + /** + *

Throws RocksDBException if something went wrong.

+ * + * @throws org.rocksdb.RocksDBException if something went + * wrong in the underlying C++ code. + */ + public void status() throws RocksDBException { + status(nativeHandle_); + } + + /** + *

If iterator position is valid, return the current + * write_batch and the sequence number of the earliest + * transaction contained in the batch.

+ * + *

ONLY use if Valid() is true and status() is OK.

+ * + * @return {@link org.rocksdb.TransactionLogIterator.BatchResult} + * instance. + */ + public BatchResult getBatch() { + assert(isValid()); + return getBatch(nativeHandle_); + } + + /** + *

TransactionLogIterator constructor.

+ * + * @param nativeHandle address to native address. + */ + TransactionLogIterator(long nativeHandle) { + super(); + nativeHandle_ = nativeHandle; + } + + @Override protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + /** + *

BatchResult represents a data structure returned + * by a TransactionLogIterator containing a sequence + * number and a {@link WriteBatch} instance.

+ */ + public class BatchResult { + /** + *

Constructor of BatchResult class.

+ * + * @param sequenceNumber related to this BatchResult instance. + * @param nativeHandle to {@link org.rocksdb.WriteBatch} + * native instance. + */ + public BatchResult(long sequenceNumber, long nativeHandle) { + sequenceNumber_ = sequenceNumber; + writeBatch_ = new WriteBatch(nativeHandle); + } + + /** + *

Return sequence number related to this BatchResult.

+ * + * @return Sequence number. + */ + public long sequenceNumber() { + return sequenceNumber_; + } + + /** + *

Return contained {@link org.rocksdb.WriteBatch} + * instance

+ * + * @return {@link org.rocksdb.WriteBatch} instance. + */ + public WriteBatch writeBatch() { + return writeBatch_; + } + + private final long sequenceNumber_; + private final WriteBatch writeBatch_; + } + + private native void disposeInternal(long handle); + private native boolean isValid(long handle); + private native void next(long handle); + private native void status(long handle) + throws RocksDBException; + private native BatchResult getBatch(long handle); +} diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index 24133ec39..fd8b894cb 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -53,6 +53,12 @@ public class WriteBatch extends AbstractWriteBatch { iterate(handler.nativeHandle_); } + WriteBatch(long nativeHandle) { + super(); + disOwnNativeHandle(); + nativeHandle_ = nativeHandle; + } + @Override final native void disposeInternal(long handle); @Override final native int count0(); @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen); diff --git a/java/org/rocksdb/test/TransactionLogIteratorTest.java b/java/org/rocksdb/test/TransactionLogIteratorTest.java new file mode 100644 index 000000000..2069e1200 --- /dev/null +++ b/java/org/rocksdb/test/TransactionLogIteratorTest.java @@ -0,0 +1,79 @@ +package org.rocksdb.test; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.*; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionLogIteratorTest { + @ClassRule + public static final RocksMemoryResource rocksMemoryResource = + new RocksMemoryResource(); + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + @Test + public void transactionLogIterator() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + transactionLogIterator = db.getUpdatesSince(0); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void getBatch() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + + for (int i = 0; i < 250; i++){ + db.put(String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + db.flush(new FlushOptions().setWaitForFlush(true)); + transactionLogIterator = db.getUpdatesSince(0); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.status(); + + TransactionLogIterator.BatchResult batchResult = + transactionLogIterator.getBatch(); + assertThat(batchResult.sequenceNumber()).isEqualTo(1); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } +} diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1055f87fe..9f5b9446e 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,7 @@ #include "rocksjni/portal.h" #include "rocksdb/db.h" #include "rocksdb/cache.h" +#include "rocksdb/types.h" ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Open @@ -1598,3 +1600,25 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ( rocksdb_compactrange_helper(env, db, cf_handle, jbegin, jbegin_len, jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id); } + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::GetUpdatesSince + +/* + * Class: org_rocksdb_RocksDB + * Method: getUpdatesSince + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env, + jobject jdb, jlong jdb_handle, jlong jsequence_number) { + auto db = reinterpret_cast(jdb_handle); + rocksdb::SequenceNumber sequence_number = + static_cast(jsequence_number); + std::unique_ptr iter; + rocksdb::Status s = db->GetUpdatesSince(sequence_number, &iter); + if (s.ok()) { + return reinterpret_cast(iter.release()); + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + return 0; +} diff --git a/java/rocksjni/transaction_log.cc b/java/rocksjni/transaction_log.cc new file mode 100644 index 000000000..28e387fe1 --- /dev/null +++ b/java/rocksjni/transaction_log.cc @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::Iterator methods from Java side. + +#include +#include +#include + +#include "include/org_rocksdb_TransactionLogIterator.h" +#include "rocksdb/transaction_log.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_disposeInternal( + JNIEnv* env, jobject jobj, jlong handle) { + auto* it = reinterpret_cast(handle); + delete it; +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: isValid + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_TransactionLogIterator_isValid( + JNIEnv* env, jobject jobj, jlong handle) { + return reinterpret_cast(handle)->Valid(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: next + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_next( + JNIEnv* env, jobject jobj, jlong handle) { + reinterpret_cast(handle)->Next(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: status + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_status( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::Status s = reinterpret_cast< + rocksdb::TransactionLogIterator*>(handle)->status(); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: getBatch + * Signature: (J)Lorg/rocksdb/TransactionLogIterator$BatchResult + */ +jobject Java_org_rocksdb_TransactionLogIterator_getBatch( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::BatchResult batch_result = + reinterpret_cast(handle)->GetBatch(); + jclass jclazz = env->FindClass( + "org/rocksdb/TransactionLogIterator$BatchResult"); + assert(jclazz != nullptr); + jmethodID mid = env->GetMethodID( + jclazz, "", "(Lorg/rocksdb/TransactionLogIterator;JJ)V"); + assert(mid != nullptr); + return env->NewObject(jclazz, mid, jobj, + batch_result.sequence, batch_result.writeBatchPtr.release()); +} From b39006e3db598cd6f95e34c05bb19e6fc04bb24c Mon Sep 17 00:00:00 2001 From: fyrz Date: Sun, 25 Jan 2015 22:46:11 +0100 Subject: [PATCH 2/5] [RocksJava] enable/disable File deletions --- java/org/rocksdb/RocksDB.java | 52 +++++++++++++++++++ java/org/rocksdb/test/RocksDBTest.java | 21 ++++++++ .../test/TransactionLogIteratorTest.java | 3 ++ java/rocksjni/rocksjni.cc | 45 ++++++++++++++++ 4 files changed, 121 insertions(+) diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 96032165e..ea3824196 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1588,6 +1588,53 @@ public class RocksDB extends RocksObject { columnFamilyHandle.nativeHandle_); } + /** + *

The sequence number of the most recent transaction.

+ * + * @return sequence number of the most + * recent transaction. + */ + public long getLatestSequenceNumber() { + return getLatestSequenceNumber(nativeHandle_); + } + + /** + *

Prevent file deletions. Compactions will continue to occur, + * but no obsolete files will be deleted. Calling this multiple + * times have the same effect as calling it once.

+ * + * @throws RocksDBException thrown if operation was not performed + * successfully. + */ + public void disableFileDeletions() throws RocksDBException { + disableFileDeletions(nativeHandle_); + } + + /** + *

Allow compactions to delete obsolete files. + * If force == true, the call to EnableFileDeletions() + * will guarantee that file deletions are enabled after + * the call, even if DisableFileDeletions() was called + * multiple times before.

+ * + *

If force == false, EnableFileDeletions will only + * enable file deletion after it's been called at least + * as many times as DisableFileDeletions(), enabling + * the two methods to be called by two threads + * concurrently without synchronization + * -- i.e., file deletions will be enabled only after both + * threads call EnableFileDeletions()

+ * + * @param force boolean value described above. + * + * @throws RocksDBException thrown if operation was not performed + * successfully. + */ + public void enableFileDeletions(boolean force) + throws RocksDBException { + enableFileDeletions(nativeHandle_, force); + } + /** *

Returns an iterator that is positioned at a write-batch containing * seq_number. If the sequence number is non existent, it returns an iterator @@ -1753,6 +1800,11 @@ public class RocksDB extends RocksObject { private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end, int endLen, boolean reduce_level, int target_level, int target_path_id, long cfHandle) throws RocksDBException; + private native long getLatestSequenceNumber(long handle); + private native void disableFileDeletions(long handle) + throws RocksDBException; + private native void enableFileDeletions(long handle, + boolean force) throws RocksDBException; private native long getUpdatesSince(long handle, long sequenceNumber) throws RocksDBException; diff --git a/java/org/rocksdb/test/RocksDBTest.java b/java/org/rocksdb/test/RocksDBTest.java index a6934b310..15dde9856 100644 --- a/java/org/rocksdb/test/RocksDBTest.java +++ b/java/org/rocksdb/test/RocksDBTest.java @@ -738,4 +738,25 @@ public class RocksDBTest { } } } + + @Test + public void enableDisableFileDeletions() throws RocksDBException { + RocksDB db = null; + Options options = null; + try { + options = new Options().setCreateIfMissing(true); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.disableFileDeletions(); + db.enableFileDeletions(false); + db.disableFileDeletions(); + db.enableFileDeletions(true); + } finally { + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } } diff --git a/java/org/rocksdb/test/TransactionLogIteratorTest.java b/java/org/rocksdb/test/TransactionLogIteratorTest.java index 2069e1200..4e1ee4dfd 100644 --- a/java/org/rocksdb/test/TransactionLogIteratorTest.java +++ b/java/org/rocksdb/test/TransactionLogIteratorTest.java @@ -57,6 +57,9 @@ public class TransactionLogIteratorTest { String.valueOf(i).getBytes()); } db.flush(new FlushOptions().setWaitForFlush(true)); + + assertThat(db.getLatestSequenceNumber()).isEqualTo(250); + transactionLogIterator = db.getUpdatesSince(0); assertThat(transactionLogIterator.isValid()).isTrue(); transactionLogIterator.status(); diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 9f5b9446e..148c6c7dc 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -1601,6 +1601,51 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ( jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id); } +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::GetLatestSequenceNumber + +/* + * Class: org_rocksdb_RocksDB + * Method: getLatestSequenceNumber + * Signature: (J)V + */ +jlong Java_org_rocksdb_RocksDB_getLatestSequenceNumber(JNIEnv* env, + jobject jdb, jlong jdb_handle) { + auto db = reinterpret_cast(jdb_handle); + return db->GetLatestSequenceNumber(); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB enable/disable file deletions + +/* + * Class: org_rocksdb_RocksDB + * Method: enableFileDeletions + * Signature: (J)V + */ +void Java_org_rocksdb_RocksDB_disableFileDeletions(JNIEnv* env, + jobject jdb, jlong jdb_handle) { + auto db = reinterpret_cast(jdb_handle); + rocksdb::Status s = db->DisableFileDeletions(); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_RocksDB + * Method: enableFileDeletions + * Signature: (JZ)V + */ +void Java_org_rocksdb_RocksDB_enableFileDeletions(JNIEnv* env, + jobject jdb, jlong jdb_handle, jboolean jforce) { + auto db = reinterpret_cast(jdb_handle); + rocksdb::Status s = db->EnableFileDeletions(jforce); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::GetUpdatesSince From caedd40ddd189eadc2b2974c77b129117c6a8560 Mon Sep 17 00:00:00 2001 From: fyrz Date: Sun, 25 Jan 2015 22:47:29 +0100 Subject: [PATCH 3/5] [RocksJava] Adjusted auto pointer --- java/rocksjni/rocksjni.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 148c6c7dc..eaa5603ea 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -1611,7 +1611,7 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ( */ jlong Java_org_rocksdb_RocksDB_getLatestSequenceNumber(JNIEnv* env, jobject jdb, jlong jdb_handle) { - auto db = reinterpret_cast(jdb_handle); + auto* db = reinterpret_cast(jdb_handle); return db->GetLatestSequenceNumber(); } @@ -1625,7 +1625,7 @@ jlong Java_org_rocksdb_RocksDB_getLatestSequenceNumber(JNIEnv* env, */ void Java_org_rocksdb_RocksDB_disableFileDeletions(JNIEnv* env, jobject jdb, jlong jdb_handle) { - auto db = reinterpret_cast(jdb_handle); + auto* db = reinterpret_cast(jdb_handle); rocksdb::Status s = db->DisableFileDeletions(); if (!s.ok()) { rocksdb::RocksDBExceptionJni::ThrowNew(env, s); @@ -1639,7 +1639,7 @@ void Java_org_rocksdb_RocksDB_disableFileDeletions(JNIEnv* env, */ void Java_org_rocksdb_RocksDB_enableFileDeletions(JNIEnv* env, jobject jdb, jlong jdb_handle, jboolean jforce) { - auto db = reinterpret_cast(jdb_handle); + auto* db = reinterpret_cast(jdb_handle); rocksdb::Status s = db->EnableFileDeletions(jforce); if (!s.ok()) { rocksdb::RocksDBExceptionJni::ThrowNew(env, s); @@ -1656,7 +1656,7 @@ void Java_org_rocksdb_RocksDB_enableFileDeletions(JNIEnv* env, */ jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jsequence_number) { - auto db = reinterpret_cast(jdb_handle); + auto* db = reinterpret_cast(jdb_handle); rocksdb::SequenceNumber sequence_number = static_cast(jsequence_number); std::unique_ptr iter; From 68cd93b8739161a8cc7148256d49039fee9eaf4f Mon Sep 17 00:00:00 2001 From: fyrz Date: Sun, 25 Jan 2015 22:59:48 +0100 Subject: [PATCH 4/5] [RocksJava] GetUpdatesSince support Summary: This differential describes further changes to the Java-API New methods: * GetUpdatesSince * GetLatestSequenceNumber * EnableFileDeletions * DisableFileDeletions This pull requests depends on: https://github.com/facebook/rocksdb/pull/472 Test Plan: make rocksdbjava make jtest mvn -f rocksjni.pom package Reviewers: yhchiang, adamretter, ankgup87 Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D32151 --- java/rocksjni/transaction_log.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/rocksjni/transaction_log.cc b/java/rocksjni/transaction_log.cc index 28e387fe1..028062879 100644 --- a/java/rocksjni/transaction_log.cc +++ b/java/rocksjni/transaction_log.cc @@ -6,9 +6,9 @@ // This file implements the "bridge" between Java and C++ and enables // calling c++ rocksdb::Iterator methods from Java side. +#include #include #include -#include #include "include/org_rocksdb_TransactionLogIterator.h" #include "rocksdb/transaction_log.h" From 391f85fc8252339800cd51716ae7eacf42b8b971 Mon Sep 17 00:00:00 2001 From: fyrz Date: Fri, 30 Jan 2015 21:57:40 +0100 Subject: [PATCH 5/5] [RocksJava] Incorporated changes for D32151 --- java/org/rocksdb/TransactionLogIterator.java | 1 - java/org/rocksdb/WriteBatch.java | 7 + .../test/TransactionLogIteratorTest.java | 123 ++++++++++++++++-- java/rocksjni/transaction_log.cc | 3 +- 4 files changed, 120 insertions(+), 14 deletions(-) diff --git a/java/org/rocksdb/TransactionLogIterator.java b/java/org/rocksdb/TransactionLogIterator.java index 8de61aa00..d82cde3ea 100644 --- a/java/org/rocksdb/TransactionLogIterator.java +++ b/java/org/rocksdb/TransactionLogIterator.java @@ -23,7 +23,6 @@ public class TransactionLogIterator extends RocksObject { * REQUIRES: Valid() to be true.

*/ public void next() { - assert(isValid()); next(nativeHandle_); } diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index fd8b894cb..fd6d9386c 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -53,6 +53,13 @@ public class WriteBatch extends AbstractWriteBatch { iterate(handler.nativeHandle_); } + /** + *

Private WriteBatch constructor which is used to construct + * WriteBatch instances from C++ side. As the reference to this + * object is also managed from C++ side the handle will be disowned.

+ * + * @param nativeHandle address of native instance. + */ WriteBatch(long nativeHandle) { super(); disOwnNativeHandle(); diff --git a/java/org/rocksdb/test/TransactionLogIteratorTest.java b/java/org/rocksdb/test/TransactionLogIteratorTest.java index 4e1ee4dfd..6d700dac9 100644 --- a/java/org/rocksdb/test/TransactionLogIteratorTest.java +++ b/java/org/rocksdb/test/TransactionLogIteratorTest.java @@ -41,6 +41,70 @@ public class TransactionLogIteratorTest { @Test public void getBatch() throws RocksDBException { + final int numberOfPuts = 5; + RocksDB db = null; + Options options = null; + ColumnFamilyHandle cfHandle = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + + for (int i = 0; i < numberOfPuts; i++){ + db.put(String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + db.flush(new FlushOptions().setWaitForFlush(true)); + + // the latest sequence number is 5 because 5 puts + // were written beforehand + assertThat(db.getLatestSequenceNumber()). + isEqualTo(numberOfPuts); + + // insert 5 writes into a cf + cfHandle = db.createColumnFamily( + new ColumnFamilyDescriptor("new_cf".getBytes())); + + for (int i = 0; i < numberOfPuts; i++){ + db.put(cfHandle, String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + // the latest sequence number is 10 because + // (5 + 5) puts were written beforehand + assertThat(db.getLatestSequenceNumber()). + isEqualTo(numberOfPuts + numberOfPuts); + + // Get updates since the beginning + transactionLogIterator = db.getUpdatesSince(0); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.status(); + + // The first sequence number is 1 + TransactionLogIterator.BatchResult batchResult = + transactionLogIterator.getBatch(); + assertThat(batchResult.sequenceNumber()).isEqualTo(1); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (cfHandle != null) { + cfHandle.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void transactionLogIteratorStallAtLastRecord() throws RocksDBException { RocksDB db = null; Options options = null; TransactionLogIterator transactionLogIterator = null; @@ -51,22 +115,59 @@ public class TransactionLogIteratorTest { setWalSizeLimitMB(10); db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.put("key1".getBytes(), "value1".getBytes()); + // Get updates since the beginning + transactionLogIterator = db.getUpdatesSince(0); + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.next(); + assertThat(transactionLogIterator.isValid()).isFalse(); + transactionLogIterator.status(); + db.put("key2".getBytes(), "value2".getBytes()); + transactionLogIterator.next(); + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); - for (int i = 0; i < 250; i++){ - db.put(String.valueOf(i).getBytes(), - String.valueOf(i).getBytes()); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); } - db.flush(new FlushOptions().setWaitForFlush(true)); + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } - assertThat(db.getLatestSequenceNumber()).isEqualTo(250); + @Test + public void transactionLogIteratorCheckAfterRestart() throws RocksDBException { + final int numberOfKeys = 2; + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.put("key1".getBytes(), "value1".getBytes()); + db.put("key2".getBytes(), "value2".getBytes()); + db.flush(new FlushOptions().setWaitForFlush(true)); + // reopen + db.close(); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + assertThat(db.getLatestSequenceNumber()).isEqualTo(numberOfKeys); transactionLogIterator = db.getUpdatesSince(0); - assertThat(transactionLogIterator.isValid()).isTrue(); - transactionLogIterator.status(); - - TransactionLogIterator.BatchResult batchResult = - transactionLogIterator.getBatch(); - assertThat(batchResult.sequenceNumber()).isEqualTo(1); + for (int i = 0; i < numberOfKeys; i++) { + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.next(); + } } finally { if (transactionLogIterator != null) { transactionLogIterator.dispose(); diff --git a/java/rocksjni/transaction_log.cc b/java/rocksjni/transaction_log.cc index 028062879..1d3d7c100 100644 --- a/java/rocksjni/transaction_log.cc +++ b/java/rocksjni/transaction_log.cc @@ -21,8 +21,7 @@ */ void Java_org_rocksdb_TransactionLogIterator_disposeInternal( JNIEnv* env, jobject jobj, jlong handle) { - auto* it = reinterpret_cast(handle); - delete it; + delete reinterpret_cast(handle); } /*