Merge pull request #474 from fyrz/RocksJava-GetUpdatesSince

[RocksJava] GetUpdatesSince
This commit is contained in:
Adam Retter 2015-01-31 14:41:06 +00:00
commit ea189b320c
8 changed files with 558 additions and 0 deletions

View File

@ -29,6 +29,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
org.rocksdb.TransactionLogIterator\
org.rocksdb.TtlDB\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.StringAppendOperator\
@ -81,6 +82,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\
org.rocksdb.test.SizeUnitTest\
org.rocksdb.test.SliceTest\
org.rocksdb.test.SnapshotTest\
org.rocksdb.test.TransactionLogIteratorTest\
org.rocksdb.test.TtlDBTest\
org.rocksdb.test.StatisticsCollectorTest\
org.rocksdb.test.WriteBatchHandlerTest\

View File

@ -1588,6 +1588,76 @@ public class RocksDB extends RocksObject {
columnFamilyHandle.nativeHandle_);
}
/**
* <p>The sequence number of the most recent transaction.</p>
*
* @return sequence number of the most
* recent transaction.
*/
public long getLatestSequenceNumber() {
return getLatestSequenceNumber(nativeHandle_);
}
/**
* <p>Prevent file deletions. Compactions will continue to occur,
* but no obsolete files will be deleted. Calling this multiple
* times have the same effect as calling it once.</p>
*
* @throws RocksDBException thrown if operation was not performed
* successfully.
*/
public void disableFileDeletions() throws RocksDBException {
disableFileDeletions(nativeHandle_);
}
/**
* <p>Allow compactions to delete obsolete files.
* If force == true, the call to EnableFileDeletions()
* will guarantee that file deletions are enabled after
* the call, even if DisableFileDeletions() was called
* multiple times before.</p>
*
* <p>If force == false, EnableFileDeletions will only
* enable file deletion after it's been called at least
* as many times as DisableFileDeletions(), enabling
* the two methods to be called by two threads
* concurrently without synchronization
* -- i.e., file deletions will be enabled only after both
* threads call EnableFileDeletions()</p>
*
* @param force boolean value described above.
*
* @throws RocksDBException thrown if operation was not performed
* successfully.
*/
public void enableFileDeletions(boolean force)
throws RocksDBException {
enableFileDeletions(nativeHandle_, force);
}
/**
* <p>Returns an iterator that is positioned at a write-batch containing
* seq_number. If the sequence number is non existent, it returns an iterator
* at the first available seq_no after the requested seq_no.</p>
*
* <p>Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
* use this api, else the WAL files will get
* cleared aggressively and the iterator might keep getting invalid before
* an update is read.</p>
*
* @param sequenceNumber sequence number offset
*
* @return {@link org.rocksdb.TransactionLogIterator} instance.
*
* @throws org.rocksdb.RocksDBException if iterator cannot be retrieved
* from native-side.
*/
public TransactionLogIterator getUpdatesSince(long sequenceNumber)
throws RocksDBException {
return new TransactionLogIterator(
getUpdatesSince(nativeHandle_, sequenceNumber));
}
/**
* Private constructor.
*/
@ -1730,6 +1800,13 @@ public class RocksDB extends RocksObject {
private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end,
int endLen, boolean reduce_level, int target_level, int target_path_id,
long cfHandle) throws RocksDBException;
private native long getLatestSequenceNumber(long handle);
private native void disableFileDeletions(long handle)
throws RocksDBException;
private native void enableFileDeletions(long handle,
boolean force) throws RocksDBException;
private native long getUpdatesSince(long handle, long sequenceNumber)
throws RocksDBException;
protected DBOptionsInterface options_;
}

View File

