diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index b34a4c7..bb20ea2 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -82,8 +82,8 @@ public class LLUtils { return bool ? RESPONSE_TRUE : RESPONSE_FALSE; } - public static Buffer booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) { - return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0); + public static Send booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) { + return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0).send(); } @Nullable @@ -349,18 +349,6 @@ public class LLUtils { return result; } - /* - public static Buffer toDirectCopy(Buffer buffer) { - try { - Buffer directCopyBuf = buffer.alloc().buffer(buffer.capacity(), buffer.maxCapacity()); - directCopyBuf.writeBytes(buffer, 0, buffer.writerIndex()); - return directCopyBuf; - } finally { - buffer.release(); - } - } - */ - public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { Buffer result = alloc.allocate(array.length); result.writeBytes(array); @@ -378,7 +366,6 @@ public class LLUtils { public static Send compositeBuffer(BufferAllocator alloc, Send buffer) { try (var composite = buffer.receive().compact()) { - assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0; return composite.send(); } } @@ -387,7 +374,6 @@ public class LLUtils { try (buffer1) { try (buffer2) { try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2).compact()) { - assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0; return composite.send(); } } @@ -399,7 +385,6 @@ public class LLUtils { try (buffer2) { try (buffer3) { try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3).compact()) { - assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0; return composite.send(); } } @@ -407,6 +392,7 @@ public class LLUtils { } } + @SafeVarargs public static Send compositeBuffer(BufferAllocator alloc, Send... buffers) { try { return switch (buffers.length) { @@ -416,7 +402,6 @@ public class LLUtils { case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]); default -> { try (var composite = CompositeBuffer.compose(alloc, buffers).compact()) { - assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0; yield composite.send(); } } @@ -738,7 +723,10 @@ public class LLUtils { } public static boolean isDirect(Buffer key) { - if (key.countReadableComponents() == 1) { + var readableComponents = key.countReadableComponents(); + if (readableComponents == 0) { + return true; + } else if (readableComponents == 1) { return key.forEachReadable(0, (index, component) -> component.readableBuffer().isDirect()) >= 0; } else { return false; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 1d3c716..1b4fc83 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -44,7 +44,9 @@ public class DatabaseMapDictionaryDeep> implem private static Send incrementPrefix(BufferAllocator alloc, Send originalKeySend, int prefixLength) { try (var originalKey = originalKeySend.receive()) { assert originalKey.readableBytes() >= prefixLength; - try (Buffer copiedBuf = alloc.allocate(originalKey.writerOffset())) { + var originalKeyStartOffset = originalKey.readerOffset(); + var originalKeyLength = originalKey.readableBytes(); + try (Buffer copiedBuf = alloc.allocate(originalKey.readableBytes())) { boolean overflowed = true; final int ff = 0xFF; int writtenBytes = 0; @@ -67,17 +69,19 @@ public class DatabaseMapDictionaryDeep> implem originalKey.copyInto(0, copiedBuf, 0, (prefixLength - writtenBytes)); } - copiedBuf.writerOffset(copiedBuf.capacity()); + copiedBuf.writerOffset(originalKeyLength); - if (originalKey.writerOffset() - prefixLength > 0) { - originalKey.copyInto(prefixLength, copiedBuf, prefixLength, originalKey.writerOffset() - prefixLength); + if (originalKeyLength - prefixLength > 0) { + originalKey.copyInto(prefixLength, copiedBuf, prefixLength, originalKeyLength - prefixLength); } if (overflowed) { - for (int i = 0; i < copiedBuf.writerOffset(); i++) { + copiedBuf.ensureWritable(originalKeyLength + 1); + copiedBuf.writerOffset(originalKeyLength + 1); + for (int i = 0; i < originalKeyLength; i++) { copiedBuf.setUnsignedByte(i, 0xFF); } - copiedBuf.writeByte((byte) 0x00); + copiedBuf.setUnsignedByte(originalKeyLength, (byte) 0x00); } return copiedBuf.send(); } @@ -118,8 +122,8 @@ public class DatabaseMapDictionaryDeep> implem for (int i = 0; i < suffixLength + extLength; i++) { zeroSuffixAndExt.writeByte((byte) 0x0); } - try (Send result = LLUtils.compositeBuffer(alloc, prefixKey.send(), zeroSuffixAndExt.send())) { - return result; + try (Buffer result = LLUtils.compositeBuffer(alloc, prefixKey.send(), zeroSuffixAndExt.send()).receive()) { + return result.send(); } } } @@ -214,36 +218,38 @@ public class DatabaseMapDictionaryDeep> implem } protected DatabaseMapDictionaryDeep(LLDictionary dictionary, - Send prefixKey, + Send prefixKeyToReceive, SerializerFixedBinaryLength> keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength) { - this.dictionary = dictionary; - this.alloc = dictionary.getAllocator(); - this.subStageGetter = subStageGetter; - this.keySuffixSerializer = keySuffixSerializer; - this.keyPrefix = prefixKey.receive(); - assert keyPrefix.isAccessible(); - this.keyPrefixLength = keyPrefix.readableBytes(); - this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength(); - this.keyExtLength = keyExtLength; - try (Buffer firstKey = firstRangeKey(alloc, - keyPrefix.copy().send(), - keyPrefixLength, - keySuffixLength, - keyExtLength - ).receive()) { - try (Buffer nextRangeKey = nextRangeKey(alloc, - keyPrefix.copy().send(), + try (var prefixKey = prefixKeyToReceive.receive()) { + this.dictionary = dictionary; + this.alloc = dictionary.getAllocator(); + this.subStageGetter = subStageGetter; + this.keySuffixSerializer = keySuffixSerializer; + this.keyPrefix = prefixKey.copy(); + assert keyPrefix.isAccessible(); + this.keyPrefixLength = keyPrefix.readableBytes(); + this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength(); + this.keyExtLength = keyExtLength; + try (Buffer firstKey = firstRangeKey(alloc, + prefixKey.copy().send(), keyPrefixLength, keySuffixLength, keyExtLength - ).receive()) { - assert keyPrefix.isAccessible(); - assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); - this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send()); - this.rangeMono = LLUtils.lazyRetainRange(this.range); - assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); + ).receive().compact()) { + try (Buffer nextRangeKey = nextRangeKey(alloc, + prefixKey.copy().send(), + keyPrefixLength, + keySuffixLength, + keyExtLength + ).receive().compact()) { + assert keyPrefix.isAccessible(); + assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); + this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send()); + this.rangeMono = LLUtils.lazyRetainRange(this.range); + assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); + } } } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 90d5acb..0b7c5f9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -402,7 +402,8 @@ public class LLLocalDictionary implements LLDictionary { @Nullable WriteOptions writeOptions, Send keyToReceive, Send valueToReceive) throws RocksDBException { - try (WriteOptions validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS)) { + WriteOptions validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); + try { try (var key = keyToReceive.receive()) { try (var value = valueToReceive.receive()) { if (databaseOptions.allowNettyDirect()) { @@ -424,6 +425,10 @@ public class LLLocalDictionary implements LLDictionary { } } } + } finally { + if (writeOptions != null && !(writeOptions instanceof UnreleasableWriteOptions)) { + writeOptions.close(); + } } } @@ -523,7 +528,8 @@ public class LLLocalDictionary implements LLDictionary { int size = RocksDB.NOT_FOUND; byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); - try (var unmodifiableReadOpts = resolveSnapshot(snapshot)) { + var unmodifiableReadOpts = resolveSnapshot(snapshot); + try { if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) { if (data.getValue() != null) { size = data.getValue().length; @@ -531,6 +537,10 @@ public class LLLocalDictionary implements LLDictionary { size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA); } } + } finally { + if (unmodifiableReadOpts != null && !(unmodifiableReadOpts instanceof UnreleasableReadOptions)) { + unmodifiableReadOpts.close(); + } } return size != RocksDB.NOT_FOUND; } finally { @@ -912,55 +922,51 @@ public class LLLocalDictionary implements LLDictionary { } private Mono> getPreviousData(Mono> keyMono, LLDictionaryResultType resultType) { - return Mono - .usingWhen(keyMono, - keySend -> { - try (var key = keySend.receive()) { - return switch (resultType) { - case PREVIOUS_VALUE_EXISTENCE -> this - .containsKey(null, keyMono) - .single() - .map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(alloc, bool).send()); - case PREVIOUS_VALUE -> Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + return switch (resultType) { + case PREVIOUS_VALUE_EXISTENCE -> this + .containsKey(null, keyMono) + .single() + .map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(alloc, bool)); + case PREVIOUS_VALUE -> Mono.usingWhen( + keyMono, + keySend -> this + .runOnDb(() -> { + try (var key = keySend.receive()) { + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toArray(key)); } - try { - if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toArray(key)); - } - var data = new Holder(); - if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { - if (data.getValue() != null) { - return LLUtils.fromByteArray(alloc, data.getValue()).send(); - } else { - return dbGet(cfh, null, key.send(), true); - } + var data = new Holder(); + if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { + if (data.getValue() != null) { + return LLUtils.fromByteArray(alloc, data.getValue()).send(); } else { - return null; - } - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); + return dbGet(cfh, null, key.send(), true); } + } else { + return null; } - }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler); - case VOID -> Mono.empty(); - }; - } - }, - keySend -> Mono.fromRunnable(keySend::close) - ); + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } + } + } + }) + .onErrorMap(cause -> new IOException("Failed to read ", cause)), + keySend -> Mono.fromRunnable(keySend::close)); + case VOID -> Mono.empty(); + }; } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 8a912ba..50eff2f 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -2,9 +2,11 @@ package it.cavallium.dbengine.database.memory; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.ExtraKeyOperationResult; +import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLEntry; @@ -81,7 +83,7 @@ public class LLMemoryDictionary implements LLDictionary { } } - private Mono transformResult(Mono result, LLDictionaryResultType resultType) { + private Mono> transformResult(Mono result, LLDictionaryResultType resultType) { if (resultType == LLDictionaryResultType.PREVIOUS_VALUE) { // Don't retain the result because it has been removed from the skip list return result.map(this::kk); @@ -89,77 +91,80 @@ public class LLMemoryDictionary implements LLDictionary { return result .map(prev -> true) .defaultIfEmpty(false) - .map(LLUtils::booleanToResponseByteBuffer); + .map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(allocator, bool)); } else { return result.then(Mono.empty()); } } - private ByteList k(Buffer buf) { - return new BinaryLexicographicList(LLUtils.toArray(buf)); + private ByteList k(Send buf) { + return new BinaryLexicographicList(LLUtils.toArray(buf.receive())); } - private Buffer kk(ByteList bytesList) { - var buffer = getAllocator().buffer(bytesList.size()); - buffer.writeBytes(bytesList.toByteArray()); - return buffer; + private Send kk(ByteList bytesList) { + try (var buffer = getAllocator().allocate(bytesList.size())) { + buffer.writeBytes(bytesList.toByteArray()); + return buffer.send(); + } } - private Map mapSlice(LLSnapshot snapshot, LLRange range) { - if (range.isAll()) { - return snapshots.get(resolveSnapshot(snapshot)); - } else if (range.isSingle()) { - var key = k(range.getSingle()); - var value = snapshots - .get(resolveSnapshot(snapshot)) - .get(key); - if (value != null) { - return Map.of(key, value); + private Map mapSlice(LLSnapshot snapshot, Send rangeToReceive) { + try (var range = rangeToReceive.receive()) { + if (range.isAll()) { + return snapshots.get(resolveSnapshot(snapshot)); + } else if (range.isSingle()) { + var key = k(range.getSingle()); + var value = snapshots + .get(resolveSnapshot(snapshot)) + .get(key); + if (value != null) { + return Map.of(key, value); + } else { + return Map.of(); + } + } else if (range.hasMin() && range.hasMax()) { + var min = k(range.getMin()); + var max = k(range.getMax()); + if (min.compareTo(max) > 0) { + return Map.of(); + } + return snapshots + .get(resolveSnapshot(snapshot)) + .subMap(min, true, max, false); + } else if (range.hasMin()) { + return snapshots + .get(resolveSnapshot(snapshot)) + .tailMap(k(range.getMin()), true); } else { - return Map.of(); + return snapshots + .get(resolveSnapshot(snapshot)) + .headMap(k(range.getMax()), false); } - } else if (range.hasMin() && range.hasMax()) { - var min = k(range.getMin()); - var max = k(range.getMax()); - if (min.compareTo(max) > 0) { - return Map.of(); - } - return snapshots - .get(resolveSnapshot(snapshot)) - .subMap(min, true, max, false); - } else if (range.hasMin()) { - return snapshots - .get(resolveSnapshot(snapshot)) - .tailMap(k(range.getMin()), true); - } else { - return snapshots - .get(resolveSnapshot(snapshot)) - .headMap(k(range.getMax()), false); } } @Override - public Mono get(@Nullable LLSnapshot snapshot, Mono keyMono, boolean existsAlmostCertainly) { + public Mono> get(@Nullable LLSnapshot snapshot, Mono> keyMono, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> Mono .fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(k(key))) .map(this::kk) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), - key -> Mono.fromRunnable(key::release) + .onErrorMap(cause -> new IOException("Failed to read", cause)), + key -> Mono.fromRunnable(key::close) ); } @Override - public Mono put(Mono keyMono, Mono valueMono, LLDictionaryResultType resultType) { + public Mono> put(Mono> keyMono, Mono> valueMono, LLDictionaryResultType resultType) { return Mono.usingWhen(keyMono, key -> Mono.usingWhen(valueMono, value -> Mono .fromCallable(() -> mainDb.put(k(key), k(value))) .transform(result -> this.transformResult(result, resultType)) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), - value -> Mono.fromRunnable(value::release) + .onErrorMap(cause -> new IOException("Failed to read", cause)), + value -> Mono.fromRunnable(value::close) ), - key -> Mono.fromRunnable(key::release) + key -> Mono.fromRunnable(key::close) ); } @@ -169,17 +174,17 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono> updateAndGetDelta(Mono keyMono, - SerializationFunction<@Nullable Buffer, @Nullable Buffer> updater, + public Mono updateAndGetDelta(Mono> keyMono, + SerializationFunction<@Nullable Send, @Nullable Send> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> Mono.fromCallable(() -> { - AtomicReference oldRef = new AtomicReference<>(null); + AtomicReference> oldRef = new AtomicReference<>(null); var newValue = mainDb.compute(k(key), (_unused, old) -> { if (old != null) { oldRef.set(kk(old)); } - Buffer v = null; + Send v = null; try { v = updater.apply(old != null ? kk(old) : null); } catch (SerializationException e) { @@ -189,13 +194,13 @@ public class LLMemoryDictionary implements LLDictionary { return k(v); } finally { if (v != null) { - v.release(); + v.close(); } } }); - return new Delta<>(oldRef.get(), kk(newValue)); + return LLDelta.of(oldRef.get(), kk(newValue)); }), - key -> Mono.fromRunnable(key::release) + key -> Mono.fromRunnable(key::close) ); } @@ -205,183 +210,213 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono remove(Mono keyMono, LLDictionaryResultType resultType) { + public Mono> remove(Mono> keyMono, LLDictionaryResultType resultType) { return Mono.usingWhen(keyMono, key -> Mono .fromCallable(() -> mainDb.remove(k(key))) // Don't retain the result because it has been removed from the skip list .mapNotNull(bytesList -> switch (resultType) { case VOID -> null; - case PREVIOUS_VALUE_EXISTENCE -> LLUtils.booleanToResponseByteBuffer(true); + case PREVIOUS_VALUE_EXISTENCE -> LLUtils.booleanToResponseByteBuffer(allocator, true); case PREVIOUS_VALUE -> kk(bytesList); }) .switchIfEmpty(Mono.defer(() -> { if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) { - return Mono.fromCallable(() -> LLUtils.booleanToResponseByteBuffer(false)); + return Mono.fromCallable(() -> LLUtils.booleanToResponseByteBuffer(allocator, false)); } else { return Mono.empty(); } })) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), - key -> Mono.fromRunnable(key::release) + .onErrorMap(cause -> new IOException("Failed to read", cause)), + key -> Mono.fromRunnable(key::close) ); } @Override - public Flux>> getMulti(@Nullable LLSnapshot snapshot, - Flux> keys, + public Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot, + Flux>> keys, boolean existsAlmostCertainly) { return keys - .flatMapSequential(key -> { - try { - ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2())); + .map(key -> { + try (var t2 = key.getT2().receive()) { + ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(t2.copy().send())); if (v != null) { - return Flux.just(Tuples.of(key.getT1(), key.getT2().retain(), Optional.of(kk(v)))); + return Tuples.of(key.getT1(), t2.send(), Optional.of(kk(v))); } else { - return Flux.just(Tuples.of(key.getT1(), key.getT2().retain(), Optional.empty())); + return Tuples.of(key.getT1(), t2.send(), Optional.empty()); } - } finally { - key.getT2().release(); } }); } @Override - public Flux putMulti(Flux entries, boolean getOldValues) { - return entries - .handle((entry, sink) -> { - var key = entry.getKey(); - var val = entry.getValue(); - try { - var v = mainDb.put(k(key), k(val)); + public Flux> putMulti(Flux> entries, boolean getOldValues) { + return entries.handle((entryToReceive, sink) -> { + try (var entry = entryToReceive.receive()) { + try (var key = entry.getKey().receive()) { + try (var val = entry.getValue().receive()) { + var v = mainDb.put(k(key.copy().send()), k(val.send())); if (v == null || !getOldValues) { sink.complete(); } else { - sink.next(new LLEntry(key.retain(), kk(v))); + sink.next(LLEntry.of(key.send(), kk(v)).send()); } - } finally { - key.release(); - val.release(); } - }); + } + } + }); } @Override - public Flux> updateMulti(Flux> entries, - BiSerializationFunction updateFunction) { + public Flux, X>> updateMulti(Flux, X>> entries, + BiSerializationFunction, X, Send> updateFunction) { return Flux.error(new UnsupportedOperationException("Not implemented")); } @Override - public Flux getRange(@Nullable LLSnapshot snapshot, - Mono rangeMono, + public Flux> getRange(@Nullable LLSnapshot snapshot, + Mono> rangeMono, boolean existsAlmostCertainly) { - return Flux.usingWhen(rangeMono, - range -> { - if (range.isSingle()) { - return Mono.fromCallable(() -> { - var element = snapshots.get(resolveSnapshot(snapshot)) - .get(k(range.getSingle())); - return new LLEntry(range.getSingle().retain(), kk(element)); - }).flux(); - } else { - return Mono - .fromCallable(() -> mapSlice(snapshot, range)) - .flatMapMany(map -> Flux.fromIterable(map.entrySet())) - .map(entry -> new LLEntry(kk(entry.getKey()), kk(entry.getValue()))); - } - }, - range -> Mono.fromRunnable(range::release) - ); + return Flux.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send())); + return LLEntry.of(single.send(), kk(element)).send(); + } + }).flux(); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(snapshot, rangeToReceive2)) + .flatMapMany(map -> Flux.fromIterable(map.entrySet())) + .map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send()); + } + } + }, range -> Mono.fromRunnable(range::close)); } @Override - public Flux> getRangeGrouped(@Nullable LLSnapshot snapshot, - Mono rangeMono, + public Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, + Mono> rangeMono, int prefixLength, boolean existsAlmostCertainly) { return Flux.error(new UnsupportedOperationException("Not implemented")); } @Override - public Flux getRangeKeys(@Nullable LLSnapshot snapshot, Mono rangeMono) { + public Flux> getRangeKeys(@Nullable LLSnapshot snapshot, Mono> rangeMono) { return Flux.usingWhen(rangeMono, - range -> { - if (range.isSingle()) { - return Mono.fromCallable(() -> { - var contains = snapshots.get(resolveSnapshot(snapshot)) - .containsKey(k(range.getSingle())); - return contains ? range.getSingle().retain() : null; - }).flux(); - } else { - return Mono - .fromCallable(() -> mapSlice(snapshot, range)) - .flatMapMany(map -> Flux.fromIterable(map.entrySet())) - .map(entry -> kk(entry.getKey())); + rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var contains = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send())); + return contains ? single.send() : null; + } + }).flux(); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(snapshot, rangeToReceive2)) + .flatMapMany(map -> Flux.fromIterable(map.entrySet())) + .map(entry -> kk(entry.getKey())); + } } }, - range -> Mono.fromRunnable(range::release) + range -> Mono.fromRunnable(range::close) ); } + private static record BufferWithPrefix(Send buffer, Send prefix) {} + @Override - public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, - Mono rangeMono, + public Flux>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, + Mono> rangeMono, int prefixLength) { return getRangeKeys(snapshot, rangeMono) - .bufferUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals); + .map(bufferToReceive -> { + try(var buffer = bufferToReceive.receive()) { + try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { + return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); + } + } + }) + .windowUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), LLUtils::equals) + .flatMapSequential(window -> window.map(tuple -> { + try (var ignored = tuple.prefix()) { + return tuple.buffer(); + } + }).collectList()); } @Override - public Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono rangeMono, int prefixLength) { + public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, + Mono> rangeMono, + int prefixLength) { return getRangeKeys(snapshot, rangeMono) - .distinctUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), (a, b) -> { + .map(bufferToReceive -> { + try(var buffer = bufferToReceive.receive()) { + try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { + return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); + } + } + }) + .distinctUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), (a, b) -> { if (LLUtils.equals(a, b)) { - b.release(); + b.close(); return true; } else { return false; } }) - .map(k -> k.slice(k.readerIndex(), prefixLength)) + .map(tuple -> { + try (var ignored = tuple.prefix()) { + return tuple.buffer(); + } + }) .transform(LLUtils::handleDiscard); } @Override - public Flux badBlocks(Mono rangeMono) { + public Flux badBlocks(Mono> rangeMono) { return Flux.empty(); } @Override - public Mono setRange(Mono rangeMono, Flux entries) { + public Mono setRange(Mono> rangeMono, Flux> entries) { return Mono.error(new UnsupportedOperationException("Not implemented")); } @Override - public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono rangeMono) { + public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono) { return Mono.error(new UnsupportedOperationException("Not implemented")); } @Override - public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono rangeMono, boolean fast) { + public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fast) { return Mono.usingWhen(rangeMono, range -> Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size()), - range -> Mono.fromRunnable(range::release) + range -> Mono.fromRunnable(range::close) ); } @Override - public Mono getOne(@Nullable LLSnapshot snapshot, Mono rangeMono) { + public Mono> getOne(@Nullable LLSnapshot snapshot, Mono> rangeMono) { return Mono.error(new UnsupportedOperationException("Not implemented")); } @Override - public Mono getOneKey(@Nullable LLSnapshot snapshot, Mono rangeMono) { + public Mono> getOneKey(@Nullable LLSnapshot snapshot, Mono> rangeMono) { return Mono.error(new UnsupportedOperationException("Not implemented")); } @Override - public Mono removeOne(Mono rangeMono) { + public Mono> removeOne(Mono> rangeMono) { return Mono.error(new UnsupportedOperationException("Not implemented")); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index a9dd735..fdfcd93 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -14,7 +14,7 @@ public class LLMemorySingleton implements LLSingleton { private final LLMemoryDictionary dict; private final byte[] singletonName; - private final Mono singletonNameBufMono; + private final Mono> singletonNameBufMono; public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) { this.dict = dict; @@ -22,7 +22,8 @@ public class LLMemorySingleton implements LLSingleton { this.singletonNameBufMono = Mono.fromCallable(() -> dict .getAllocator() .allocate(singletonName.length) - .writeBytes(singletonName)); + .writeBytes(singletonName) + .send()); } @Override @@ -35,8 +36,8 @@ public class LLMemorySingleton implements LLSingleton { return dict .get(snapshot, singletonNameBufMono, false) .map(b -> { - try (b) { - return LLUtils.toArray(b); + try (var buf = b.receive()) { + return LLUtils.toArray(buf); } }); } @@ -44,7 +45,7 @@ public class LLMemorySingleton implements LLSingleton { @Override public Mono set(byte[] value) { var bbKey = singletonNameBufMono; - var bbVal = Mono.fromCallable(() -> dict.getAllocator().allocate(value.length).writeBytes(value)); + var bbVal = Mono.fromCallable(() -> dict.getAllocator().allocate(value.length).writeBytes(value).send()); return dict .put(bbKey, bbVal, LLDictionaryResultType.VOID) .then(); diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index bf29675..339e032 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -4,10 +4,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PoolArenaMetric; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.Send; +import io.netty.buffer.api.pool.BufferAllocatorMetric; +import io.netty.buffer.api.pool.PooledBufferAllocator; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; @@ -41,55 +45,26 @@ import reactor.core.scheduler.Schedulers; public class DbTestUtils { - public static record TestAllocator(ByteBufAllocator allocator) {} + public static record TestAllocator(PooledBufferAllocator allocator) {} public static TestAllocator newAllocator() { - return new TestAllocator(new PooledByteBufAllocator(false, 1, 0, 4096, 11, 0, 0, true)); + return new TestAllocator(new PooledBufferAllocator(MemoryManager.instance(), true, 1, 8192, 9, 0, 0, false)); } public static void destroyAllocator(TestAllocator testAllocator) { + testAllocator.allocator().close(); } public static final AtomicInteger dbId = new AtomicInteger(0); @SuppressWarnings("SameParameterValue") - private static int getActiveBuffers(ByteBufAllocator allocator, boolean printStats) { - int directActive = 0, directAlloc = 0, directDealloc = 0; - if (allocator instanceof PooledByteBufAllocator alloc) { - for (PoolArenaMetric arena : alloc.directArenas()) { - directActive += arena.numActiveAllocations(); - directAlloc += arena.numAllocations(); - directDealloc += arena.numDeallocations(); - } - } else if (allocator instanceof UnpooledByteBufAllocator alloc) { - directActive += alloc.metric().usedDirectMemory(); - } else { - throw new UnsupportedOperationException(); - } + private static long getUsedMemory(PooledBufferAllocator allocator, boolean printStats) { + allocator.trimCurrentThreadCache(); + var usedMemory = ((BufferAllocatorMetric) allocator.metric()).usedMemory(); if (printStats) { - System.out.println("directActive " + directActive + " directAlloc " + directAlloc + " directDealloc " + directDealloc); + System.out.println("usedMemory=" + usedMemory); } - return directActive; - } - - @SuppressWarnings("SameParameterValue") - private static int getActiveHeapBuffers(ByteBufAllocator allocator, boolean printStats) { - int heapActive = 0, heapAlloc = 0, heapDealloc = 0; - if (allocator instanceof PooledByteBufAllocator alloc) { - for (PoolArenaMetric arena : alloc.heapArenas()) { - heapActive += arena.numActiveAllocations(); - heapAlloc += arena.numAllocations(); - heapDealloc += arena.numDeallocations(); - } - } else if (allocator instanceof UnpooledByteBufAllocator alloc) { - heapActive += alloc.metric().usedHeapMemory(); - } else { - throw new UnsupportedOperationException(); - } - if (printStats) { - System.out.println("heapActive " + heapActive + " heapAlloc " + heapAlloc + " heapDealloc " + heapDealloc); - } - return heapActive; + return usedMemory; } public static Flux tempDb(TestAllocator alloc, Function> action) { @@ -134,10 +109,6 @@ public class DbTestUtils { public static Mono closeTempDb(TempDb tempDb) { return tempDb.db().close().then(tempDb.connection().disconnect()).then(Mono.fromCallable(() -> { ensureNoLeaks(tempDb.allocator().allocator(), false); - if (tempDb.allocator().allocator() instanceof PooledByteBufAllocator pooledByteBufAllocator) { - pooledByteBufAllocator.trimCurrentThreadCache(); - pooledByteBufAllocator.freeThreadLocalCache(); - } if (Files.exists(tempDb.path())) { Files.walk(tempDb.path()).sorted(Comparator.reverseOrder()).forEach(file -> { try { @@ -151,10 +122,9 @@ public class DbTestUtils { }).subscribeOn(Schedulers.boundedElastic())).then(); } - public static void ensureNoLeaks(ByteBufAllocator allocator, boolean printStats) { + public static void ensureNoLeaks(PooledBufferAllocator allocator, boolean printStats) { if (allocator != null) { - assertEquals(0, getActiveBuffers(allocator, printStats)); - assertEquals(0, getActiveHeapBuffers(allocator, printStats)); + assertEquals(0L, getUsedMemory(allocator, printStats)); } } @@ -195,26 +165,21 @@ public class DbTestUtils { } @Override - public @NotNull Short deserialize(@NotNull ByteBuf serialized) { - try { - var prevReaderIdx = serialized.readerIndex(); + public @NotNull Short deserialize(@NotNull Send serializedToReceive) { + try (var serialized = serializedToReceive.receive()) { + var prevReaderIdx = serialized.readerOffset(); var val = serialized.readShort(); - serialized.readerIndex(prevReaderIdx + Short.BYTES); + serialized.readerOffset(prevReaderIdx + Short.BYTES); return val; - } finally { - serialized.release(); } } @Override - public @NotNull ByteBuf serialize(@NotNull Short deserialized) { - var out = dictionary.getAllocator().directBuffer(Short.BYTES); - try { + public @NotNull Send serialize(@NotNull Short deserialized) { + try (var out = dictionary.getAllocator().allocate(Short.BYTES)) { out.writeShort(deserialized); - out.writerIndex(Short.BYTES); - return out.retain(); - } finally { - out.release(); + out.writerOffset(Short.BYTES); + return out.send(); } } } diff --git a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java deleted file mode 100644 index f7c6ff4..0000000 --- a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java +++ /dev/null @@ -1,271 +0,0 @@ -package it.cavallium.dbengine; - -import static it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType.KV_DATABASE; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; -import it.cavallium.dbengine.client.CompositeDatabasePartLocation; -import it.cavallium.dbengine.client.CompositeSnapshot; -import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; -import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; -import it.cavallium.dbengine.database.collections.SubStageGetterMap; -import it.cavallium.dbengine.client.DatabaseOptions; -import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; -import it.cavallium.dbengine.database.serialization.Serializer; -import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletionException; -import java.util.stream.Collectors; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.test.StepVerifier; -import reactor.util.function.Tuples; - -public class OldDatabaseTests { - - @Test - public void testDatabaseAddKeysAndCheckSize() { - LinkedHashSet originalKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c")); - - StepVerifier - .create( - tempDb() - .flatMap(db -> db - .getDictionary("testmap", UpdateMode.DISALLOW) - .map(dictionary -> DatabaseMapDictionary.simple(dictionary, - new FixedStringSerializer(3), - Serializer.noop() - )) - .flatMap(collection -> Flux - .fromIterable(originalKeys) - .flatMap(k1 -> collection.putValue(k1, DUMMY_VALUE.retain())) - .then(collection.leavesCount(null, false)) - ) - ) - ) - .expectNext((long) originalKeys.size()) - .verifyComplete(); - } - - @Test - public void testDeepDatabaseAddKeysAndCheckSize() { - LinkedHashSet originalSuperKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c")); - LinkedHashSet originalSubKeys = new LinkedHashSet<>(List.of("K2aa", "K2bb", "K2cc")); - - StepVerifier - .create( - tempDb() - .flatMap(db -> db - .getDictionary("testmap", UpdateMode.DISALLOW) - .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, - new FixedStringSerializer(3), - 4, - new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop()) - )) - .flatMap(collection -> Flux - .fromIterable(originalSuperKeys) - .flatMap(k1 -> collection.at(null, k1)) - .flatMap(k1at -> Flux - .fromIterable(originalSubKeys) - .flatMap(k2 -> k1at.putValue(k2, DUMMY_VALUE.retain())) - ) - .then(collection.leavesCount(null, false)) - ) - ) - ) - .expectNext((long) originalSuperKeys.size() * originalSubKeys.size()) - .verifyComplete(); - } - - @Test - public void testDeepDatabaseAddKeysAndConvertToLongerOnes() { - LinkedHashSet originalSuperKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c")); - LinkedHashSet originalSubKeys = new LinkedHashSet<>(List.of("K2aa", "K2bb", "K2cc")); - String newPrefix = "xxx"; - - StepVerifier - .create(tempDb() - .flatMapMany(db -> addKeysAndConvertToLongerOnes(db, originalSuperKeys, originalSubKeys, newPrefix)) - ) - .expectNextSequence(originalSuperKeys - .stream() - .flatMap(superKey -> originalSubKeys - .stream() - .map(subKey -> Map.entry(newPrefix + superKey, newPrefix + subKey)) - ) - .collect(Collectors.toList()) - ) - .verifyComplete(); - } - - public static Mono tempDb() { - var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + DbTestUtils.dbId.incrementAndGet() + "/"); - return Mono - .fromCallable(() -> { - if (Files.exists(wrkspcPath)) { - Files.walk(wrkspcPath) - .sorted(Comparator.reverseOrder()) - .forEach(file -> { - try { - Files.delete(file); - } catch (IOException ex) { - throw new CompletionException(ex); - } - }); - } - Files.createDirectories(wrkspcPath); - return null; - }) - .subscribeOn(Schedulers.boundedElastic()) - .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect()) - .flatMap(conn -> conn.getDatabase("testdb", - List.of(Column.dictionary("testmap")), - new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, -1) - )); - } - - private static final ByteBuf DUMMY_VALUE; - static { - ByteBuf buf = Unpooled.directBuffer(2, 2); - buf.writeByte(0x01); - buf.writeByte(0x03); - DUMMY_VALUE = buf; - } - - private Flux> addKeysAndConvertToLongerOnes(LLKeyValueDatabase db, - LinkedHashSet originalSuperKeys, - LinkedHashSet originalSubKeys, - String newPrefix) { - return Flux - .defer(() -> Mono - .zip( - db - .getDictionary("testmap", UpdateMode.DISALLOW) - .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, - new FixedStringSerializer(3), - 4, - new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop()) - )), - db - .getDictionary("testmap", UpdateMode.DISALLOW) - .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, - new FixedStringSerializer(6), - 7, - new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop()) - )) - ) - .single() - .flatMap(tuple -> { - var db1 = tuple.getT1(); - return Flux - .fromIterable(originalSuperKeys) - .concatMap(superKey -> db1.at(null, superKey)) - .concatMap(at -> Flux - .fromIterable(originalSubKeys) - .concatMap(subKey -> at - .at(null, subKey) - .flatMap(at2 -> at2 - .set(DUMMY_VALUE.retainedSlice()) - .doAfterTerminate(at2::release) - ) - ) - .doAfterTerminate(at::release) - ) - .then(db - .takeSnapshot() - .map(snapshot -> new CompositeSnapshot(Map.of(CompositeDatabasePartLocation.of(KV_DATABASE, - db.getDatabaseName()), snapshot))) - ) - .map(snapshot -> Tuples.of(tuple.getT1(), tuple.getT2(), snapshot)) - .single(); - }) - .single() - .flatMap(tuple -> tuple.getT1().clear().thenReturn(tuple)) - .flatMap(tuple -> tuple - .getT1() - .leavesCount(null, false) - .flatMap(count -> count == 0 ? Mono.just(tuple) : Mono.error(new IllegalStateException( - "Failed to clear map. Remaining elements after clear: " + count))) - ) - .flatMapMany(tuple -> { - var oldDb = tuple.getT1(); - var newDb = tuple.getT2(); - var snapshot = tuple.getT3(); - - return oldDb - .getAllStages(snapshot) - .concatMap(parentEntry -> Mono - .fromCallable(() -> newPrefix + parentEntry.getKey()) - .flatMapMany(newId1 -> parentEntry.getValue() - .getAllValues(snapshot) - .concatMap(entry -> Mono - .fromCallable(() -> newPrefix + entry.getKey()) - .flatMap(newId2 -> newDb - .at(null, newId1) - .flatMap(newStage -> newStage - .putValue(newId2, entry.getValue()) - .doAfterTerminate(newStage::release) - ) - .thenReturn(Map.entry(newId1, newId2)) - ) - ) - ) - .doAfterTerminate(() -> parentEntry.getValue().release()) - ) - .concatWith(db - .releaseSnapshot(snapshot.getSnapshot(db)) - .then(oldDb.close()) - .then(newDb.close()) - .then(Mono.empty()) - ); - }) - ); - } - - private static class FixedStringSerializer implements SerializerFixedBinaryLength { - - private final int size; - - public FixedStringSerializer(int i) { - this.size = i; - } - - @Override - public int getSerializedBinaryLength() { - return size; - } - - @Override - public @NotNull String deserialize(ByteBuf serialized) { - try { - return serialized.toString(StandardCharsets.US_ASCII); - } finally { - serialized.release(); - } - } - - @Override - public ByteBuf serialize(@NotNull String deserialized) { - var serialized = deserialized.getBytes(StandardCharsets.US_ASCII); - var serializedBuf = Unpooled.directBuffer(serialized.length, serialized.length); - serializedBuf.writeBytes(serialized); - assert serializedBuf.isDirect(); - return serializedBuf; - } - } -} diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index 9cc1f60..7259190 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -7,6 +7,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Send; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLDictionary; @@ -75,7 +77,7 @@ public class TestLLDictionaryLeaks { } private LLDictionary getDict(UpdateMode updateMode) { - var dict = DbTestUtils.tempDictionary(db, updateMode).block(); + var dict = DbTestUtils.tempDictionary(db, updateMode).blockOptional().orElseThrow(); var key1 = Mono.fromCallable(() -> fromString("test-key-1")); var key2 = Mono.fromCallable(() -> fromString("test-key-2")); var key3 = Mono.fromCallable(() -> fromString("test-key-3")); @@ -88,11 +90,12 @@ public class TestLLDictionaryLeaks { return dict; } - private ByteBuf fromString(String s) { + private Send fromString(String s) { var sb = s.getBytes(StandardCharsets.UTF_8); - var b = db.getAllocator().buffer(sb.length); - b.writeBytes(b); - return b; + try (var b = db.getAllocator().allocate(sb.length)) { + b.writeBytes(b); + return b.send(); + } } private void run(Flux publisher) { @@ -131,6 +134,14 @@ public class TestLLDictionaryLeaks { public void testNoOp() { } + @Test + public void testNoOpAllocation() { + for (int i = 0; i < 10; i++) { + var a = allocator.allocator().allocate(i * 512); + a.send().receive().close(); + } + } + @ParameterizedTest @MethodSource("provideArguments") public void testGetDict(UpdateMode updateMode) { diff --git a/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java b/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java index c9dcff5..956a6e4 100644 --- a/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java +++ b/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java @@ -3,14 +3,30 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.pool.PooledBufferAllocator; import it.cavallium.dbengine.database.LLUtils; import java.util.Arrays; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import static io.netty.buffer.Unpooled.*; public class TestRanges { + private static BufferAllocator alloc; + + @BeforeAll + public static void beforeAll() { + alloc = BufferAllocator.offHeapPooled(); + } + + @AfterAll + public static void afterAll() { + alloc = BufferAllocator.offHeapPooled(); + } + @Test public void testDirectBuffer() { Assertions.assertTrue(wrappedBuffer(Unpooled.directBuffer(10, 10), Unpooled.buffer(10, 10)).isDirect()); @@ -25,25 +41,31 @@ public class TestRanges { testNextRangeKey(new byte[] {0x00, 0x00, (byte) 0xFF}); testNextRangeKey(new byte[] {0x00, 0x01, (byte) 0xFF}); testNextRangeKey(new byte[] {0x00, (byte) 0xFF, (byte) 0xFF}); + } + + @Test + public void testNextRangeKey2() { testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}); testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF}); testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0}); } public void testNextRangeKey(byte[] prefixKey) { - - byte[] firstRangeKey = LLUtils.toArray(DatabaseMapDictionaryDeep.firstRangeKey(PooledByteBufAllocator.DEFAULT, - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(prefixKey)), + byte[] firstRangeKey; + try (var firstRangeKeyBuf = DatabaseMapDictionaryDeep.firstRangeKey(alloc, + alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(), + prefixKey.length, 7, 3).receive()) { + firstRangeKey = LLUtils.toArray(firstRangeKeyBuf); + } + byte[] nextRangeKey; + try (var nextRangeKeyBuf = DatabaseMapDictionaryDeep.nextRangeKey(alloc, + alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(), prefixKey.length, 7, 3 - )); - byte[] nextRangeKey = LLUtils.toArray(DatabaseMapDictionaryDeep.nextRangeKey(PooledByteBufAllocator.DEFAULT, - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(prefixKey)), - prefixKey.length, - 7, - 3 - )); + ).receive()) { + nextRangeKey = LLUtils.toArray(nextRangeKeyBuf); + } if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) { Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey); @@ -78,6 +100,9 @@ public class TestRanges { testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, (byte) 0xFF}, new byte[] {0x00, 0x00, (byte) 0xFF}); testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, (byte) 0xFF}, new byte[] {0x00, 0x01, (byte) 0xFF}); testNextRangeKeyWithSuffix(new byte[] {0x00, (byte) 0xFF, (byte) 0xFF}, new byte[] {0x00, (byte) 0xFF, (byte) 0xFF}); + } + @Test + public void testNextRangeKeyWithSuffix2() { testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}); testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF}); testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0}); @@ -85,47 +110,52 @@ public class TestRanges { public void testNextRangeKeyWithSuffix(byte[] prefixKey, byte[] suffixKey) { - byte[] firstRangeKey = LLUtils.toArray(DatabaseMapDictionaryDeep.firstRangeKey(ByteBufAllocator.DEFAULT, - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(prefixKey)), - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(suffixKey)), + byte[] firstRangeKey; + try (var firstRangeKeyBuf = DatabaseMapDictionaryDeep.firstRangeKey(alloc, + alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(), + alloc.allocate(suffixKey.length).writeBytes(suffixKey).send(), prefixKey.length, 3, 7 - )); - byte[] nextRangeKey = LLUtils.toArray(DatabaseMapDictionaryDeep.nextRangeKey(ByteBufAllocator.DEFAULT, - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(prefixKey)), - LLUtils.convertToDirectByteBuf(PooledByteBufAllocator.DEFAULT, wrappedBuffer(suffixKey)), + ).receive()) { + firstRangeKey = LLUtils.toArray(firstRangeKeyBuf); + } + try (var nextRangeKeyBuf = DatabaseMapDictionaryDeep.nextRangeKey(alloc, + alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(), + alloc.allocate(suffixKey.length).writeBytes(suffixKey).send(), prefixKey.length, 3, 7 - )); + ).receive()) { + byte[] nextRangeKey = LLUtils.toArray(nextRangeKeyBuf); - if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}) && Arrays.equals(suffixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) { - Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey); - } else { - long biPrefix = 0; - var s = 0; - for (int i = (suffixKey.length) - 1; i >= 0; i--) { - biPrefix += ((long) (suffixKey[i] & 0xFF)) << s; - s += Byte.SIZE; - } - for (int i = (prefixKey.length) - 1; i >= 0; i--) { - biPrefix += ((long) (prefixKey[i] & 0xFF)) << s; - s += Byte.SIZE; - } - var nrPrefix = Arrays.copyOf(nextRangeKey, prefixKey.length + suffixKey.length); + if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}) && Arrays.equals(suffixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) { + Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey); + } else { + long biPrefix = 0; + var s = 0; + for (int i = (suffixKey.length) - 1; i >= 0; i--) { + biPrefix += ((long) (suffixKey[i] & 0xFF)) << s; + s += Byte.SIZE; + } + for (int i = (prefixKey.length) - 1; i >= 0; i--) { + biPrefix += ((long) (prefixKey[i] & 0xFF)) << s; + s += Byte.SIZE; + } + var nrPrefix = Arrays.copyOf(nextRangeKey, prefixKey.length + suffixKey.length); - long biNextPrefix = 0; - s = 0; - for (int i = (prefixKey.length + suffixKey.length) - 1; i >= 0; i--) { - biNextPrefix += ((long) (nrPrefix[i] & 0xFF)) << s; - s += Byte.SIZE; + long biNextPrefix = 0; + s = 0; + for (int i = (prefixKey.length + suffixKey.length) - 1; i >= 0; i--) { + biNextPrefix += ((long) (nrPrefix[i] & 0xFF)) << s; + s += Byte.SIZE; + } + Assertions.assertEquals(biPrefix + 1, biNextPrefix); + Assertions.assertArrayEquals( + new byte[7], + Arrays.copyOfRange(nextRangeKey, prefixKey.length + suffixKey.length, prefixKey.length + suffixKey.length + 7) + ); } - Assertions.assertEquals(biPrefix + 1, biNextPrefix); - Assertions.assertArrayEquals( - new byte[7], - Arrays.copyOfRange(nextRangeKey, prefixKey.length + suffixKey.length, prefixKey.length + suffixKey.length + 7) - ); } } }