From 6056eedd75e41fe6f47d06d5421b7adc658b8b5e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 26 May 2022 13:13:14 +0200 Subject: [PATCH] Fix some possible leaks --- .../database/collections/DatabaseInt.java | 12 ++- .../database/collections/DatabaseLong.java | 58 +++++++------- .../collections/DatabaseMapSingle.java | 24 +++--- .../collections/DatabaseSingleton.java | 76 ++++++++++--------- 4 files changed, 90 insertions(+), 80 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java index 150faca..e6ea801 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java @@ -20,13 +20,11 @@ public class DatabaseInt implements LLKeyValueDatabaseStructure { } public Mono get(@Nullable LLSnapshot snapshot) { - return singleton.get(snapshot).handle((data, sink) -> { - try (data) { - sink.next(serializer.deserialize(data)); - } catch (SerializationException e) { - sink.error(e); - } - }); + var resultMono = singleton.get(snapshot); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> serializer.deserialize(result)), + result -> Mono.fromRunnable(result::close) + ); } public Mono set(int value) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java index 4bc8e2b..5694884 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java @@ -24,17 +24,17 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { } public Mono get(@Nullable LLSnapshot snapshot) { - return singleton.get(snapshot).handle((data, sink) -> { - try (data) { - if (data.readableBytes() == 4) { - sink.next((long) (int) bugSerializer.deserialize(data)); - } else { - sink.next(serializer.deserialize(data)); - } - } catch (SerializationException e) { - sink.error(e); - } - }); + var resultMono = singleton.get(snapshot); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> { + if (result.readableBytes() == 4) { + return (long) (int) bugSerializer.deserialize(result); + } else { + return serializer.deserialize(result); + } + }), + result -> Mono.fromRunnable(result::close) + ); } public Mono incrementAndGet() { @@ -62,24 +62,26 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { } private Mono addAnd(long count, UpdateReturnMode updateReturnMode) { - return singleton.update(prev -> { - if (prev != null) { - var prevLong = prev.readLong(); - var alloc = singleton.getAllocator(); - var buf = alloc.allocate(Long.BYTES); - buf.writeLong(prevLong + count); - return buf; - } else { - var alloc = singleton.getAllocator(); - var buf = alloc.allocate(Long.BYTES); - buf.writeLong(count); - return buf; + var resultMono = singleton.update(prev -> { + try (prev) { + if (prev != null) { + var prevLong = prev.readLong(); + var alloc = singleton.getAllocator(); + var buf = alloc.allocate(Long.BYTES); + buf.writeLong(prevLong + count); + return buf; + } else { + var alloc = singleton.getAllocator(); + var buf = alloc.allocate(Long.BYTES); + buf.writeLong(count); + return buf; + } } - }, updateReturnMode).map(buf -> { - try (buf) { - return buf.readLong(); - } - }).single(); + }, updateReturnMode); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(result::readLong), + result -> Mono.fromRunnable(result::close) + ).single(); } public Mono set(long value) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index b0c61a3..1c611a2 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -130,17 +130,19 @@ public class DatabaseMapSingle extends ResourceSupport, Data UpdateReturnMode updateReturnMode) { var resultMono = dictionary .update(keyMono, (oldValueSer) -> { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue = serializer.deserialize(oldValueSer); - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result); + try (oldValueSer) { + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue = serializer.deserialize(oldValueSer); + result = updater.apply(deserializedValue); + } + if (result == null) { + return null; + } else { + return serializeValue(result); + } } }, updateReturnMode); return Mono.usingWhen(resultMono, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 2211db4..54fe93a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -67,24 +67,22 @@ public class DatabaseSingleton extends ResourceSupport, Data } } - private void deserializeValue(Buffer value, SynchronousSink sink) { + private U deserializeValue(Buffer value) { try { U deserializedValue; try (value) { deserializedValue = serializer.deserialize(value); } - sink.next(deserializedValue); + return deserializedValue; } catch (IndexOutOfBoundsException ex) { var exMessage = ex.getMessage(); if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - LOG.error("Unexpected zero-bytes value at " - + singleton.getDatabaseName() + ":" + singleton.getColumnName() + ":" + singleton.getName()); - sink.complete(); + LOG.error("Unexpected zero-bytes value at " + singleton.getDatabaseName() + + ":" + singleton.getColumnName() + ":" + singleton.getName()); + return null; } else { - sink.error(ex); + throw ex; } - } catch (SerializationException ex) { - sink.error(ex); } } @@ -103,8 +101,11 @@ public class DatabaseSingleton extends ResourceSupport, Data @Override public Mono get(@Nullable CompositeSnapshot snapshot) { - return singleton.get(resolveSnapshot(snapshot)) - .handle(this::deserializeValue); + var resultMono = singleton.get(resolveSnapshot(snapshot)); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> this.deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override @@ -114,33 +115,39 @@ public class DatabaseSingleton extends ResourceSupport, Data @Override public Mono setAndGetPrevious(U value) { - return Flux + var resultMono = Flux .concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty())) - .last() - .handle(this::deserializeValue); + .last(); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> this.deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override public Mono update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) { - return singleton - .update((oldValueSer) -> { - try (oldValueSer) { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue = serializer.deserialize(oldValueSer); - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } + var resultMono = singleton + .update((oldValueSer) -> { + try (oldValueSer) { + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue = serializer.deserialize(oldValueSer); + result = updater.apply(deserializedValue); } - }, updateReturnMode) - .handle(this::deserializeValue); + if (result == null) { + return null; + } else { + return serializeValue(result); + } + } + }, updateReturnMode); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> this.deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override @@ -171,10 +178,11 @@ public class DatabaseSingleton extends ResourceSupport, Data @Override public Mono clearAndGetPrevious() { - return Flux - .concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())) - .last() - .handle(this::deserializeValue); + var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())).last(); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> this.deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override