From 25a702015e91718845a3c9e150c1df526e097d8d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 29 Jul 2022 00:32:08 +0200 Subject: [PATCH] Update dependencies --- pom.xml | 4 +- .../cavallium/dbengine/database/LLUtils.java | 29 +++++++ .../database/disk/CappedWriteBatch.java | 6 +- .../database/disk/LLLocalDictionary.java | 80 ++++++++++++------- .../disk/LLLocalKeyValueDatabase.java | 3 +- 5 files changed, 87 insertions(+), 35 deletions(-) diff --git a/pom.xml b/pom.xml index 53aee40..96ef697 100644 --- a/pom.xml +++ b/pom.xml @@ -112,7 +112,7 @@ io.netty netty5-buffer - 5.0.0.Alpha3 + 5.0.0.Alpha4 io.netty @@ -219,7 +219,7 @@ test - it.cavallium + org.rocksdb rocksdbjni 7.4.3 diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 419584e..dcc39c2 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -22,6 +22,10 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.RandomSortField; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.MethodType; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.time.Duration; @@ -64,6 +68,8 @@ import org.apache.lucene.util.BytesRefBuilder; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractImmutableNativeReference; +import org.rocksdb.AbstractNativeReference; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import reactor.core.Disposable; @@ -100,11 +106,23 @@ public class LLUtils { public static final boolean DEBUG_ALL_DISCARDS = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false")); + private static final Lookup PUBLIC_LOOKUP = MethodHandles.publicLookup(); + + private static final MethodHandle IS_ACCESSIBLE_METHOD_HANDLE; + static { for (int i1 = 0; i1 < 256; i1++) { var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; b[0] = (byte) i1; } + var methodType = MethodType.methodType(boolean.class); + MethodHandle isAccessibleMethodHandle = null; + try { + isAccessibleMethodHandle = PUBLIC_LOOKUP.findVirtual(AbstractNativeReference.class, "isAccessible", methodType); + } catch (NoSuchMethodException | IllegalAccessException e) { + logger.debug("Failed to find isAccessible()", e); + } + IS_ACCESSIBLE_METHOD_HANDLE = isAccessibleMethodHandle; initHooks(); } @@ -744,6 +762,17 @@ public class LLUtils { }, delay.toMillis(), TimeUnit.MILLISECONDS)); } + public static boolean isAccessible(AbstractNativeReference abstractNativeReference) { + if (IS_ACCESSIBLE_METHOD_HANDLE != null) { + try { + return (boolean) IS_ACCESSIBLE_METHOD_HANDLE.invoke(abstractNativeReference); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + return true; + } + @Deprecated public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java index 374a89a..abad999 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java @@ -10,6 +10,7 @@ import io.netty5.util.Send; import io.netty5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.RocksDBColumn; +import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -63,7 +64,7 @@ public class CappedWriteBatch extends WriteBatch { } } - private synchronized void releaseAllBuffers() { + public synchronized void releaseAllBuffers() { if (!buffersToRelease.isEmpty()) { for (Buffer byteBuffer : buffersToRelease) { byteBuffer.close(); @@ -265,9 +266,10 @@ public class CappedWriteBatch extends WriteBatch { } } - @Override + /* protected void disposeInternal(boolean owningHandle) { super.disposeInternal(owningHandle); releaseAllBuffers(); } + */ } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index e914320..c1a6239 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -568,13 +568,14 @@ public class LLLocalDictionary implements LLDictionary { try (var writeOptions = new WriteOptions()) { assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread"; if (USE_WRITE_BATCHES_IN_PUT_MULTI) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - writeOptions - )) { + var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + writeOptions + ); + try { for (LLEntry entry : entriesWindow) { var k = entry.getKeyUnsafe(); var v = entry.getValueUnsafe(); @@ -585,6 +586,9 @@ public class LLLocalDictionary implements LLDictionary { } } batch.flush(); + } finally { + batch.releaseAllBuffers(); + batch.close(); } } else { for (LLEntry entry : entriesWindow) { @@ -677,13 +681,14 @@ public class LLLocalDictionary implements LLDictionary { } if (USE_WRITE_BATCHES_IN_PUT_MULTI) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - writeOptions - )) { + var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + writeOptions + ); + try { int i = 0; for (Tuple2 entry : entriesWindow) { try (var valueToWrite = updatedValuesToWrite.get(i)) { @@ -696,6 +701,9 @@ public class LLLocalDictionary implements LLDictionary { i++; } batch.flush(); + } finally { + batch.releaseAllBuffers(); + batch.close(); } } else { int i = 0; @@ -913,19 +921,23 @@ public class LLLocalDictionary implements LLDictionary { } } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - writeOptions - )) { + var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + writeOptions + ); + try { if (range.isSingle()) { batch.delete(cfh, range.getSingle()); } else { deleteSmallRangeWriteBatch(batch, range.copy()); } batch.flush(); + } finally { + batch.releaseAllBuffers(); + batch.close(); } } else { try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { @@ -953,13 +965,14 @@ public class LLLocalDictionary implements LLDictionary { db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe()); } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - writeOptions - )) { + var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + writeOptions); + + try { for (LLEntry entry : entriesList) { if (nettyDirect) { batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send()); @@ -971,6 +984,9 @@ public class LLLocalDictionary implements LLDictionary { } } batch.flush(); + } finally { + batch.releaseAllBuffers(); + batch.close(); } } else { try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { @@ -1076,13 +1092,14 @@ public class LLLocalDictionary implements LLDictionary { if (LLUtils.MANUAL_READAHEAD) { readOpts.setReadaheadSize(32 * 1024); // 32KiB } - try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, + CappedWriteBatch writeBatch = new CappedWriteBatch(db, alloc, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, writeOptions - )) { + ); + try { byte[] firstDeletedKey = null; byte[] lastDeletedKey = null; @@ -1126,6 +1143,9 @@ public class LLLocalDictionary implements LLDictionary { db.flush(fo); } db.flushWal(true); + } finally { + writeBatch.releaseAllBuffers(); + writeBatch.close(); } return null; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 032a983..0b20e9d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -31,6 +31,7 @@ import it.cavallium.dbengine.rpc.current.data.DatabaseVolume; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import java.io.File; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -1601,7 +1602,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { new ArrayList<>(handles.values()) ); handles.values().forEach(columnFamilyHandleRocksObj -> { - if (columnFamilyHandleRocksObj.isAccessible()) { + if (LLUtils.isAccessible(columnFamilyHandleRocksObj)) { columnFamilyHandleRocksObj.close(); } });