Correct generic builder.

Add Unit Tests.
This commit is contained in:
mgabriel 2018-08-22 16:40:57 +02:00
parent bdbc806e85
commit 8ce13d8e0d
6 changed files with 122 additions and 66 deletions

View File

@ -30,13 +30,13 @@ import reactor.core.publisher.FluxSink;
*/ */
public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> { public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class); private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class);
private final Function<I, byte[]> serializer;
protected final Function<byte[], I> deserializer; protected final Function<byte[], I> deserializer;
private final Function<I, byte[]> serializer;
private final SingleChronicleQueue queue; private final SingleChronicleQueue queue;
private final RollCycle rollCycle; private final RollCycle rollCycle;
protected AbstractChronicleStore(AbstractChronicleStoreBuilder<I> builder) { protected <S extends AbstractChronicleStore<I, O>, B extends AbstractChronicleStoreBuilder<B, S, I>> AbstractChronicleStore(
AbstractChronicleStoreBuilder<B, S, I> builder) {
serializer = builder.serializer; serializer = builder.serializer;
deserializer = builder.deserializer; deserializer = builder.deserializer;
rollCycle = builder.rollCycle; rollCycle = builder.rollCycle;
@ -69,17 +69,28 @@ public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
return serializer.apply(v); return serializer.apply(v);
} }
protected abstract O deserializeValue(BytesIn rawData);
@Override @Override
public void store(I item) { public void store(I item) {
ExcerptAppender appender = queue.acquireAppender(); ExcerptAppender appender = queue.acquireAppender();
storeValue(appender, item); storeValue(appender, item);
} }
private enum ReaderType { @Override
ALL, public Flux<O> retrieveAll(boolean deleteAfterRead) {
ONLY_HISTORY return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
}
private void launchTailer(FluxSink<O> sink, ReaderType readerType, boolean deleteAfterRead) {
launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead);
}
private void launchTailer(FluxSink<O> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
String path = tailer.queue().file().getAbsolutePath();
Thread t = new Thread(
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
"ChronicleStoreRetrieve_" + path);
t.setDaemon(true);
t.start();
} }
private void readTailer(ExcerptTailer tailer, FluxSink<O> sink, private void readTailer(ExcerptTailer tailer, FluxSink<O> sink,
@ -113,6 +124,8 @@ public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
} }
} }
protected abstract O deserializeValue(BytesIn rawData);
private void waitMillis(long time) { private void waitMillis(long time) {
try { try {
MILLISECONDS.sleep(time); MILLISECONDS.sleep(time);
@ -153,24 +166,6 @@ public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
} }
} }
@Override
public Flux<O> retrieveAll(boolean deleteAfterRead) {
return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
}
private void launchTailer(FluxSink<O> sink, ReaderType readerType, boolean deleteAfterRead) {
launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead);
}
private void launchTailer(FluxSink<O> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
String path = tailer.queue().file().getAbsolutePath();
Thread t = new Thread(
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
"ChronicleStoreRetrieve_" + path);
t.setDaemon(true);
t.start();
}
@Override @Override
public Flux<O> retrieveHistory() { public Flux<O> retrieveHistory() {
return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false)); return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false));
@ -188,7 +183,12 @@ public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
return new ReplayFlux<>(historySource, timestampExtractor); return new ReplayFlux<>(historySource, timestampExtractor);
} }
public abstract static class AbstractChronicleStoreBuilder<T> { private enum ReaderType {
ALL,
ONLY_HISTORY
}
public abstract static class AbstractChronicleStoreBuilder<B extends AbstractChronicleStoreBuilder<B, R, T>, R extends AbstractChronicleStore, T> {
private String path; private String path;
private Function<T, byte[]> serializer; private Function<T, byte[]> serializer;
private Function<byte[], T> deserializer; private Function<byte[], T> deserializer;
@ -202,36 +202,40 @@ public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
* This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a> * This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @return this builder * @return this builder
*/ */
public AbstractChronicleStoreBuilder<T> path(String path) { public B path(String path) {
this.path = path; this.path = path;
return this; return getThis();
} }
protected abstract B getThis();
/** /**
* @param serializer data serializer * @param serializer data serializer
* @return this builder * @return this builder
*/ */
public AbstractChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) { public B serializer(Function<T, byte[]> serializer) {
this.serializer = serializer; this.serializer = serializer;
return this; return getThis();
} }
/** /**
* @param deserializer data deserializer * @param deserializer data deserializer
* @return this builder * @return this builder
*/ */
public AbstractChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) { public B deserializer(Function<byte[], T> deserializer) {
this.deserializer = deserializer; this.deserializer = deserializer;
return this; return getThis();
} }
/** /**
* @param rollCycle roll cycle for the files * @param rollCycle roll cycle for the files
* @return this builder * @return this builder
*/ */
public AbstractChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) { public B rollCycle(RollCycle rollCycle) {
this.rollCycle = rollCycle; this.rollCycle = rollCycle;
return this; return getThis();
} }
public abstract R build();
} }
} }

View File

@ -9,7 +9,6 @@ import net.openhft.chronicle.bytes.BytesIn;
/** /**
* Implementation of a {@link FluxJournal} backed by a Chronicle Queue. * Implementation of a {@link FluxJournal} backed by a Chronicle Queue.
* This journal respects the backpressure on the data streams it produces. * This journal respects the backpressure on the data streams it produces.
*
* All values saved in the journal are timed with the current time. * All values saved in the journal are timed with the current time.
* *
* @author mgabriel. * @author mgabriel.
@ -30,10 +29,6 @@ public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> imp
.deserializer(deserializer)); .deserializer(deserializer));
} }
private ChronicleJournal(ChronicleJournalBuilder<T> builder) {
super(builder);
}
/** /**
* @param <BT> data type. * @param <BT> data type.
* @return a ChronicleStore builder. * @return a ChronicleStore builder.
@ -42,6 +37,10 @@ public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> imp
return new ChronicleJournalBuilder<>(); return new ChronicleJournalBuilder<>();
} }
private ChronicleJournal(ChronicleJournalBuilder<T> builder) {
super(builder);
}
@Override @Override
protected byte[] serializeValue(T v) { protected byte[] serializeValue(T v) {
byte[] val = super.serializeValue(v); byte[] val = super.serializeValue(v);
@ -52,11 +51,6 @@ public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> imp
return result; return result;
} }
//package private for testing
long getCurrentTime() {
return System.currentTimeMillis();
}
@Override @Override
protected Timed<T> deserializeValue(BytesIn rawData) { protected Timed<T> deserializeValue(BytesIn rawData) {
int size = rawData.readInt(); int size = rawData.readInt();
@ -72,6 +66,17 @@ public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> imp
} }
private static long fromByteArray(byte[] bytes) {
return (bytes[0] & 0xFFL) << 56
| (bytes[1] & 0xFFL) << 48
| (bytes[2] & 0xFFL) << 40
| (bytes[3] & 0xFFL) << 32
| (bytes[4] & 0xFFL) << 24
| (bytes[5] & 0xFFL) << 16
| (bytes[6] & 0xFFL) << 8
| (bytes[7] & 0xFFL);
}
private static byte[] toByteArray(long value) { private static byte[] toByteArray(long value) {
return new byte[] { return new byte[] {
(byte) (value >> 56), (byte) (value >> 56),
@ -85,21 +90,23 @@ public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> imp
}; };
} }
private static long fromByteArray(byte[] bytes){ //package private for testing
return (bytes[0] & 0xFFL) << 56 long getCurrentTime() {
| (bytes[1] & 0xFFL) << 48 return System.currentTimeMillis();
| (bytes[2] & 0xFFL) << 40
| (bytes[3] & 0xFFL) << 32
| (bytes[4] & 0xFFL) << 24
| (bytes[5] & 0xFFL) << 16
| (bytes[6] & 0xFFL) << 8
| (bytes[7] & 0xFFL);
} }
public static final class ChronicleJournalBuilder<T> extends AbstractChronicleStoreBuilder<T> { public static final class ChronicleJournalBuilder<T>
extends AbstractChronicleStoreBuilder<ChronicleJournalBuilder<T>, ChronicleJournal<T>, T> {
private ChronicleJournalBuilder() { private ChronicleJournalBuilder() {
super(); super();
} }
@Override
protected ChronicleJournalBuilder<T> getThis() {
return this;
}
@Override
public ChronicleJournal<T> build() { public ChronicleJournal<T> build() {
return new ChronicleJournal<>(this); return new ChronicleJournal<>(this);
} }

View File

@ -26,10 +26,6 @@ public class ChronicleStore<T> extends AbstractChronicleStore<T, T> {
.deserializer(deserializer)); .deserializer(deserializer));
} }
private ChronicleStore(ChronicleStoreBuilder<T> builder) {
super(builder);
}
/** /**
* @param <BT> data type. * @param <BT> data type.
* @return a ChronicleStore builder. * @return a ChronicleStore builder.
@ -38,6 +34,11 @@ public class ChronicleStore<T> extends AbstractChronicleStore<T, T> {
return new ChronicleStoreBuilder<>(); return new ChronicleStoreBuilder<>();
} }
//package private for testing
ChronicleStore(ChronicleStoreBuilder<T> builder) {
super(builder);
}
@Override @Override
protected T deserializeValue(BytesIn rawData) { protected T deserializeValue(BytesIn rawData) {
int size = rawData.readInt(); int size = rawData.readInt();
@ -46,14 +47,21 @@ public class ChronicleStore<T> extends AbstractChronicleStore<T, T> {
return deserializer.apply(bytes); return deserializer.apply(bytes);
} }
public static final class ChronicleStoreBuilder<T> extends AbstractChronicleStoreBuilder<T> { public static final class ChronicleStoreBuilder<T>
extends AbstractChronicleStoreBuilder<ChronicleStoreBuilder<T>, ChronicleStore<T>, T> {
private ChronicleStoreBuilder() { private ChronicleStoreBuilder() {
super(); super();
} }
@Override
protected ChronicleStoreBuilder<T> getThis() {
return this;
}
@Override
public ChronicleStore<T> build() { public ChronicleStore<T> build() {
return new ChronicleStore<T>(this); return new ChronicleStore<>(this);
} }
} }
} }

View File

@ -17,7 +17,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import ch.streamly.chronicle.flux.AbstractChronicleStore.AbstractChronicleStoreBuilder;
import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesIn; import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.bytes.ReadBytesMarshallable; import net.openhft.chronicle.bytes.ReadBytesMarshallable;
@ -59,11 +58,10 @@ class AbstractChronicleStoreTest {
when(wireStore.file()).thenReturn(file); when(wireStore.file()).thenReturn(file);
when(file.delete()).thenReturn(true); when(file.delete()).thenReturn(true);
AbstractChronicleStoreBuilder<String> builder = new AbstractChronicleStoreBuilder<String>() { ChronicleStore.ChronicleStoreBuilder<String> builder = ChronicleStore.newBuilder();
};
builder.rollCycle(rollCycle); builder.rollCycle(rollCycle);
store = new AbstractChronicleStore<String, String>(builder) { store = new ChronicleStore<String>(builder) {
@Override @Override
SingleChronicleQueue createQueue(String path) { SingleChronicleQueue createQueue(String path) {
@ -86,7 +84,7 @@ class AbstractChronicleStoreTest {
} }
@Test @Test
@DisplayName("tests that the an exception on file deletion is swallowed") @DisplayName("tests that an exception on file deletion is swallowed")
void shouldSwallowExceptionOnFileDelete() { void shouldSwallowExceptionOnFileDelete() {
when(file.delete()).thenThrow(new RuntimeException("Simulated for unit test")); when(file.delete()).thenThrow(new RuntimeException("Simulated for unit test"));
Disposable sub = store.retrieveAll(true).subscribe(); Disposable sub = store.retrieveAll(true).subscribe();
@ -133,4 +131,11 @@ class AbstractChronicleStoreTest {
when(wireStore.file()).thenReturn(null); when(wireStore.file()).thenReturn(null);
subscribeToValues(); subscribeToValues();
} }
@Test
@DisplayName("tests that an exception thrown when reading from Chronicle is swallowed")
void testTailerException() {
when(tailer.readBytes(any(ReadBytesMarshallable.class))).thenThrow(new RuntimeException("simulated")).thenReturn(true);
subscribeToValues();
}
} }

View File

@ -60,6 +60,21 @@ class ChronicleJournalTest {
@Test @Test
@DisplayName("tests that a timed data stream is stored in the Chronicle journal") @DisplayName("tests that a timed data stream is stored in the Chronicle journal")
void shouldStoreStreamWithTimeStamps() { void shouldStoreStreamWithTimeStamps() {
verifyBasicOperations();
}
@Test
@DisplayName("tests store creation with builder")
void testStoreCreationWithBuilder() {
journal = ChronicleJournal.<DummyObject>newBuilder()
.path(path)
.serializer(DummyObject::toBinary)
.deserializer(DummyObject::fromBinary)
.build();
}
private void verifyBasicOperations() {
journal.store(source); journal.store(source);
StepVerifier.create(journal.retrieveAll()) StepVerifier.create(journal.retrieveAll())
.expectSubscription() .expectSubscription()

View File

@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import net.openhft.chronicle.queue.RollCycles;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -44,6 +45,22 @@ class ChronicleStoreTest {
@Test @Test
@DisplayName("tests that a data stream is store in the Chronicle store") @DisplayName("tests that a data stream is store in the Chronicle store")
void shouldStoreStream() { void shouldStoreStream() {
testBasicOperations();
}
@Test
@DisplayName("tests building the chronicle store with its builder")
void testWithBuilder() {
store = ChronicleStore.<DummyObject>newBuilder()
.path(path)
.serializer(DummyObject::toBinary)
.deserializer(DummyObject::fromBinary)
.rollCycle(RollCycles.DAILY)
.build();
testBasicOperations();
}
private void testBasicOperations() {
store.store(source); store.store(source);
StepVerifier.create(store.retrieveAll()) StepVerifier.create(store.retrieveAll())
.expectSubscription() .expectSubscription()