From e3b1e3dfa67cc303245247e5e512be5682ff92d2 Mon Sep 17 00:00:00 2001 From: Adam Retter Date: Tue, 14 Jun 2016 17:51:14 +0100 Subject: [PATCH] Expose save points in Java WriteBatch and WBWI (#1092) * Java API - Expose SetSavePoint and RollbackToRestorePoint for WriteBatch and WriteBatchWithIndex * Minor cleanup --- include/rocksdb/write_batch.h | 2 +- java/rocksjni/write_batch.cc | 31 +++++ java/rocksjni/write_batch_with_index.cc | 34 ++++- .../java/org/rocksdb/AbstractWriteBatch.java | 16 +++ .../src/main/java/org/rocksdb/WriteBatch.java | 2 + .../java/org/rocksdb/WriteBatchInterface.java | 15 +++ .../java/org/rocksdb/WriteBatchWithIndex.java | 2 + .../test/java/org/rocksdb/WriteBatchTest.java | 118 ++++++++++++++++++ .../org/rocksdb/WriteBatchWithIndexTest.java | 105 +++++++++++++++- 9 files changed, 321 insertions(+), 4 deletions(-) diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 2a6ed63bd..be6374cb3 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -125,7 +125,7 @@ class WriteBatch : public WriteBatchBase { // most recent call to SetSavePoint() and removes the most recent save point. // If there is no previous call to SetSavePoint(), Status::NotFound() // will be returned. - // Oterwise returns Status::OK(). + // Otherwise returns Status::OK(). Status RollbackToSavePoint() override; // Support for iterating over the contents of a batch. diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index 4a73c4a79..017787209 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -62,6 +62,37 @@ void Java_org_rocksdb_WriteBatch_clear0(JNIEnv* env, jobject jobj, wb->Clear(); } +/* + * Class: org_rocksdb_WriteBatch + * Method: setSavePoint0 + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatch_setSavePoint0( + JNIEnv* env, jobject jobj, jlong jwb_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + + wb->SetSavePoint(); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: rollbackToSavePoint0 + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatch_rollbackToSavePoint0( + JNIEnv* env, jobject jobj, jlong jwb_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + + auto s = wb->RollbackToSavePoint(); + + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + /* * Class: org_rocksdb_WriteBatch * Method: put diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc index 1c7b6711c..2b8cf778b 100644 --- a/java/rocksjni/write_batch_with_index.cc +++ b/java/rocksjni/write_batch_with_index.cc @@ -198,7 +198,39 @@ void Java_org_rocksdb_WriteBatchWithIndex_clear0( auto* wbwi = reinterpret_cast(jwbwi_handle); assert(wbwi != nullptr); - wbwi->GetWriteBatch()->Clear(); + wbwi->Clear(); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: setSavePoint0 + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_setSavePoint0( + JNIEnv* env, jobject jobj, jlong jwbwi_handle) { + auto* wbwi = reinterpret_cast(jwbwi_handle); + assert(wbwi != nullptr); + + wbwi->SetSavePoint(); +} + +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: rollbackToSavePoint0 + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_rollbackToSavePoint0( + JNIEnv* env, jobject jobj, jlong jwbwi_handle) { + auto* wbwi = reinterpret_cast(jwbwi_handle); + assert(wbwi != nullptr); + + auto s = wbwi->RollbackToSavePoint(); + + if (s.ok()) { + return; + } + + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); } /* diff --git a/java/src/main/java/org/rocksdb/AbstractWriteBatch.java b/java/src/main/java/org/rocksdb/AbstractWriteBatch.java index c4eb01b1e..cad7ebbd3 100644 --- a/java/src/main/java/org/rocksdb/AbstractWriteBatch.java +++ b/java/src/main/java/org/rocksdb/AbstractWriteBatch.java @@ -70,6 +70,18 @@ public abstract class AbstractWriteBatch extends RocksObject clear0(nativeHandle_); } + @Override + public void setSavePoint() { + assert (isOwningHandle()); + setSavePoint0(nativeHandle_); + } + + @Override + public void rollbackToSavePoint() throws RocksDBException { + assert (isOwningHandle()); + rollbackToSavePoint0(nativeHandle_); + } + abstract int count0(final long handle); abstract void put(final long handle, final byte[] key, final int keyLen, @@ -94,4 +106,8 @@ public abstract class AbstractWriteBatch extends RocksObject final int blobLen); abstract void clear0(final long handle); + + abstract void setSavePoint0(final long handle); + + abstract void rollbackToSavePoint0(final long handle); } diff --git a/java/src/main/java/org/rocksdb/WriteBatch.java b/java/src/main/java/org/rocksdb/WriteBatch.java index d9e1098cb..325f9c057 100644 --- a/java/src/main/java/org/rocksdb/WriteBatch.java +++ b/java/src/main/java/org/rocksdb/WriteBatch.java @@ -82,6 +82,8 @@ public class WriteBatch extends AbstractWriteBatch { @Override final native void putLogData(final long handle, final byte[] blob, final int blobLen); @Override final native void clear0(final long handle); + @Override final native void setSavePoint0(final long handle); + @Override final native void rollbackToSavePoint0(final long handle); private native static long newWriteBatch(final int reserved_bytes); private native void iterate(final long handle, final long handlerHandle) diff --git a/java/src/main/java/org/rocksdb/WriteBatchInterface.java b/java/src/main/java/org/rocksdb/WriteBatchInterface.java index 885f1213d..a07791851 100644 --- a/java/src/main/java/org/rocksdb/WriteBatchInterface.java +++ b/java/src/main/java/org/rocksdb/WriteBatchInterface.java @@ -95,4 +95,19 @@ public interface WriteBatchInterface { * Clear all updates buffered in this batch */ void clear(); + + /** + * Records the state of the batch for future calls to RollbackToSavePoint(). + * May be called multiple times to set multiple save points. + */ + void setSavePoint(); + + /** + * Remove all entries in this batch (Put, Merge, Delete, PutLogData) since + * the most recent call to SetSavePoint() and removes the most recent save + * point. + * + * @throws RocksDBException if there is no previous call to SetSavePoint() + */ + void rollbackToSavePoint() throws RocksDBException; } diff --git a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java index 254bf7e6a..dad908a24 100644 --- a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java +++ b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java @@ -155,6 +155,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { @Override final native void putLogData(final long handle, final byte[] blob, final int blobLen); @Override final native void clear0(final long handle); + @Override final native void setSavePoint0(final long handle); + @Override final native void rollbackToSavePoint0(final long handle); private native static long newWriteBatchWithIndex(); private native static long newWriteBatchWithIndex(final boolean overwriteKey); diff --git a/java/src/test/java/org/rocksdb/WriteBatchTest.java b/java/src/test/java/org/rocksdb/WriteBatchTest.java index fdfb02444..ba5d00397 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchTest.java @@ -14,6 +14,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -115,11 +116,128 @@ public class WriteBatchTest { } } + @Test + public void savePoints() + throws UnsupportedEncodingException, RocksDBException { + try (final WriteBatch batch = new WriteBatch()) { + batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII")); + batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII")); + batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII")); + + assertThat(getFromWriteBatch(batch, "k1")).isEqualTo("v1"); + assertThat(getFromWriteBatch(batch, "k2")).isEqualTo("v2"); + assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3"); + + + batch.setSavePoint(); + + batch.remove("k2".getBytes("US-ASCII")); + batch.put("k3".getBytes("US-ASCII"), "v3-2".getBytes("US-ASCII")); + + assertThat(getFromWriteBatch(batch, "k2")).isNull(); + assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3-2"); + + + batch.setSavePoint(); + + batch.put("k3".getBytes("US-ASCII"), "v3-3".getBytes("US-ASCII")); + batch.put("k4".getBytes("US-ASCII"), "v4".getBytes("US-ASCII")); + + assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3-3"); + assertThat(getFromWriteBatch(batch, "k4")).isEqualTo("v4"); + + + batch.rollbackToSavePoint(); + + assertThat(getFromWriteBatch(batch, "k2")).isNull(); + assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3-2"); + assertThat(getFromWriteBatch(batch, "k4")).isNull(); + + + batch.rollbackToSavePoint(); + + assertThat(getFromWriteBatch(batch, "k1")).isEqualTo("v1"); + assertThat(getFromWriteBatch(batch, "k2")).isEqualTo("v2"); + assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3"); + assertThat(getFromWriteBatch(batch, "k4")).isNull(); + } + } + + @Test(expected = RocksDBException.class) + public void restorePoints_withoutSavePoints() throws RocksDBException { + try (final WriteBatch batch = new WriteBatch()) { + batch.rollbackToSavePoint(); + } + } + + @Test(expected = RocksDBException.class) + public void restorePoints_withoutSavePoints_nested() throws RocksDBException { + try (final WriteBatch batch = new WriteBatch()) { + + batch.setSavePoint(); + batch.rollbackToSavePoint(); + + // without previous corresponding setSavePoint + batch.rollbackToSavePoint(); + } + } + static byte[] getContents(final WriteBatch wb) { return getContents(wb.nativeHandle_); } + static String getFromWriteBatch(final WriteBatch wb, final String key) + throws RocksDBException, UnsupportedEncodingException { + final WriteBatchGetter getter = + new WriteBatchGetter(key.getBytes("US-ASCII")); + wb.iterate(getter); + if(getter.getValue() != null) { + return new String(getter.getValue(), "US-ASCII"); + } else { + return null; + } + } + private static native byte[] getContents(final long writeBatchHandle); + + private static class WriteBatchGetter extends WriteBatch.Handler { + + private final byte[] key; + private byte[] value; + + public WriteBatchGetter(final byte[] key) { + this.key = key; + } + + public byte[] getValue() { + return value; + } + + @Override + public void put(final byte[] key, final byte[] value) { + if(Arrays.equals(this.key, key)) { + this.value = value; + } + } + + @Override + public void merge(final byte[] key, final byte[] value) { + if(Arrays.equals(this.key, key)) { + throw new UnsupportedOperationException(); + } + } + + @Override + public void delete(final byte[] key) { + if(Arrays.equals(this.key, key)) { + this.value = null; + } + } + + @Override + public void logData(final byte[] blob) { + } + } } /** diff --git a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java index 74558fc2e..726c6f291 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java @@ -14,9 +14,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; +import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -206,6 +206,107 @@ public class WriteBatchWithIndexTest { } } + @Test + public void savePoints() + throws UnsupportedEncodingException, RocksDBException { + try (final Options options = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(options, + dbFolder.getRoot().getAbsolutePath())) { + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); + final ReadOptions readOptions = new ReadOptions()) { + wbwi.put("k1".getBytes(), "v1".getBytes()); + wbwi.put("k2".getBytes(), "v2".getBytes()); + wbwi.put("k3".getBytes(), "v3".getBytes()); + + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k1")) + .isEqualTo("v1"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k2")) + .isEqualTo("v2"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k3")) + .isEqualTo("v3"); + + + wbwi.setSavePoint(); + + wbwi.remove("k2".getBytes()); + wbwi.put("k3".getBytes(), "v3-2".getBytes()); + + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k2")) + .isNull(); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k3")) + .isEqualTo("v3-2"); + + + wbwi.setSavePoint(); + + wbwi.put("k3".getBytes(), "v3-3".getBytes()); + wbwi.put("k4".getBytes(), "v4".getBytes()); + + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k3")) + .isEqualTo("v3-3"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k4")) + .isEqualTo("v4"); + + + wbwi.rollbackToSavePoint(); + + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k2")) + .isNull(); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k3")) + .isEqualTo("v3-2"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k4")) + .isNull(); + + + wbwi.rollbackToSavePoint(); + + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k1")) + .isEqualTo("v1"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k2")) + .isEqualTo("v2"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k3")) + .isEqualTo("v3"); + assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k4")) + .isNull(); + } + } + } + + @Test(expected = RocksDBException.class) + public void restorePoints_withoutSavePoints() throws RocksDBException { + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) { + wbwi.rollbackToSavePoint(); + } + } + + @Test(expected = RocksDBException.class) + public void restorePoints_withoutSavePoints_nested() throws RocksDBException { + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) { + + wbwi.setSavePoint(); + wbwi.rollbackToSavePoint(); + + // without previous corresponding setSavePoint + wbwi.rollbackToSavePoint(); + } + } + + private static String getFromWriteBatchWithIndex(final RocksDB db, + final ReadOptions readOptions, final WriteBatchWithIndex wbwi, + final String skey) { + final byte[] key = skey.getBytes(); + try(final RocksIterator baseIterator = db.newIterator(readOptions); + final RocksIterator iterator = wbwi.newIteratorWithBase(baseIterator)) { + iterator.seek(key); + + // Arrays.equals(key, iterator.key()) ensures an exact match in Rocks, + // instead of a nearest match + return iterator.isValid() && + Arrays.equals(key, iterator.key()) ? + new String(iterator.value()) : null; + } + } + private byte[] toArray(final ByteBuffer buf) { final byte[] ary = new byte[buf.remaining()]; buf.get(ary);