Fix some possible leaks

This commit is contained in:
Andrea Cavalli 2022-05-26 13:13:14 +02:00
parent 96de3023a0
commit 6056eedd75
4 changed files with 90 additions and 80 deletions

View File

@ -20,13 +20,11 @@ public class DatabaseInt implements LLKeyValueDatabaseStructure {
} }
public Mono<Integer> get(@Nullable LLSnapshot snapshot) { public Mono<Integer> get(@Nullable LLSnapshot snapshot) {
return singleton.get(snapshot).handle((data, sink) -> { var resultMono = singleton.get(snapshot);
try (data) { return Mono.usingWhen(resultMono,
sink.next(serializer.deserialize(data)); result -> Mono.fromSupplier(() -> serializer.deserialize(result)),
} catch (SerializationException e) { result -> Mono.fromRunnable(result::close)
sink.error(e); );
}
});
} }
public Mono<Void> set(int value) { public Mono<Void> set(int value) {

View File

@ -24,17 +24,17 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure {
} }
public Mono<Long> get(@Nullable LLSnapshot snapshot) { public Mono<Long> get(@Nullable LLSnapshot snapshot) {
return singleton.get(snapshot).handle((data, sink) -> { var resultMono = singleton.get(snapshot);
try (data) { return Mono.usingWhen(resultMono,
if (data.readableBytes() == 4) { result -> Mono.fromSupplier(() -> {
sink.next((long) (int) bugSerializer.deserialize(data)); if (result.readableBytes() == 4) {
} else { return (long) (int) bugSerializer.deserialize(result);
sink.next(serializer.deserialize(data)); } else {
} return serializer.deserialize(result);
} catch (SerializationException e) { }
sink.error(e); }),
} result -> Mono.fromRunnable(result::close)
}); );
} }
public Mono<Long> incrementAndGet() { public Mono<Long> incrementAndGet() {
@ -62,24 +62,26 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure {
} }
private Mono<Long> addAnd(long count, UpdateReturnMode updateReturnMode) { private Mono<Long> addAnd(long count, UpdateReturnMode updateReturnMode) {
return singleton.update(prev -> { var resultMono = singleton.update(prev -> {
if (prev != null) { try (prev) {
var prevLong = prev.readLong(); if (prev != null) {
var alloc = singleton.getAllocator(); var prevLong = prev.readLong();
var buf = alloc.allocate(Long.BYTES); var alloc = singleton.getAllocator();
buf.writeLong(prevLong + count); var buf = alloc.allocate(Long.BYTES);
return buf; buf.writeLong(prevLong + count);
} else { return buf;
var alloc = singleton.getAllocator(); } else {
var buf = alloc.allocate(Long.BYTES); var alloc = singleton.getAllocator();
buf.writeLong(count); var buf = alloc.allocate(Long.BYTES);
return buf; buf.writeLong(count);
return buf;
}
} }
}, updateReturnMode).map(buf -> { }, updateReturnMode);
try (buf) { return Mono.usingWhen(resultMono,
return buf.readLong(); result -> Mono.fromSupplier(result::readLong),
} result -> Mono.fromRunnable(result::close)
}).single(); ).single();
} }
public Mono<Void> set(long value) { public Mono<Void> set(long value) {

View File

@ -130,17 +130,19 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
UpdateReturnMode updateReturnMode) { UpdateReturnMode updateReturnMode) {
var resultMono = dictionary var resultMono = dictionary
.update(keyMono, (oldValueSer) -> { .update(keyMono, (oldValueSer) -> {
U result; try (oldValueSer) {
if (oldValueSer == null) { U result;
result = updater.apply(null); if (oldValueSer == null) {
} else { result = updater.apply(null);
U deserializedValue = serializer.deserialize(oldValueSer); } else {
result = updater.apply(deserializedValue); U deserializedValue = serializer.deserialize(oldValueSer);
} result = updater.apply(deserializedValue);
if (result == null) { }
return null; if (result == null) {
} else { return null;
return serializeValue(result); } else {
return serializeValue(result);
}
} }
}, updateReturnMode); }, updateReturnMode);
return Mono.usingWhen(resultMono, return Mono.usingWhen(resultMono,

View File

@ -67,24 +67,22 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
} }
} }
private void deserializeValue(Buffer value, SynchronousSink<U> sink) { private U deserializeValue(Buffer value) {
try { try {
U deserializedValue; U deserializedValue;
try (value) { try (value) {
deserializedValue = serializer.deserialize(value); deserializedValue = serializer.deserialize(value);
} }
sink.next(deserializedValue); return deserializedValue;
} catch (IndexOutOfBoundsException ex) { } catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage(); var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
LOG.error("Unexpected zero-bytes value at " LOG.error("Unexpected zero-bytes value at " + singleton.getDatabaseName()
+ singleton.getDatabaseName() + ":" + singleton.getColumnName() + ":" + singleton.getName()); + ":" + singleton.getColumnName() + ":" + singleton.getName());
sink.complete(); return null;
} else { } else {
sink.error(ex); throw ex;
} }
} catch (SerializationException ex) {
sink.error(ex);
} }
} }
@ -103,8 +101,11 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
@Override @Override
public Mono<U> get(@Nullable CompositeSnapshot snapshot) { public Mono<U> get(@Nullable CompositeSnapshot snapshot) {
return singleton.get(resolveSnapshot(snapshot)) var resultMono = singleton.get(resolveSnapshot(snapshot));
.handle(this::deserializeValue); return Mono.usingWhen(resultMono,
result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
result -> Mono.fromRunnable(result::close)
);
} }
@Override @Override
@ -114,33 +115,39 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
@Override @Override
public Mono<U> setAndGetPrevious(U value) { public Mono<U> setAndGetPrevious(U value) {
return Flux var resultMono = Flux
.concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty())) .concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty()))
.last() .last();
.handle(this::deserializeValue); return Mono.usingWhen(resultMono,
result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
result -> Mono.fromRunnable(result::close)
);
} }
@Override @Override
public Mono<U> update(SerializationFunction<@Nullable U, @Nullable U> updater, public Mono<U> update(SerializationFunction<@Nullable U, @Nullable U> updater,
UpdateReturnMode updateReturnMode) { UpdateReturnMode updateReturnMode) {
return singleton var resultMono = singleton
.update((oldValueSer) -> { .update((oldValueSer) -> {
try (oldValueSer) { try (oldValueSer) {
U result; U result;
if (oldValueSer == null) { if (oldValueSer == null) {
result = updater.apply(null); result = updater.apply(null);
} else { } else {
U deserializedValue = serializer.deserialize(oldValueSer); U deserializedValue = serializer.deserialize(oldValueSer);
result = updater.apply(deserializedValue); result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result);
}
} }
}, updateReturnMode) if (result == null) {
.handle(this::deserializeValue); return null;
} else {
return serializeValue(result);
}
}
}, updateReturnMode);
return Mono.usingWhen(resultMono,
result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
result -> Mono.fromRunnable(result::close)
);
} }
@Override @Override
@ -171,10 +178,11 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
@Override @Override
public Mono<U> clearAndGetPrevious() { public Mono<U> clearAndGetPrevious() {
return Flux var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())).last();
.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())) return Mono.usingWhen(resultMono,
.last() result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
.handle(this::deserializeValue); result -> Mono.fromRunnable(result::close)
);
} }
@Override @Override