@ -0,0 +1,115 @@
package org.rocksdb;
/**
* <p>A TransactionLogIterator is used to iterate over the transactions in a db.
* One run of the iterator is continuous, i.e. the iterator will stop at the
* beginning of any gap in sequences.</p>
*/
public class TransactionLogIterator extends RocksObject {
/**
* <p>An iterator is either positioned at a WriteBatch
* or not valid. This method returns true if the iterator
* is valid. Can read data from a valid iterator.</p>
*
* @return true if iterator position is valid.
*/
public boolean isValid() {
return isValid(nativeHandle_);
}
/**
* <p>Moves the iterator to the next WriteBatch.
* <strong>REQUIRES</strong>: Valid() to be true.</p>
*/
public void next() {
next(nativeHandle_);
}
/**
* <p>Throws RocksDBException if something went wrong.</p>
*
* @throws org.rocksdb.RocksDBException if something went
* wrong in the underlying C++ code.
*/
public void status() throws RocksDBException {
status(nativeHandle_);
}
/**
* <p>If iterator position is valid, return the current
* write_batch and the sequence number of the earliest
* transaction contained in the batch.</p>
*
* <p>ONLY use if Valid() is true and status() is OK.</p>
*
* @return {@link org.rocksdb.TransactionLogIterator.BatchResult}
* instance.
*/
public BatchResult getBatch() {
assert(isValid());
return getBatch(nativeHandle_);
}
/**
* <p>TransactionLogIterator constructor.</p>
*
* @param nativeHandle address to native address.
*/
TransactionLogIterator(long nativeHandle) {
super();
nativeHandle_ = nativeHandle;
}
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
/**
* <p>BatchResult represents a data structure returned
* by a TransactionLogIterator containing a sequence
* number and a {@link WriteBatch} instance.</p>
*/
public class BatchResult {
/**
* <p>Constructor of BatchResult class.</p>
*
* @param sequenceNumber related to this BatchResult instance.
* @param nativeHandle to {@link org.rocksdb.WriteBatch}
* native instance.
*/
public BatchResult(long sequenceNumber, long nativeHandle) {
sequenceNumber_ = sequenceNumber;
writeBatch_ = new WriteBatch(nativeHandle);
}
/**
* <p>Return sequence number related to this BatchResult.</p>
*
* @return Sequence number.
*/
public long sequenceNumber() {
return sequenceNumber_;
}
/**
* <p>Return contained {@link org.rocksdb.WriteBatch}
* instance</p>
*
* @return {@link org.rocksdb.WriteBatch} instance.
*/
public WriteBatch writeBatch() {
return writeBatch_;
}
private final long sequenceNumber_;
private final WriteBatch writeBatch_;
}
private native void disposeInternal(long handle);
private native boolean isValid(long handle);
private native void next(long handle);
private native void status(long handle)
throws RocksDBException;
private native BatchResult getBatch(long handle);
}

View File

@ -53,6 +53,19 @@ public class WriteBatch extends AbstractWriteBatch {
iterate(handler.nativeHandle_);
}
/**
* <p>Private WriteBatch constructor which is used to construct
* WriteBatch instances from C++ side. As the reference to this
* object is also managed from C++ side the handle will be disowned.</p>
*
* @param nativeHandle address of native instance.
*/
WriteBatch(long nativeHandle) {
super();
disOwnNativeHandle();
nativeHandle_ = nativeHandle;
}
@Override final native void disposeInternal(long handle);
@Override final native int count0();
@Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen);

View File

