From c91507381c9b3f2c7b12130f69e38577265a6c8e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 20 Oct 2023 12:52:18 +0200 Subject: [PATCH] Add memory segments preview --- pom.xml | 11 +- .../it/cavallium/filequeue/Deserializer.java | 2 - .../filequeue/DiskQueueToConsumer.java | 5 + .../it/cavallium/filequeue/FileUtils.java | 24 ++ .../cavallium/filequeue/IQueueToConsumer.java | 2 + .../filequeue/LMDBQueueToConsumer.java | 5 + .../MemorySegmentQueueToConsumer.java | 58 ++++ .../cavallium/filequeue/QueueToConsumer.java | 47 +-- .../it/cavallium/filequeue/SimpleQueue.java | 2 +- .../cavallium/filequeue/SimpleQueueLMDB.java | 2 - .../filequeue/SimpleQueueMemorySegment.java | 163 ++++++++++ .../SimpleQueueMemorySegmentFixedSize.java | 89 ++++++ .../filequeue/TestQueueToConsumer.java | 70 ++++- .../filequeue/TestQueueToConsumerJava.java | 287 ++++++++++++++++++ .../TestQueueToConsumerMemorySegment.java | 54 ++++ 15 files changed, 781 insertions(+), 40 deletions(-) create mode 100644 src/main/java/it/cavallium/filequeue/FileUtils.java create mode 100644 src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java create mode 100644 src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java create mode 100644 src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java create mode 100644 src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java create mode 100644 src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java diff --git a/pom.xml b/pom.xml index 54928c1..601d3e7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ it.cavallium filequeue file queue project - 3.1.5 + 3.1.6 jar Light weight, high performance, simple, reliable and persistent queue 4.0.0 @@ -32,8 +32,9 @@ maven-compiler-plugin 3.7.0 - 11 - 11 + 21 + 21 + --enable-preview @@ -47,12 +48,12 @@ org.lmdbjava lmdbjava - 0.8.2 + 0.8.3 org.junit.jupiter junit-jupiter-api - 5.9.0 + 5.9.2 test diff --git a/src/main/java/it/cavallium/filequeue/Deserializer.java b/src/main/java/it/cavallium/filequeue/Deserializer.java index 08f5ae2..a87b301 100644 --- a/src/main/java/it/cavallium/filequeue/Deserializer.java +++ b/src/main/java/it/cavallium/filequeue/Deserializer.java @@ -1,11 +1,9 @@ package it.cavallium.filequeue; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; -import java.util.Map; public interface Deserializer { diff --git a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java index 917516e..222007a 100644 --- a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java @@ -52,4 +52,9 @@ public final class DiskQueueToConsumer implements IQueueToConsumer { public void startQueue() { queue.startQueue(); } + + @Override + public void runStep(boolean park) { + queue.runStep(park); + } } diff --git a/src/main/java/it/cavallium/filequeue/FileUtils.java b/src/main/java/it/cavallium/filequeue/FileUtils.java new file mode 100644 index 0000000..92234d4 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/FileUtils.java @@ -0,0 +1,24 @@ +package it.cavallium.filequeue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; + +class FileUtils { + + public static void deleteDirectory(Path dir) throws IOException { + try (var fs = Files.walk(dir)) { + fs.sorted(Comparator.reverseOrder()).forEach(f -> { + try { + Files.deleteIfExists(f); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException ex) { + throw ex.getCause(); + } + } +} diff --git a/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java index fb5d9a2..956af67 100644 --- a/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java @@ -10,4 +10,6 @@ public interface IQueueToConsumer extends Closeable { void close(); void startQueue(); + + void runStep(boolean park); } diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java index aedffa6..e9d4c06 100644 --- a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java @@ -35,4 +35,9 @@ public final class LMDBQueueToConsumer implements IQueueToConsumer { public void startQueue() { queue.startQueue(); } + + @Override + public void runStep(boolean park) { + queue.runStep(park); + } } diff --git a/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java new file mode 100644 index 0000000..9bdff63 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java @@ -0,0 +1,58 @@ +package it.cavallium.filequeue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; + +public final class MemorySegmentQueueToConsumer implements IQueueToConsumer { + + private final SimpleQueueMemorySegment queueMemorySegment; + private final QueueToConsumer queue; + private final Path dir; + + public MemorySegmentQueueToConsumer(Path file, + String dbName, + Serializer serializer, + Deserializer deserializer, + QueueConsumer consumer, + long segmentSize) { + try { + this.dir = file.resolve(dbName); + this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir, + dbName, + serializer, + deserializer, + segmentSize + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + this.queue = new QueueToConsumer<>(queueMemorySegment, consumer); + } + + @Override + public void add(T value) { + queue.add(value); + } + + @Override + public void close() { + queue.close(); + queueMemorySegment.close(); + try { + FileUtils.deleteDirectory(dir); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void startQueue() { + queue.startQueue(); + } + + @Override + public void runStep(boolean park) { + queue.runStep(park); + } +} diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java index ff9fbb0..6db13ba 100644 --- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -70,27 +70,40 @@ class QueueToConsumer implements IQueueToConsumer { public void run() { try { while (!closed) { - T element; - boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0; - if (shouldRemove) { - element = queue.remove(); - long nextDelay = BACKOFF_NS; - while (!closed && !consumer.tryConsume(element)) { - LockSupport.parkNanos(nextDelay); - if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { - nextDelay += BACKOFF_NS; - } else if (nextDelay < MAX_BACKOFF_NS) { - nextDelay = MAX_BACKOFF_NS; - } - } - preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0); - } else { - LockSupport.parkNanos(HALF_SECOND_NS); - } + runStepInternal(true); } } finally { closed = true; } } } + + public void runStep(boolean park) { + if (manager != null) throw new UnsupportedOperationException("This queue is managed"); + runStepInternal(false); + } + + private void runStepInternal(boolean park) { + T element; + boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0; + if (shouldRemove) { + element = queue.remove(); + long nextDelay = BACKOFF_NS; + while (!closed && !consumer.tryConsume(element)) { + if (park) { + LockSupport.parkNanos(nextDelay); + } + if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { + nextDelay += BACKOFF_NS; + } else if (nextDelay < MAX_BACKOFF_NS) { + nextDelay = MAX_BACKOFF_NS; + } + } + preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0); + } else { + if (park) { + LockSupport.parkNanos(HALF_SECOND_NS); + } + } + } } diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueue.java b/src/main/java/it/cavallium/filequeue/SimpleQueue.java index 4b83073..944a37e 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueue.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueue.java @@ -3,7 +3,7 @@ package it.cavallium.filequeue; /** * The queue must be thread-safe */ -interface SimpleQueue { +public interface SimpleQueue { void add(T element); diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java index 4f52a48..9bbdfcc 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java @@ -6,8 +6,6 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.lmdbjava.Dbi; import org.lmdbjava.DbiFlags; import org.lmdbjava.Env; diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java new file mode 100644 index 0000000..1b4b61a --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java @@ -0,0 +1,163 @@ +package it.cavallium.filequeue; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * Thread safe queue + */ +public class SimpleQueueMemorySegment implements SimpleQueue, Closeable { + + private static final Set REGISTRY = new HashSet<>(); + private final ArrayDeque queueSegments = new ArrayDeque<>(); + private final Path directory; + private final String name; + private final Serializer serializer; + private final Deserializer deserializer; + private final Arena arena; + private final long segmentSize; + + public SimpleQueueMemorySegment(Path directory, + String name, + Serializer serializer, + Deserializer deserializer, + long segmentSize) throws IOException { + this.arena = Arena.ofShared(); + this.name = name; + this.directory = directory; + this.serializer = serializer; + this.deserializer = deserializer; + this.segmentSize = segmentSize; + synchronized (REGISTRY) { + if (!REGISTRY.add(name)) { + throw new IllegalArgumentException("Database already registered: " + name); + } + } + if (Files.notExists(directory)) { + Files.createDirectories(directory); + } + } + + private Path generateQueuePath() { + return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime())); + } + + @Override + public void add(T element) { + byte[] serialized; + try { + serialized = serializer.serialize(element); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + var memorySegment = arena.allocateArray(ValueLayout.JAVA_BYTE, serialized); + try { + if (!getQueueSegmentForWrite().offer(memorySegment)) { + expandQueueSegmentForWrite().add(memorySegment); + } + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException { + synchronized (queueSegments) { + var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(arena, segmentSize, generateQueuePath()); + queueSegments.add(queueMemorySegment); + return queueMemorySegment; + } + } + + private SimpleQueueMemorySegmentFixedSize getQueueSegmentForWrite() throws IOException { + synchronized (queueSegments) { + var first = queueSegments.peekLast(); + if (first == null) { + first = expandQueueSegmentForWrite(); + } + return first; + } + } + + private SimpleQueueMemorySegmentFixedSize getQueueSegmentForRead() throws IOException { + synchronized (queueSegments) { + var last = queueSegments.peekFirst(); + if (last == null) { + last = expandQueueSegmentForWrite(); + } + return last; + } + } + + private boolean popQueueSegmentForRead() throws IOException { + synchronized (queueSegments) { + if (queueSegments.size() > 1) { + var polled = queueSegments.poll(); + if (polled != null) { + polled.close(); + return true; + } + } + } + return false; + } + + @Override + public T remove() { + MemorySegment removed; + try { + removed = getQueueSegmentForRead().poll(); + if (removed == null) { + if (popQueueSegmentForRead()) { + removed = getQueueSegmentForRead().poll(); + } + } + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + if (removed == null) throw new NoSuchElementException(); + return readObject(removed.toArray(ValueLayout.JAVA_BYTE)); + } + + private T readObject(byte[] byteBuffer) { + try { + return deserializer.deserialize(byteBuffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int size() { + return 0; + } + + @Override + public void close() { + try { + this.queueSegments.forEach(qs -> { + try { + qs.close(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + this.arena.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + synchronized (REGISTRY) { + REGISTRY.remove(name); + } + } + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java new file mode 100644 index 0000000..85e6e8e --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java @@ -0,0 +1,89 @@ +package it.cavallium.filequeue; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; + +public class SimpleQueueMemorySegmentFixedSize implements Closeable { + + private final Path filePath; + private final FileChannel ch; + private final MemorySegment writer; + private final Arena arena; + private final long fixedSize; + private long readPosition = 0; + private long writePosition = 0; + private final AtomicInteger size = new AtomicInteger(); + + public SimpleQueueMemorySegmentFixedSize(Arena arena, long fixedSize, Path filePath) throws IOException { + this.arena = arena; + this.filePath = filePath; + this.fixedSize = fixedSize; + Files.deleteIfExists(filePath); + this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ); + ch.truncate(fixedSize); + this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize, arena); + } + + @Override + public void close() throws IOException { + ch.close(); + Files.deleteIfExists(filePath); + } + + public void add(MemorySegment element) { + if (!offer(element)) { + throw new IllegalStateException("Can't add the element, not enough space"); + } + } + + /** + * @return true if there is enough space, false otherwise + */ + public boolean offer(MemorySegment element) { + int size = Math.toIntExact(element.byteSize()); + if (writePosition + size + Integer.BYTES > fixedSize) { + if (writePosition == 0) throw new UnsupportedOperationException("Entity too large! " + size + " bytes"); + return false; + } + writer.set(ValueLayout.JAVA_INT_UNALIGNED, writePosition, size); + MemorySegment.copy(element, 0, writer, writePosition + Integer.BYTES, size); + writePosition += size + Integer.BYTES; + this.size.incrementAndGet(); + return true; + } + + public MemorySegment poll() { + if (readPosition >= fixedSize) { + return null; + } + if (size.decrementAndGet() < 0) { + size.incrementAndGet(); + return null; + } + int size = writer.get(ValueLayout.JAVA_INT_UNALIGNED, readPosition); + var segment = arena.allocate(size); + MemorySegment.copy(writer, readPosition + Integer.BYTES, segment, 0, size); + readPosition += size + Integer.BYTES; + return segment; + } + + public MemorySegment remove() { + var polled = poll(); + if (polled == null) throw new NoSuchElementException(); + return polled; + } + + public int size() { + return Math.max(0, size.get()); + } +} diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java index 5875315..8ef78b5 100644 --- a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java @@ -1,26 +1,28 @@ package it.cavallium.filequeue; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class TestQueueToConsumer { +public abstract class TestQueueToConsumer { + + public abstract SimpleQueue getSimpleQueue(); @Test public void test() { - var q = new SimpleQueueJava(new ConcurrentLinkedDeque<>()); + var q = getSimpleQueue(); AtomicBoolean ab = new AtomicBoolean(); - try (var qtc = new QueueToConsumer(q, new QueueConsumer() { - @Override - public boolean tryConsume(String value) { - System.out.println("value:" + value + " thread: " + Thread.currentThread()); - if (ab.get()) { - ab.set(false); - return false; - } else { - ab.set(true); - return true; - } + try (var qtc = new QueueToConsumer<>(q, value -> { + if (ab.get()) { + ab.set(false); + return false; + } else { + ab.set(true); + return true; } })) { qtc.startQueue(); @@ -34,4 +36,46 @@ public class TestQueueToConsumer { throw new RuntimeException(e); } } + + @Test + public void testConcurrency() { + var q = getSimpleQueue(); + AtomicBoolean ab = new AtomicBoolean(); + CountDownLatch cdl = new CountDownLatch(1); + var referenceQueue = IntStream + .range(0, 1000) + .mapToObj(Integer::toString) + .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)); + try (var qtc = new QueueToConsumer<>(q, value -> { + if (ab.get()) { + ab.set(false); + return false; + } else { + ab.set(true); + Assertions.assertEquals(referenceQueue.pop(), value); + if (referenceQueue.isEmpty()) { + cdl.countDown(); + } + return true; + } + })) { + var t = new Thread(() -> { + while (!Thread.interrupted()) { + qtc.runStep(false); + } + }); + t.setDaemon(true); + t.setName("step"); + t.start(); + for (String s : referenceQueue.toArray(String[]::new)) { + qtc.add(s); + } + cdl.await(); + t.interrupt(); + t.join(); + Assertions.assertEquals(0, referenceQueue.size()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java new file mode 100644 index 0000000..c460438 --- /dev/null +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java @@ -0,0 +1,287 @@ +package it.cavallium.filequeue; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestQueueToConsumerJava extends TestQueueToConsumer { + + @Override + public SimpleQueue getSimpleQueue() { + return new SimpleQueueJava<>(new ConcurrentLinkedDeque<>()); + } + + @Test + public void testNoQueue() { + var q = new SimpleQueueJava<>(new Queue() { + @Override + public boolean add(String s) { + Assertions.fail(); + return false; + } + + @Override + public boolean offer(String s) { + Assertions.fail(); + return false; + } + + @Override + public String remove() { + Assertions.fail(); + return null; + } + + @Override + public String poll() { + return null; + } + + @Override + public String element() { + Assertions.fail(); + return null; + } + + @Override + public String peek() { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public boolean contains(Object o) { + Assertions.fail(); + return false; + } + + @Override + public Iterator iterator() { + Assertions.fail(); + return null; + } + + @Override + public Object[] toArray() { + Assertions.fail(); + return new Object[0]; + } + + @Override + public T[] toArray(T[] a) { + Assertions.fail(); + return null; + } + + @Override + public boolean remove(Object o) { + Assertions.fail(); + return false; + } + + @Override + public boolean containsAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean addAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean removeAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean retainAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public void clear() { + } + }); + AtomicReference lastConsumed = new AtomicReference<>(); + try (var consumer = new QueueToConsumer<>(q, value -> { + lastConsumed.set(value); + return true; + })) { + for (int i = 0; i < 10_000; i++) { + var s = Integer.toString(i); + consumer.add(s); + Assertions.assertEquals(s, lastConsumed.get()); + } + } + } + + @Test + public void testSmallQueueThenNone() { + var maxQueueSize = 10; + var shouldReject = new AtomicBoolean(true); + var q = new SimpleQueueJava<>(new Queue() { + final Queue smallQueue = new ArrayDeque<>(maxQueueSize); + @Override + public boolean add(String s) { + var added = smallQueue.add(s); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return added; + } + + @Override + public boolean offer(String s) { + var added = smallQueue.offer(s); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return added; + } + + @Override + public String remove() { + var removed = smallQueue.remove(); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return removed; + } + + @Override + public String poll() { + var removed = smallQueue.poll(); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return removed; + } + + @Override + public String element() { + var element = smallQueue.element(); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return element; + } + + @Override + public String peek() { + var element = smallQueue.peek(); + Assertions.assertTrue(smallQueue.size() <= maxQueueSize); + return element; + } + + @Override + public int size() { + return smallQueue.size(); + } + + @Override + public boolean isEmpty() { + return smallQueue.isEmpty(); + } + + @Override + public boolean contains(Object o) { + Assertions.fail(); + return false; + } + + @Override + public Iterator iterator() { + Assertions.fail(); + return null; + } + + @Override + public Object[] toArray() { + Assertions.fail(); + return new Object[0]; + } + + @Override + public T[] toArray(T[] a) { + Assertions.fail(); + return null; + } + + @Override + public boolean remove(Object o) { + Assertions.fail(); + return false; + } + + @Override + public boolean containsAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean addAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean removeAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public boolean retainAll(Collection c) { + Assertions.fail(); + return false; + } + + @Override + public void clear() { + smallQueue.clear(); + } + }); + AtomicReference lastConsumed = new AtomicReference<>(); + try (var consumer = new QueueToConsumer<>(q, value -> { + if (shouldReject.get()) { + lastConsumed.set(null); + return false; + } else { + lastConsumed.set(value); + return true; + } + })) { + for (int i = 0; i < maxQueueSize; i++) { + var s = Integer.toString(i); + consumer.add(s); + Assertions.assertNull(lastConsumed.get()); + } + shouldReject.set(false); + for (int i = 0; i < maxQueueSize; i++) { + var s = Integer.toString(i); + consumer.runStep(false); + Assertions.assertEquals(s, lastConsumed.get()); + Assertions.assertEquals(maxQueueSize - i - 1, q.size()); + } + Assertions.assertEquals(0, q.size()); + for (int i = maxQueueSize; i < 10_000; i++) { + var s = Integer.toString(i); + consumer.add(s); + + Assertions.assertEquals(s, lastConsumed.get()); + Assertions.assertEquals(0, q.size()); + } + } + } +} diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java new file mode 100644 index 0000000..b5229a2 --- /dev/null +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java @@ -0,0 +1,54 @@ +package it.cavallium.filequeue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class TestQueueToConsumerMemorySegment extends TestQueueToConsumer { + + private static final long SEGMENT_SIZE = 24; + private Path dir; + + @BeforeEach + public void setup() throws IOException { + dir = Files.createTempDirectory("queue"); + } + + @AfterEach + public void tearDown() throws IOException { + Path pathToBeDeleted = dir; + try (var fs = Files.walk(pathToBeDeleted)) { + fs.sorted(Comparator.reverseOrder()) + .forEach(f -> { + try { + Files.deleteIfExists(f); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + + @Override + public SimpleQueue getSimpleQueue() { + try { + return new SimpleQueueMemorySegment<>(dir, "test", new Serializer<>() { + @Override + public byte[] serialize(String data) { + return data.getBytes(StandardCharsets.UTF_8); + } + }, new Deserializer<>() { + @Override + public String deserialize(byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + }, SEGMENT_SIZE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +}