Send multiple events together

This commit is contained in:
Andrea Cavalli 2022-01-11 01:45:39 +01:00
parent e723cc6d98
commit 735fccf043
4 changed files with 100 additions and 48 deletions

View File

@ -8,6 +8,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
@ -26,9 +27,9 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
this.eventService = api.getAtomix().getEventService();
clientBoundEvents = Flux
.<ClientBoundEvent>push(sink -> {
.<List<ClientBoundEvent>>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-client-bound-events",
LiveAtomixReactiveApiClient::deserializeEvent,
LiveAtomixReactiveApiClient::deserializeEvents,
s -> {
sink.next(s);
return CompletableFuture.completedFuture(null);
@ -38,6 +39,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list)
.takeUntil(s -> closed)
.share();
}

View File

@ -8,6 +8,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.Disposable;
@ -38,9 +39,9 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
this.userId = userId;
clientBoundEvents = Flux
.<ClientBoundEvent>push(sink -> {
.<List<ClientBoundEvent>>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-client-bound-events",
LiveAtomixReactiveApiClient::deserializeEvent,
LiveAtomixReactiveApiClient::deserializeEvents,
s -> {
sink.next(s);
return CompletableFuture.completedFuture(null);
@ -49,9 +50,10 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list)
.filter(e -> e.userId() == userId)
.doOnNext(e -> liveId.set(e.liveId()))
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.share();
liveIdChange = this.clientBoundEvents()

View File

@ -19,6 +19,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.BufferOverflowStrategy;
@ -39,15 +41,20 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
this.liveId = liveId;
this.userId = userId;
this.clientBoundEvents = Flux
.<ClientBoundEvent>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-client-bound-events", LiveAtomixReactiveApiClient::deserializeEvent, s -> {
.<List<ClientBoundEvent>>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-client-bound-events",
LiveAtomixReactiveApiClient::deserializeEvents,
s -> {
sink.next(s);
return CompletableFuture.completedFuture(null);
}, (a) -> null);
},
(a) -> null
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
.filter(e -> e.userId() == userId && e.liveId() == liveId)
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list)
.filter(e -> e.userId() == userId && e.liveId() == liveId)
.share();
}
@ -103,6 +110,29 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
static ClientBoundEvent deserializeEvent(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
return deserializeEvent(is);
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
static List<ClientBoundEvent> deserializeEvents(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
var len = is.readInt();
var result = new ArrayList<ClientBoundEvent>(len);
for (int i = 0; i < len; i++) {
result.add(deserializeEvent(is));
}
return result;
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException {
var liveId = is.readLong();
var userId = is.readLong();
return switch (is.readByte()) {
@ -115,8 +145,4 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
default -> throw new IllegalStateException("Unexpected value: " + is.readByte());
};
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
}

View File

@ -177,10 +177,13 @@ public abstract class ReactiveApiPublisher {
.cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
.limitRate(1)
.bufferTimeout(64, Duration.ofMillis(10))
// Send events to the client
.subscribeOn(Schedulers.parallel())
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvent));
clientBoundEvent, ReactiveApiPublisher::serializeEvents));
publishedResultingEvents
// Obtain only cluster-bound events
@ -352,9 +355,33 @@ public abstract class ReactiveApiPublisher {
return List.of();
}
private static byte[] serializeEvents(List<ClientBoundEvent> clientBoundEvents) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeInt(clientBoundEvents.size());
for (ClientBoundEvent clientBoundEvent : clientBoundEvents) {
writeClientBoundEvent(clientBoundEvent, dataOutputStream);
}
return byteArrayOutputStream.toByteArray();
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
writeClientBoundEvent(clientBoundEvent, dataOutputStream);
return byteArrayOutputStream.toByteArray();
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
private static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutputStream dataOutputStream)
throws IOException {
dataOutputStream.writeLong(clientBoundEvent.liveId());
dataOutputStream.writeLong(clientBoundEvent.userId());
if (clientBoundEvent instanceof OnUpdateData onUpdateData) {
@ -380,11 +407,6 @@ public abstract class ReactiveApiPublisher {
} else {
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
}
return byteArrayOutputStream.toByteArray();
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
private CompletableFuture<Subscription> registerTopics() {