@ -738,4 +738,25 @@ public class RocksDBTest {
}
}
}
@Test
public void enableDisableFileDeletions() throws RocksDBException {
RocksDB db = null;
Options options = null;
try {
options = new Options().setCreateIfMissing(true);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
db.disableFileDeletions();
db.enableFileDeletions(false);
db.disableFileDeletions();
db.enableFileDeletions(true);
} finally {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
}

View File

@ -0,0 +1,183 @@
package org.rocksdb.test;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.*;
import static org.assertj.core.api.Assertions.assertThat;
public class TransactionLogIteratorTest {
@ClassRule
public static final RocksMemoryResource rocksMemoryResource =
new RocksMemoryResource();
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void transactionLogIterator() throws RocksDBException {
RocksDB db = null;
Options options = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
transactionLogIterator = db.getUpdatesSince(0);
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void getBatch() throws RocksDBException {
final int numberOfPuts = 5;
RocksDB db = null;
Options options = null;
ColumnFamilyHandle cfHandle = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true).
setWalTtlSeconds(1000).
setWalSizeLimitMB(10);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
for (int i = 0; i < numberOfPuts; i++){
db.put(String.valueOf(i).getBytes(),
String.valueOf(i).getBytes());
}
db.flush(new FlushOptions().setWaitForFlush(true));
// the latest sequence number is 5 because 5 puts
// were written beforehand
assertThat(db.getLatestSequenceNumber()).
isEqualTo(numberOfPuts);
// insert 5 writes into a cf
cfHandle = db.createColumnFamily(
new ColumnFamilyDescriptor("new_cf".getBytes()));
for (int i = 0; i < numberOfPuts; i++){
db.put(cfHandle, String.valueOf(i).getBytes(),
String.valueOf(i).getBytes());
}
// the latest sequence number is 10 because
// (5 + 5) puts were written beforehand
assertThat(db.getLatestSequenceNumber()).
isEqualTo(numberOfPuts + numberOfPuts);
// Get updates since the beginning
transactionLogIterator = db.getUpdatesSince(0);
assertThat(transactionLogIterator.isValid()).isTrue();
transactionLogIterator.status();
// The first sequence number is 1
TransactionLogIterator.BatchResult batchResult =
transactionLogIterator.getBatch();
assertThat(batchResult.sequenceNumber()).isEqualTo(1);
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (cfHandle != null) {
cfHandle.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void transactionLogIteratorStallAtLastRecord() throws RocksDBException {
RocksDB db = null;
Options options = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true).
setWalTtlSeconds(1000).
setWalSizeLimitMB(10);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
db.put("key1".getBytes(), "value1".getBytes());
// Get updates since the beginning
transactionLogIterator = db.getUpdatesSince(0);
transactionLogIterator.status();
assertThat(transactionLogIterator.isValid()).isTrue();
transactionLogIterator.next();
assertThat(transactionLogIterator.isValid()).isFalse();
transactionLogIterator.status();
db.put("key2".getBytes(), "value2".getBytes());
transactionLogIterator.next();
transactionLogIterator.status();
assertThat(transactionLogIterator.isValid()).isTrue();
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void transactionLogIteratorCheckAfterRestart() throws RocksDBException {
final int numberOfKeys = 2;
RocksDB db = null;
Options options = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true).
setWalTtlSeconds(1000).
setWalSizeLimitMB(10);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());
db.flush(new FlushOptions().setWaitForFlush(true));
// reopen
db.close();
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
assertThat(db.getLatestSequenceNumber()).isEqualTo(numberOfKeys);
transactionLogIterator = db.getUpdatesSince(0);
for (int i = 0; i < numberOfKeys; i++) {
transactionLogIterator.status();
assertThat(transactionLogIterator.isValid()).isTrue();
transactionLogIterator.next();
}
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
}

View File

@ -9,6 +9,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <memory>
#include <string>
#include <vector>
@ -16,6 +17,7 @@
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/cache.h"
#include "rocksdb/types.h"
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Open
@ -1598,3 +1600,70 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ(
rocksdb_compactrange_helper(env, db, cf_handle, jbegin, jbegin_len,
jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id);
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::GetLatestSequenceNumber
/*
* Class: org_rocksdb_RocksDB
* Method: getLatestSequenceNumber
* Signature: (J)V
*/
jlong Java_org_rocksdb_RocksDB_getLatestSequenceNumber(JNIEnv* env,
jobject jdb, jlong jdb_handle) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
return db->GetLatestSequenceNumber();
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB enable/disable file deletions
/*
* Class: org_rocksdb_RocksDB
* Method: enableFileDeletions
* Signature: (J)V
*/
void Java_org_rocksdb_RocksDB_disableFileDeletions(JNIEnv* env,
jobject jdb, jlong jdb_handle) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
rocksdb::Status s = db->DisableFileDeletions();
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_RocksDB
* Method: enableFileDeletions
* Signature: (JZ)V
*/
void Java_org_rocksdb_RocksDB_enableFileDeletions(JNIEnv* env,
jobject jdb, jlong jdb_handle, jboolean jforce) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
rocksdb::Status s = db->EnableFileDeletions(jforce);
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::GetUpdatesSince
/*
* Class: org_rocksdb_RocksDB
* Method: getUpdatesSince
* Signature: (JJ)J
*/
jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env,
jobject jdb, jlong jdb_handle, jlong jsequence_number) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
rocksdb::SequenceNumber sequence_number =
static_cast<rocksdb::SequenceNumber>(jsequence_number);
std::unique_ptr<rocksdb::TransactionLogIterator> iter;
rocksdb::Status s = db->GetUpdatesSince(sequence_number, &iter);
if (s.ok()) {
return reinterpret_cast<jlong>(iter.release());
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
return 0;
}

View File

@ -0,0 +1,78 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::Iterator methods from Java side.
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include "include/org_rocksdb_TransactionLogIterator.h"
#include "rocksdb/transaction_log.h"
#include "rocksjni/portal.h"
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::TransactionLogIterator*>(handle);
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: isValid
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_TransactionLogIterator_isValid(
JNIEnv* env, jobject jobj, jlong handle) {
return reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->Valid();
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: next
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_next(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->Next();
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: status
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_status(
JNIEnv* env, jobject jobj, jlong handle) {
rocksdb::Status s = reinterpret_cast<
rocksdb::TransactionLogIterator*>(handle)->status();
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: getBatch
* Signature: (J)Lorg/rocksdb/TransactionLogIterator$BatchResult
*/
jobject Java_org_rocksdb_TransactionLogIterator_getBatch(
JNIEnv* env, jobject jobj, jlong handle) {
rocksdb::BatchResult batch_result =
reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->GetBatch();
jclass jclazz = env->FindClass(
"org/rocksdb/TransactionLogIterator$BatchResult");
assert(jclazz != nullptr);
jmethodID mid = env->GetMethodID(
jclazz, "<init>", "(Lorg/rocksdb/TransactionLogIterator;JJ)V");
assert(mid != nullptr);
return env->NewObject(jclazz, mid, jobj,
batch_result.sequence, batch_result.writeBatchPtr.release());
}