Add UInt64AddOperator to rocksjava (#4448)

Summary:
Closes https://github.com/facebook/rocksdb/issues/4447
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4448

Differential Revision: D10351852

Pulled By: ajkr

fbshipit-source-id: 18287b5190ae0b8153ce425da9a0bdfe1af88c34
This commit is contained in:
John Calcote 2018-10-12 17:33:55 -07:00 committed by Facebook Github Bot
parent 6f8d4bdff1
commit 9c20797136
5 changed files with 249 additions and 1 deletions

View File

@ -132,6 +132,7 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.TransactionLogIterator
org.rocksdb.TransactionOptions
org.rocksdb.TtlDB
org.rocksdb.UInt64AddOperator
org.rocksdb.VectorMemTableConfig
org.rocksdb.WBWIRocksIterator
org.rocksdb.WriteBatch
@ -294,6 +295,7 @@ add_jar(
src/test/java/org/rocksdb/RocksDBExceptionTest.java
src/test/java/org/rocksdb/RocksMemoryResource.java
src/test/java/org/rocksdb/SnapshotTest.java
src/main/java/org/rocksdb/UInt64AddOperator.java
src/test/java/org/rocksdb/WriteBatchTest.java
src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java
src/test/java/org/rocksdb/util/WriteBatchGetter.java

View File

@ -62,6 +62,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.Snapshot\
org.rocksdb.StringAppendOperator\
org.rocksdb.UInt64AddOperator\
org.rocksdb.WriteBatch\
org.rocksdb.WriteBatch.Handler\
org.rocksdb.WriteOptions\

View File

@ -13,6 +13,7 @@
#include <string>
#include "include/org_rocksdb_StringAppendOperator.h"
#include "include/org_rocksdb_UInt64AddOperator.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
@ -47,3 +48,28 @@ void Java_org_rocksdb_StringAppendOperator_disposeInternal(JNIEnv* /*env*/,
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete sptr_string_append_op; // delete std::shared_ptr
}
/*
* Class: org_rocksdb_UInt64AddOperator
* Method: newSharedUInt64AddOperator
* Signature: ()J
*/
jlong Java_org_rocksdb_UInt64AddOperator_newSharedUInt64AddOperator(
JNIEnv* /*env*/, jclass /*jclazz*/) {
auto* sptr_uint64_add_op = new std::shared_ptr<rocksdb::MergeOperator>(
rocksdb::MergeOperators::CreateUInt64AddOperator());
return reinterpret_cast<jlong>(sptr_uint64_add_op);
}
/*
* Class: org_rocksdb_UInt64AddOperator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_UInt64AddOperator_disposeInternal(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jhandle) {
auto* sptr_uint64_add_op =
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete sptr_uint64_add_op; // delete std::shared_ptr
}

View File

@ -0,0 +1,19 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Uint64AddOperator is a merge operator that accumlates a long
* integer value.
*/
public class UInt64AddOperator extends MergeOperator {
public UInt64AddOperator() {
super(newSharedUInt64AddOperator());
}
private native static long newSharedUInt64AddOperator();
@Override protected final native void disposeInternal(final long handle);
}

View File

@ -5,6 +5,7 @@
package org.rocksdb;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
@ -44,6 +45,38 @@ public class MergeTest {
}
}
private byte[] longToByteArray(long l) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
buf.putLong(l);
return buf.array();
}
private long longFromByteArray(byte[] a) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
buf.put(a);
buf.flip();
return buf.getLong();
}
@Test
public void uint64AddOption()
throws InterruptedException, RocksDBException {
try (final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperatorName("uint64add");
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
// writing (long)100 under key
db.put("key".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge("key".getBytes(), longToByteArray(1));
final byte[] value = db.get("key".getBytes());
final long longValue = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
}
}
@Test
public void cFStringOption()
throws InterruptedException, RocksDBException {
@ -86,6 +119,48 @@ public class MergeTest {
}
}
@Test
public void cFUInt64AddOption()
throws InterruptedException, RocksDBException {
try (final ColumnFamilyOptions cfOpt1 = new ColumnFamilyOptions()
.setMergeOperatorName("uint64add");
final ColumnFamilyOptions cfOpt2 = new ColumnFamilyOptions()
.setMergeOperatorName("uint64add")
) {
final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt1),
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt2)
);
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
try (final DBOptions opt = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(), cfDescriptors,
columnFamilyHandleList)) {
try {
// writing (long)100 under key
db.put(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(1));
byte[] value = db.get(columnFamilyHandleList.get(1),
"cfkey".getBytes());
long longValue = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
} finally {
for (final ColumnFamilyHandle handle : columnFamilyHandleList) {
handle.close();
}
}
}
}
}
@Test
public void operatorOption()
throws InterruptedException, RocksDBException {
@ -108,6 +183,28 @@ public class MergeTest {
}
}
@Test
public void uint64AddOperatorOption()
throws InterruptedException, RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator();
final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
// Writing (long)100 under key
db.put("key".getBytes(), longToByteArray(100));
// Writing (long)1 under key
db.merge("key".getBytes(), longToByteArray(1));
final byte[] value = db.get("key".getBytes());
final long longValue = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
}
}
@Test
public void cFOperatorOption()
throws InterruptedException, RocksDBException {
@ -170,6 +267,68 @@ public class MergeTest {
}
}
@Test
public void cFUInt64AddOperatorOption()
throws InterruptedException, RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator();
final ColumnFamilyOptions cfOpt1 = new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator);
final ColumnFamilyOptions cfOpt2 = new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator)
) {
final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt1),
new ColumnFamilyDescriptor("new_cf".getBytes(), cfOpt2)
);
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
try (final DBOptions opt = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(), cfDescriptors,
columnFamilyHandleList)
) {
try {
// writing (long)100 under key
db.put(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(1));
byte[] value = db.get(columnFamilyHandleList.get(1),
"cfkey".getBytes());
long longValue = longFromByteArray(value);
// Test also with createColumnFamily
try (final ColumnFamilyOptions cfHandleOpts =
new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator);
final ColumnFamilyHandle cfHandle =
db.createColumnFamily(
new ColumnFamilyDescriptor("new_cf2".getBytes(),
cfHandleOpts))
) {
// writing (long)200 under cfkey2
db.put(cfHandle, "cfkey2".getBytes(), longToByteArray(200));
// merge (long)50 under cfkey2
db.merge(cfHandle, new WriteOptions(), "cfkey2".getBytes(),
longToByteArray(50));
value = db.get(cfHandle, "cfkey2".getBytes());
long longValueTmpCf = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
assertThat(longValueTmpCf).isEqualTo(250);
}
} finally {
for (final ColumnFamilyHandle columnFamilyHandle :
columnFamilyHandleList) {
columnFamilyHandle.close();
}
}
}
}
}
@Test
public void operatorGcBehaviour()
throws RocksDBException {
@ -182,7 +341,6 @@ public class MergeTest {
//no-op
}
// test reuse
try (final Options opt = new Options()
.setMergeOperator(stringAppendOperator);
@ -213,6 +371,48 @@ public class MergeTest {
}
}
@Test
public void uint64AddOperatorGcBehaviour()
throws RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator()) {
try (final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}
// test reuse
try (final Options opt = new Options()
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}
// test param init
try (final UInt64AddOperator uint64AddOperator2 = new UInt64AddOperator();
final Options opt = new Options()
.setMergeOperator(uint64AddOperator2);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}
// test replace one with another merge operator instance
try (final Options opt = new Options()
.setMergeOperator(uint64AddOperator);
final UInt64AddOperator newUInt64AddOperator = new UInt64AddOperator()) {
opt.setMergeOperator(newUInt64AddOperator);
try (final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}
}
}
}
@Test
public void emptyStringInSetMergeOperatorByName() {
try (final Options opt = new Options()