Java APIs for put, merge and delete in file ingestion
Summary: Adding SSTFileWriter's newly introduced put, merge and delete apis to the Java api. The C++ APIs were first introduced in #2361. Add is deprecated in favor of Put. Merge is especially needed to support streaming for Cassandra-on-RocksDB work in https://issues.apache.org/jira/browse/CASSANDRA-13476. Closes https://github.com/facebook/rocksdb/pull/2392 Differential Revision: D5165091 Pulled By: sagar0 fbshipit-source-id: 6f0ad396a7cbd2e27ca63e702584784dd72acaab
This commit is contained in:
parent
dd6e5c7c63
commit
6a7d920694
@ -72,24 +72,6 @@ void Java_org_rocksdb_SstFileWriter_open(JNIEnv *env, jobject jobj,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: add
|
||||
* Signature: (JJJ)V
|
||||
*/
|
||||
void Java_org_rocksdb_SstFileWriter_add(JNIEnv *env, jobject jobj,
|
||||
jlong jhandle, jlong jkey_handle,
|
||||
jlong jvalue_handle) {
|
||||
auto *key_slice = reinterpret_cast<rocksdb::Slice *>(jkey_handle);
|
||||
auto *value_slice = reinterpret_cast<rocksdb::Slice *>(jvalue_handle);
|
||||
rocksdb::Status s =
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Put(*key_slice,
|
||||
*value_slice);
|
||||
if (!s.ok()) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: put
|
||||
|
@ -5,47 +5,199 @@
|
||||
|
||||
package org.rocksdb;
|
||||
|
||||
/**
|
||||
* SstFileWriter is used to create sst files that can be added to the
|
||||
* database later. All keys in files generated by SstFileWriter will have
|
||||
* sequence number = 0.
|
||||
*/
|
||||
public class SstFileWriter extends RocksObject {
|
||||
static {
|
||||
RocksDB.loadLibrary();
|
||||
}
|
||||
|
||||
/**
|
||||
* SstFileWriter Constructor.
|
||||
*
|
||||
* @param envOptions {@link org.rocksdb.EnvOptions} instance.
|
||||
* @param options {@link org.rocksdb.Options} instance.
|
||||
* @param comparator the comparator to specify the ordering of keys.
|
||||
*
|
||||
* @deprecated Use {@link #SstFileWriter(EnvOptions, Options)}.
|
||||
* Passing an explicit comparator is deprecated in lieu of passing the
|
||||
* comparator as part of options. Use the other constructor instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public SstFileWriter(final EnvOptions envOptions, final Options options,
|
||||
final AbstractComparator<? extends AbstractSlice<?>> comparator) {
|
||||
super(newSstFileWriter(
|
||||
envOptions.nativeHandle_, options.nativeHandle_, comparator.getNativeHandle()));
|
||||
}
|
||||
|
||||
/**
|
||||
* SstFileWriter Constructor.
|
||||
*
|
||||
* @param envOptions {@link org.rocksdb.EnvOptions} instance.
|
||||
* @param options {@link org.rocksdb.Options} instance.
|
||||
*/
|
||||
public SstFileWriter(final EnvOptions envOptions, final Options options) {
|
||||
super(newSstFileWriter(
|
||||
envOptions.nativeHandle_, options.nativeHandle_));
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare SstFileWriter to write to a file.
|
||||
*
|
||||
* @param filePath the location of file
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void open(final String filePath) throws RocksDBException {
|
||||
open(nativeHandle_, filePath);
|
||||
}
|
||||
|
||||
public void add(final Slice key, final Slice value) throws RocksDBException {
|
||||
add(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
/**
|
||||
* Add a Put key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be inserted.
|
||||
* @param value the value associated with the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*
|
||||
* @deprecated Use {@link #put(Slice, Slice)}
|
||||
*/
|
||||
@Deprecated
|
||||
public void add(final Slice key, final Slice value)
|
||||
throws RocksDBException {
|
||||
put(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
public void add(final DirectSlice key, final DirectSlice value) throws RocksDBException {
|
||||
add(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
/**
|
||||
* Add a Put key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be inserted.
|
||||
* @param value the value associated with the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*
|
||||
* @deprecated Use {@link #put(DirectSlice, DirectSlice)}
|
||||
*/
|
||||
@Deprecated
|
||||
public void add(final DirectSlice key, final DirectSlice value)
|
||||
throws RocksDBException {
|
||||
put(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a Put key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be inserted.
|
||||
* @param value the value associated with the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void put(final Slice key, final Slice value) throws RocksDBException {
|
||||
put(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a Put key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be inserted.
|
||||
* @param value the value associated with the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void put(final DirectSlice key, final DirectSlice value)
|
||||
throws RocksDBException {
|
||||
put(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a Merge key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be merged.
|
||||
* @param value the value to be merged with the current value for
|
||||
* the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void merge(final Slice key, final Slice value)
|
||||
throws RocksDBException {
|
||||
merge(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a Merge key with value to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be merged.
|
||||
* @param value the value to be merged with the current value for
|
||||
* the specified key.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void merge(final DirectSlice key, final DirectSlice value)
|
||||
throws RocksDBException {
|
||||
merge(nativeHandle_, key.getNativeHandle(), value.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a deletion key to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be deleted.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void delete(final Slice key) throws RocksDBException {
|
||||
delete(nativeHandle_, key.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a deletion key to currently opened file.
|
||||
*
|
||||
* @param key the specified key to be deleted.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void delete(final DirectSlice key) throws RocksDBException {
|
||||
delete(nativeHandle_, key.getNativeHandle());
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish the process and close the sst file.
|
||||
*
|
||||
* @throws RocksDBException thrown if error happens in underlying
|
||||
* native library.
|
||||
*/
|
||||
public void finish() throws RocksDBException {
|
||||
finish(nativeHandle_);
|
||||
}
|
||||
|
||||
private native static long newSstFileWriter(
|
||||
final long envOptionsHandle, final long optionsHandle, final long userComparatorHandle);
|
||||
final long envOptionsHandle, final long optionsHandle,
|
||||
final long userComparatorHandle);
|
||||
|
||||
private native static long newSstFileWriter(final long envOptionsHandle,
|
||||
final long optionsHandle);
|
||||
|
||||
private native void open(final long handle, final String filePath) throws RocksDBException;
|
||||
private native void open(final long handle, final String filePath)
|
||||
throws RocksDBException;
|
||||
|
||||
private native void add(final long handle, final long keyHandle, final long valueHandle)
|
||||
private native void put(final long handle, final long keyHandle,
|
||||
final long valueHandle) throws RocksDBException;
|
||||
|
||||
private native void merge(final long handle, final long keyHandle,
|
||||
final long valueHandle) throws RocksDBException;
|
||||
|
||||
private native void delete(final long handle, final long keyHandle)
|
||||
throws RocksDBException;
|
||||
|
||||
private native void finish(final long handle) throws RocksDBException;
|
||||
|
@ -13,11 +13,12 @@ import org.rocksdb.util.BytewiseComparator;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class SstFileWriterTest {
|
||||
private static final String SST_FILE_NAME = "test.sst";
|
||||
@ -29,11 +30,37 @@ public class SstFileWriterTest {
|
||||
|
||||
@Rule public TemporaryFolder parentFolder = new TemporaryFolder();
|
||||
|
||||
private File newSstFile(final TreeMap<String, String> keyValues,
|
||||
boolean useJavaBytewiseComparator)
|
||||
throws IOException, RocksDBException {
|
||||
enum OpType { PUT, MERGE, DELETE }
|
||||
|
||||
class KeyValueWithOp {
|
||||
KeyValueWithOp(String key, String value, OpType opType) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.opType = opType;
|
||||
}
|
||||
|
||||
String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
OpType getOpType() {
|
||||
return opType;
|
||||
}
|
||||
|
||||
private String key;
|
||||
private String value;
|
||||
private OpType opType;
|
||||
};
|
||||
|
||||
private File newSstFile(final List<KeyValueWithOp> keyValues,
|
||||
boolean useJavaBytewiseComparator) throws IOException, RocksDBException {
|
||||
final EnvOptions envOptions = new EnvOptions();
|
||||
final Options options = new Options();
|
||||
final StringAppendOperator stringAppendOperator = new StringAppendOperator();
|
||||
final Options options = new Options().setMergeOperator(stringAppendOperator);
|
||||
SstFileWriter sstFileWriter = null;
|
||||
ComparatorOptions comparatorOptions = null;
|
||||
BytewiseComparator comparator = null;
|
||||
@ -49,10 +76,22 @@ public class SstFileWriterTest {
|
||||
final File sstFile = parentFolder.newFile(SST_FILE_NAME);
|
||||
try {
|
||||
sstFileWriter.open(sstFile.getAbsolutePath());
|
||||
for (Map.Entry<String, String> keyValue : keyValues.entrySet()) {
|
||||
for (KeyValueWithOp keyValue : keyValues) {
|
||||
Slice keySlice = new Slice(keyValue.getKey());
|
||||
Slice valueSlice = new Slice(keyValue.getValue());
|
||||
sstFileWriter.add(keySlice, valueSlice);
|
||||
switch (keyValue.getOpType()) {
|
||||
case PUT:
|
||||
sstFileWriter.put(keySlice, valueSlice);
|
||||
break;
|
||||
case MERGE:
|
||||
sstFileWriter.merge(keySlice, valueSlice);
|
||||
break;
|
||||
case DELETE:
|
||||
sstFileWriter.delete(keySlice);
|
||||
break;
|
||||
default:
|
||||
fail("Unsupported op type");
|
||||
}
|
||||
keySlice.close();
|
||||
valueSlice.close();
|
||||
}
|
||||
@ -75,55 +114,79 @@ public class SstFileWriterTest {
|
||||
@Test
|
||||
public void generateSstFileWithJavaComparator()
|
||||
throws RocksDBException, IOException {
|
||||
final TreeMap<String, String> keyValues = new TreeMap<>();
|
||||
keyValues.put("key1", "value1");
|
||||
keyValues.put("key2", "value2");
|
||||
final List<KeyValueWithOp> keyValues = new ArrayList<>();
|
||||
keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key3", "value3", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key4", "value4", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key5", "", OpType.DELETE));
|
||||
|
||||
newSstFile(keyValues, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void generateSstFileWithNativeComparator()
|
||||
throws RocksDBException, IOException {
|
||||
final TreeMap<String, String> keyValues = new TreeMap<>();
|
||||
keyValues.put("key1", "value1");
|
||||
keyValues.put("key2", "value2");
|
||||
final List<KeyValueWithOp> keyValues = new ArrayList<>();
|
||||
keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key3", "value3", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key4", "value4", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key5", "", OpType.DELETE));
|
||||
|
||||
newSstFile(keyValues, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ingestSstFile() throws RocksDBException, IOException {
|
||||
final TreeMap<String, String> keyValues = new TreeMap<>();
|
||||
keyValues.put("key1", "value1");
|
||||
keyValues.put("key2", "value2");
|
||||
final List<KeyValueWithOp> keyValues = new ArrayList<>();
|
||||
keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key3", "value3", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key4", "", OpType.DELETE));
|
||||
|
||||
final File sstFile = newSstFile(keyValues, false);
|
||||
final File dbFolder = parentFolder.newFolder(DB_DIRECTORY_NAME);
|
||||
try(final Options options = new Options().setCreateIfMissing(true);
|
||||
final RocksDB db = RocksDB.open(options, dbFolder.getAbsolutePath());
|
||||
final IngestExternalFileOptions ingestExternalFileOptions
|
||||
= new IngestExternalFileOptions()) {
|
||||
try(final StringAppendOperator stringAppendOperator =
|
||||
new StringAppendOperator();
|
||||
final Options options = new Options()
|
||||
.setCreateIfMissing(true)
|
||||
.setMergeOperator(stringAppendOperator);
|
||||
final RocksDB db = RocksDB.open(options, dbFolder.getAbsolutePath());
|
||||
final IngestExternalFileOptions ingestExternalFileOptions =
|
||||
new IngestExternalFileOptions()) {
|
||||
db.ingestExternalFile(Arrays.asList(sstFile.getAbsolutePath()),
|
||||
ingestExternalFileOptions);
|
||||
|
||||
assertThat(db.get("key1".getBytes())).isEqualTo("value1".getBytes());
|
||||
assertThat(db.get("key2".getBytes())).isEqualTo("value2".getBytes());
|
||||
assertThat(db.get("key3".getBytes())).isEqualTo("value3".getBytes());
|
||||
assertThat(db.get("key4".getBytes())).isEqualTo(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ingestSstFile_cf() throws RocksDBException, IOException {
|
||||
final TreeMap<String, String> keyValues = new TreeMap<>();
|
||||
keyValues.put("key1", "value1");
|
||||
keyValues.put("key2", "value2");
|
||||
final List<KeyValueWithOp> keyValues = new ArrayList<>();
|
||||
keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT));
|
||||
keyValues.add(new KeyValueWithOp("key3", "value3", OpType.MERGE));
|
||||
keyValues.add(new KeyValueWithOp("key4", "", OpType.DELETE));
|
||||
|
||||
final File sstFile = newSstFile(keyValues, false);
|
||||
final File dbFolder = parentFolder.newFolder(DB_DIRECTORY_NAME);
|
||||
try(final Options options = new Options()
|
||||
.setCreateIfMissing(true)
|
||||
.setCreateMissingColumnFamilies(true);
|
||||
try(final StringAppendOperator stringAppendOperator =
|
||||
new StringAppendOperator();
|
||||
final Options options = new Options()
|
||||
.setCreateIfMissing(true)
|
||||
.setCreateMissingColumnFamilies(true)
|
||||
.setMergeOperator(stringAppendOperator);
|
||||
final RocksDB db = RocksDB.open(options, dbFolder.getAbsolutePath());
|
||||
final IngestExternalFileOptions ingestExternalFileOptions =
|
||||
new IngestExternalFileOptions()) {
|
||||
|
||||
try(final ColumnFamilyOptions cf_opts = new ColumnFamilyOptions();
|
||||
try(final ColumnFamilyOptions cf_opts = new ColumnFamilyOptions()
|
||||
.setMergeOperator(stringAppendOperator);
|
||||
final ColumnFamilyHandle cf_handle = db.createColumnFamily(
|
||||
new ColumnFamilyDescriptor("new_cf".getBytes(), cf_opts))) {
|
||||
|
||||
@ -135,6 +198,10 @@ public class SstFileWriterTest {
|
||||
"key1".getBytes())).isEqualTo("value1".getBytes());
|
||||
assertThat(db.get(cf_handle,
|
||||
"key2".getBytes())).isEqualTo("value2".getBytes());
|
||||
assertThat(db.get(cf_handle,
|
||||
"key3".getBytes())).isEqualTo("value3".getBytes());
|
||||
assertThat(db.get(cf_handle,
|
||||
"key4".getBytes())).isEqualTo(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user