diff --git a/pom.xml b/pom.xml
index 53aee40..96ef697 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,7 @@
io.netty
netty5-buffer
- 5.0.0.Alpha3
+ 5.0.0.Alpha4
io.netty
@@ -219,7 +219,7 @@
test
- it.cavallium
+ org.rocksdb
rocksdbjni
7.4.3
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 419584e..dcc39c2 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -22,6 +22,10 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.RandomSortField;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodHandles.Lookup;
+import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
@@ -64,6 +68,8 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractImmutableNativeReference;
+import org.rocksdb.AbstractNativeReference;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import reactor.core.Disposable;
@@ -100,11 +106,23 @@ public class LLUtils {
public static final boolean DEBUG_ALL_DISCARDS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false"));
+ private static final Lookup PUBLIC_LOOKUP = MethodHandles.publicLookup();
+
+ private static final MethodHandle IS_ACCESSIBLE_METHOD_HANDLE;
+
static {
for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
b[0] = (byte) i1;
}
+ var methodType = MethodType.methodType(boolean.class);
+ MethodHandle isAccessibleMethodHandle = null;
+ try {
+ isAccessibleMethodHandle = PUBLIC_LOOKUP.findVirtual(AbstractNativeReference.class, "isAccessible", methodType);
+ } catch (NoSuchMethodException | IllegalAccessException e) {
+ logger.debug("Failed to find isAccessible()", e);
+ }
+ IS_ACCESSIBLE_METHOD_HANDLE = isAccessibleMethodHandle;
initHooks();
}
@@ -744,6 +762,17 @@ public class LLUtils {
}, delay.toMillis(), TimeUnit.MILLISECONDS));
}
+ public static boolean isAccessible(AbstractNativeReference abstractNativeReference) {
+ if (IS_ACCESSIBLE_METHOD_HANDLE != null) {
+ try {
+ return (boolean) IS_ACCESSIBLE_METHOD_HANDLE.invoke(abstractNativeReference);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
@Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java
index 374a89a..abad999 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java
@@ -10,6 +10,7 @@ import io.netty5.util.Send;
import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.RocksDBColumn;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -63,7 +64,7 @@ public class CappedWriteBatch extends WriteBatch {
}
}
- private synchronized void releaseAllBuffers() {
+ public synchronized void releaseAllBuffers() {
if (!buffersToRelease.isEmpty()) {
for (Buffer byteBuffer : buffersToRelease) {
byteBuffer.close();
@@ -265,9 +266,10 @@ public class CappedWriteBatch extends WriteBatch {
}
}
- @Override
+ /*
protected void disposeInternal(boolean owningHandle) {
super.disposeInternal(owningHandle);
releaseAllBuffers();
}
+ */
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index e914320..c1a6239 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -568,13 +568,14 @@ public class LLLocalDictionary implements LLDictionary {
try (var writeOptions = new WriteOptions()) {
assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread";
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
- try (var batch = new CappedWriteBatch(db,
- alloc,
- CAPPED_WRITE_BATCH_CAP,
- RESERVED_WRITE_BATCH_SIZE,
- MAX_WRITE_BATCH_SIZE,
- writeOptions
- )) {
+ var batch = new CappedWriteBatch(db,
+ alloc,
+ CAPPED_WRITE_BATCH_CAP,
+ RESERVED_WRITE_BATCH_SIZE,
+ MAX_WRITE_BATCH_SIZE,
+ writeOptions
+ );
+ try {
for (LLEntry entry : entriesWindow) {
var k = entry.getKeyUnsafe();
var v = entry.getValueUnsafe();
@@ -585,6 +586,9 @@ public class LLLocalDictionary implements LLDictionary {
}
}
batch.flush();
+ } finally {
+ batch.releaseAllBuffers();
+ batch.close();
}
} else {
for (LLEntry entry : entriesWindow) {
@@ -677,13 +681,14 @@ public class LLLocalDictionary implements LLDictionary {
}
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
- try (var batch = new CappedWriteBatch(db,
- alloc,
- CAPPED_WRITE_BATCH_CAP,
- RESERVED_WRITE_BATCH_SIZE,
- MAX_WRITE_BATCH_SIZE,
- writeOptions
- )) {
+ var batch = new CappedWriteBatch(db,
+ alloc,
+ CAPPED_WRITE_BATCH_CAP,
+ RESERVED_WRITE_BATCH_SIZE,
+ MAX_WRITE_BATCH_SIZE,
+ writeOptions
+ );
+ try {
int i = 0;
for (Tuple2 entry : entriesWindow) {
try (var valueToWrite = updatedValuesToWrite.get(i)) {
@@ -696,6 +701,9 @@ public class LLLocalDictionary implements LLDictionary {
i++;
}
batch.flush();
+ } finally {
+ batch.releaseAllBuffers();
+ batch.close();
}
} else {
int i = 0;
@@ -913,19 +921,23 @@ public class LLLocalDictionary implements LLDictionary {
}
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
- try (var batch = new CappedWriteBatch(db,
- alloc,
- CAPPED_WRITE_BATCH_CAP,
- RESERVED_WRITE_BATCH_SIZE,
- MAX_WRITE_BATCH_SIZE,
- writeOptions
- )) {
+ var batch = new CappedWriteBatch(db,
+ alloc,
+ CAPPED_WRITE_BATCH_CAP,
+ RESERVED_WRITE_BATCH_SIZE,
+ MAX_WRITE_BATCH_SIZE,
+ writeOptions
+ );
+ try {
if (range.isSingle()) {
batch.delete(cfh, range.getSingle());
} else {
deleteSmallRangeWriteBatch(batch, range.copy());
}
batch.flush();
+ } finally {
+ batch.releaseAllBuffers();
+ batch.close();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
@@ -953,13 +965,14 @@ public class LLLocalDictionary implements LLDictionary {
db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
- try (var batch = new CappedWriteBatch(db,
- alloc,
- CAPPED_WRITE_BATCH_CAP,
- RESERVED_WRITE_BATCH_SIZE,
- MAX_WRITE_BATCH_SIZE,
- writeOptions
- )) {
+ var batch = new CappedWriteBatch(db,
+ alloc,
+ CAPPED_WRITE_BATCH_CAP,
+ RESERVED_WRITE_BATCH_SIZE,
+ MAX_WRITE_BATCH_SIZE,
+ writeOptions);
+
+ try {
for (LLEntry entry : entriesList) {
if (nettyDirect) {
batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send());
@@ -971,6 +984,9 @@ public class LLLocalDictionary implements LLDictionary {
}
}
batch.flush();
+ } finally {
+ batch.releaseAllBuffers();
+ batch.close();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
@@ -1076,13 +1092,14 @@ public class LLLocalDictionary implements LLDictionary {
if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(32 * 1024); // 32KiB
}
- try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
+ CappedWriteBatch writeBatch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
writeOptions
- )) {
+ );
+ try {
byte[] firstDeletedKey = null;
byte[] lastDeletedKey = null;
@@ -1126,6 +1143,9 @@ public class LLLocalDictionary implements LLDictionary {
db.flush(fo);
}
db.flushWal(true);
+ } finally {
+ writeBatch.releaseAllBuffers();
+ writeBatch.close();
}
return null;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
index 032a983..0b20e9d 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -31,6 +31,7 @@ import it.cavallium.dbengine.rpc.current.data.DatabaseVolume;
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
import java.io.File;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -1601,7 +1602,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
new ArrayList<>(handles.values())
);
handles.values().forEach(columnFamilyHandleRocksObj -> {
- if (columnFamilyHandleRocksObj.isAccessible()) {
+ if (LLUtils.isAccessible(columnFamilyHandleRocksObj)) {
columnFamilyHandleRocksObj.close();
}
});