diff --git a/pom.xml b/pom.xml
index 54928c1..601d3e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
it.cavallium
filequeue
file queue project
- 3.1.5
+ 3.1.6
jar
Light weight, high performance, simple, reliable and persistent queue
4.0.0
@@ -32,8 +32,9 @@
maven-compiler-plugin
3.7.0
-
- 11
+
+ 21
+ --enable-preview
@@ -47,12 +48,12 @@
org.lmdbjava
lmdbjava
- 0.8.2
+ 0.8.3
org.junit.jupiter
junit-jupiter-api
- 5.9.0
+ 5.9.2
test
diff --git a/src/main/java/it/cavallium/filequeue/Deserializer.java b/src/main/java/it/cavallium/filequeue/Deserializer.java
index 08f5ae2..a87b301 100644
--- a/src/main/java/it/cavallium/filequeue/Deserializer.java
+++ b/src/main/java/it/cavallium/filequeue/Deserializer.java
@@ -1,11 +1,9 @@
package it.cavallium.filequeue;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.Map;
public interface Deserializer {
diff --git a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java
index 917516e..222007a 100644
--- a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java
@@ -52,4 +52,9 @@ public final class DiskQueueToConsumer implements IQueueToConsumer {
public void startQueue() {
queue.startQueue();
}
+
+ @Override
+ public void runStep(boolean park) {
+ queue.runStep(park);
+ }
}
diff --git a/src/main/java/it/cavallium/filequeue/FileUtils.java b/src/main/java/it/cavallium/filequeue/FileUtils.java
new file mode 100644
index 0000000..92234d4
--- /dev/null
+++ b/src/main/java/it/cavallium/filequeue/FileUtils.java
@@ -0,0 +1,24 @@
+package it.cavallium.filequeue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+class FileUtils {
+
+ public static void deleteDirectory(Path dir) throws IOException {
+ try (var fs = Files.walk(dir)) {
+ fs.sorted(Comparator.reverseOrder()).forEach(f -> {
+ try {
+ Files.deleteIfExists(f);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException ex) {
+ throw ex.getCause();
+ }
+ }
+}
diff --git a/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java
index fb5d9a2..956af67 100644
--- a/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java
@@ -10,4 +10,6 @@ public interface IQueueToConsumer extends Closeable {
void close();
void startQueue();
+
+ void runStep(boolean park);
}
diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java
index aedffa6..e9d4c06 100644
--- a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java
@@ -35,4 +35,9 @@ public final class LMDBQueueToConsumer implements IQueueToConsumer {
public void startQueue() {
queue.startQueue();
}
+
+ @Override
+ public void runStep(boolean park) {
+ queue.runStep(park);
+ }
}
diff --git a/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java
new file mode 100644
index 0000000..9bdff63
--- /dev/null
+++ b/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java
@@ -0,0 +1,58 @@
+package it.cavallium.filequeue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+
+public final class MemorySegmentQueueToConsumer implements IQueueToConsumer {
+
+ private final SimpleQueueMemorySegment queueMemorySegment;
+ private final QueueToConsumer queue;
+ private final Path dir;
+
+ public MemorySegmentQueueToConsumer(Path file,
+ String dbName,
+ Serializer serializer,
+ Deserializer deserializer,
+ QueueConsumer consumer,
+ long segmentSize) {
+ try {
+ this.dir = file.resolve(dbName);
+ this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir,
+ dbName,
+ serializer,
+ deserializer,
+ segmentSize
+ );
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.queue = new QueueToConsumer<>(queueMemorySegment, consumer);
+ }
+
+ @Override
+ public void add(T value) {
+ queue.add(value);
+ }
+
+ @Override
+ public void close() {
+ queue.close();
+ queueMemorySegment.close();
+ try {
+ FileUtils.deleteDirectory(dir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void startQueue() {
+ queue.startQueue();
+ }
+
+ @Override
+ public void runStep(boolean park) {
+ queue.runStep(park);
+ }
+}
diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
index ff9fbb0..6db13ba 100644
--- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
@@ -70,27 +70,40 @@ class QueueToConsumer implements IQueueToConsumer {
public void run() {
try {
while (!closed) {
- T element;
- boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
- if (shouldRemove) {
- element = queue.remove();
- long nextDelay = BACKOFF_NS;
- while (!closed && !consumer.tryConsume(element)) {
- LockSupport.parkNanos(nextDelay);
- if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
- nextDelay += BACKOFF_NS;
- } else if (nextDelay < MAX_BACKOFF_NS) {
- nextDelay = MAX_BACKOFF_NS;
- }
- }
- preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0);
- } else {
- LockSupport.parkNanos(HALF_SECOND_NS);
- }
+ runStepInternal(true);
}
} finally {
closed = true;
}
}
}
+
+ public void runStep(boolean park) {
+ if (manager != null) throw new UnsupportedOperationException("This queue is managed");
+ runStepInternal(false);
+ }
+
+ private void runStepInternal(boolean park) {
+ T element;
+ boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
+ if (shouldRemove) {
+ element = queue.remove();
+ long nextDelay = BACKOFF_NS;
+ while (!closed && !consumer.tryConsume(element)) {
+ if (park) {
+ LockSupport.parkNanos(nextDelay);
+ }
+ if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
+ nextDelay += BACKOFF_NS;
+ } else if (nextDelay < MAX_BACKOFF_NS) {
+ nextDelay = MAX_BACKOFF_NS;
+ }
+ }
+ preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0);
+ } else {
+ if (park) {
+ LockSupport.parkNanos(HALF_SECOND_NS);
+ }
+ }
+ }
}
diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueue.java b/src/main/java/it/cavallium/filequeue/SimpleQueue.java
index 4b83073..944a37e 100644
--- a/src/main/java/it/cavallium/filequeue/SimpleQueue.java
+++ b/src/main/java/it/cavallium/filequeue/SimpleQueue.java
@@ -3,7 +3,7 @@ package it.cavallium.filequeue;
/**
* The queue must be thread-safe
*/
-interface SimpleQueue {
+public interface SimpleQueue {
void add(T element);
diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java
index 4f52a48..9bbdfcc 100644
--- a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java
+++ b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java
@@ -6,8 +6,6 @@ import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java
new file mode 100644
index 0000000..1b4b61a
--- /dev/null
+++ b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegment.java
@@ -0,0 +1,163 @@
+package it.cavallium.filequeue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Thread safe queue
+ */
+public class SimpleQueueMemorySegment implements SimpleQueue, Closeable {
+
+ private static final Set REGISTRY = new HashSet<>();
+ private final ArrayDeque queueSegments = new ArrayDeque<>();
+ private final Path directory;
+ private final String name;
+ private final Serializer serializer;
+ private final Deserializer deserializer;
+ private final Arena arena;
+ private final long segmentSize;
+
+ public SimpleQueueMemorySegment(Path directory,
+ String name,
+ Serializer serializer,
+ Deserializer deserializer,
+ long segmentSize) throws IOException {
+ this.arena = Arena.ofShared();
+ this.name = name;
+ this.directory = directory;
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ this.segmentSize = segmentSize;
+ synchronized (REGISTRY) {
+ if (!REGISTRY.add(name)) {
+ throw new IllegalArgumentException("Database already registered: " + name);
+ }
+ }
+ if (Files.notExists(directory)) {
+ Files.createDirectories(directory);
+ }
+ }
+
+ private Path generateQueuePath() {
+ return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime()));
+ }
+
+ @Override
+ public void add(T element) {
+ byte[] serialized;
+ try {
+ serialized = serializer.serialize(element);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ var memorySegment = arena.allocateArray(ValueLayout.JAVA_BYTE, serialized);
+ try {
+ if (!getQueueSegmentForWrite().offer(memorySegment)) {
+ expandQueueSegmentForWrite().add(memorySegment);
+ }
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException {
+ synchronized (queueSegments) {
+ var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(arena, segmentSize, generateQueuePath());
+ queueSegments.add(queueMemorySegment);
+ return queueMemorySegment;
+ }
+ }
+
+ private SimpleQueueMemorySegmentFixedSize getQueueSegmentForWrite() throws IOException {
+ synchronized (queueSegments) {
+ var first = queueSegments.peekLast();
+ if (first == null) {
+ first = expandQueueSegmentForWrite();
+ }
+ return first;
+ }
+ }
+
+ private SimpleQueueMemorySegmentFixedSize getQueueSegmentForRead() throws IOException {
+ synchronized (queueSegments) {
+ var last = queueSegments.peekFirst();
+ if (last == null) {
+ last = expandQueueSegmentForWrite();
+ }
+ return last;
+ }
+ }
+
+ private boolean popQueueSegmentForRead() throws IOException {
+ synchronized (queueSegments) {
+ if (queueSegments.size() > 1) {
+ var polled = queueSegments.poll();
+ if (polled != null) {
+ polled.close();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public T remove() {
+ MemorySegment removed;
+ try {
+ removed = getQueueSegmentForRead().poll();
+ if (removed == null) {
+ if (popQueueSegmentForRead()) {
+ removed = getQueueSegmentForRead().poll();
+ }
+ }
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ if (removed == null) throw new NoSuchElementException();
+ return readObject(removed.toArray(ValueLayout.JAVA_BYTE));
+ }
+
+ private T readObject(byte[] byteBuffer) {
+ try {
+ return deserializer.deserialize(byteBuffer);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ try {
+ this.queueSegments.forEach(qs -> {
+ try {
+ qs.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ this.arena.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ synchronized (REGISTRY) {
+ REGISTRY.remove(name);
+ }
+ }
+ }
+}
diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java
new file mode 100644
index 0000000..85e6e8e
--- /dev/null
+++ b/src/main/java/it/cavallium/filequeue/SimpleQueueMemorySegmentFixedSize.java
@@ -0,0 +1,89 @@
+package it.cavallium.filequeue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SimpleQueueMemorySegmentFixedSize implements Closeable {
+
+ private final Path filePath;
+ private final FileChannel ch;
+ private final MemorySegment writer;
+ private final Arena arena;
+ private final long fixedSize;
+ private long readPosition = 0;
+ private long writePosition = 0;
+ private final AtomicInteger size = new AtomicInteger();
+
+ public SimpleQueueMemorySegmentFixedSize(Arena arena, long fixedSize, Path filePath) throws IOException {
+ this.arena = arena;
+ this.filePath = filePath;
+ this.fixedSize = fixedSize;
+ Files.deleteIfExists(filePath);
+ this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
+ ch.truncate(fixedSize);
+ this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize, arena);
+ }
+
+ @Override
+ public void close() throws IOException {
+ ch.close();
+ Files.deleteIfExists(filePath);
+ }
+
+ public void add(MemorySegment element) {
+ if (!offer(element)) {
+ throw new IllegalStateException("Can't add the element, not enough space");
+ }
+ }
+
+ /**
+ * @return true if there is enough space, false otherwise
+ */
+ public boolean offer(MemorySegment element) {
+ int size = Math.toIntExact(element.byteSize());
+ if (writePosition + size + Integer.BYTES > fixedSize) {
+ if (writePosition == 0) throw new UnsupportedOperationException("Entity too large! " + size + " bytes");
+ return false;
+ }
+ writer.set(ValueLayout.JAVA_INT_UNALIGNED, writePosition, size);
+ MemorySegment.copy(element, 0, writer, writePosition + Integer.BYTES, size);
+ writePosition += size + Integer.BYTES;
+ this.size.incrementAndGet();
+ return true;
+ }
+
+ public MemorySegment poll() {
+ if (readPosition >= fixedSize) {
+ return null;
+ }
+ if (size.decrementAndGet() < 0) {
+ size.incrementAndGet();
+ return null;
+ }
+ int size = writer.get(ValueLayout.JAVA_INT_UNALIGNED, readPosition);
+ var segment = arena.allocate(size);
+ MemorySegment.copy(writer, readPosition + Integer.BYTES, segment, 0, size);
+ readPosition += size + Integer.BYTES;
+ return segment;
+ }
+
+ public MemorySegment remove() {
+ var polled = poll();
+ if (polled == null) throw new NoSuchElementException();
+ return polled;
+ }
+
+ public int size() {
+ return Math.max(0, size.get());
+ }
+}
diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java
index 5875315..8ef78b5 100644
--- a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java
+++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java
@@ -1,26 +1,28 @@
package it.cavallium.filequeue;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class TestQueueToConsumer {
+public abstract class TestQueueToConsumer {
+
+ public abstract SimpleQueue getSimpleQueue();
@Test
public void test() {
- var q = new SimpleQueueJava(new ConcurrentLinkedDeque<>());
+ var q = getSimpleQueue();
AtomicBoolean ab = new AtomicBoolean();
- try (var qtc = new QueueToConsumer(q, new QueueConsumer() {
- @Override
- public boolean tryConsume(String value) {
- System.out.println("value:" + value + " thread: " + Thread.currentThread());
- if (ab.get()) {
- ab.set(false);
- return false;
- } else {
- ab.set(true);
- return true;
- }
+ try (var qtc = new QueueToConsumer<>(q, value -> {
+ if (ab.get()) {
+ ab.set(false);
+ return false;
+ } else {
+ ab.set(true);
+ return true;
}
})) {
qtc.startQueue();
@@ -34,4 +36,46 @@ public class TestQueueToConsumer {
throw new RuntimeException(e);
}
}
+
+ @Test
+ public void testConcurrency() {
+ var q = getSimpleQueue();
+ AtomicBoolean ab = new AtomicBoolean();
+ CountDownLatch cdl = new CountDownLatch(1);
+ var referenceQueue = IntStream
+ .range(0, 1000)
+ .mapToObj(Integer::toString)
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new));
+ try (var qtc = new QueueToConsumer<>(q, value -> {
+ if (ab.get()) {
+ ab.set(false);
+ return false;
+ } else {
+ ab.set(true);
+ Assertions.assertEquals(referenceQueue.pop(), value);
+ if (referenceQueue.isEmpty()) {
+ cdl.countDown();
+ }
+ return true;
+ }
+ })) {
+ var t = new Thread(() -> {
+ while (!Thread.interrupted()) {
+ qtc.runStep(false);
+ }
+ });
+ t.setDaemon(true);
+ t.setName("step");
+ t.start();
+ for (String s : referenceQueue.toArray(String[]::new)) {
+ qtc.add(s);
+ }
+ cdl.await();
+ t.interrupt();
+ t.join();
+ Assertions.assertEquals(0, referenceQueue.size());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java
new file mode 100644
index 0000000..c460438
--- /dev/null
+++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerJava.java
@@ -0,0 +1,287 @@
+package it.cavallium.filequeue;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestQueueToConsumerJava extends TestQueueToConsumer {
+
+ @Override
+ public SimpleQueue getSimpleQueue() {
+ return new SimpleQueueJava<>(new ConcurrentLinkedDeque<>());
+ }
+
+ @Test
+ public void testNoQueue() {
+ var q = new SimpleQueueJava<>(new Queue() {
+ @Override
+ public boolean add(String s) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean offer(String s) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public String remove() {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public String poll() {
+ return null;
+ }
+
+ @Override
+ public String element() {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public String peek() {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public Iterator iterator() {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public Object[] toArray() {
+ Assertions.fail();
+ return new Object[0];
+ }
+
+ @Override
+ public T[] toArray(T[] a) {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean addAll(Collection extends String> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ }
+ });
+ AtomicReference lastConsumed = new AtomicReference<>();
+ try (var consumer = new QueueToConsumer<>(q, value -> {
+ lastConsumed.set(value);
+ return true;
+ })) {
+ for (int i = 0; i < 10_000; i++) {
+ var s = Integer.toString(i);
+ consumer.add(s);
+ Assertions.assertEquals(s, lastConsumed.get());
+ }
+ }
+ }
+
+ @Test
+ public void testSmallQueueThenNone() {
+ var maxQueueSize = 10;
+ var shouldReject = new AtomicBoolean(true);
+ var q = new SimpleQueueJava<>(new Queue() {
+ final Queue smallQueue = new ArrayDeque<>(maxQueueSize);
+ @Override
+ public boolean add(String s) {
+ var added = smallQueue.add(s);
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return added;
+ }
+
+ @Override
+ public boolean offer(String s) {
+ var added = smallQueue.offer(s);
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return added;
+ }
+
+ @Override
+ public String remove() {
+ var removed = smallQueue.remove();
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return removed;
+ }
+
+ @Override
+ public String poll() {
+ var removed = smallQueue.poll();
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return removed;
+ }
+
+ @Override
+ public String element() {
+ var element = smallQueue.element();
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return element;
+ }
+
+ @Override
+ public String peek() {
+ var element = smallQueue.peek();
+ Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
+ return element;
+ }
+
+ @Override
+ public int size() {
+ return smallQueue.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return smallQueue.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public Iterator iterator() {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public Object[] toArray() {
+ Assertions.fail();
+ return new Object[0];
+ }
+
+ @Override
+ public T[] toArray(T[] a) {
+ Assertions.fail();
+ return null;
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean addAll(Collection extends String> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ Assertions.fail();
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ smallQueue.clear();
+ }
+ });
+ AtomicReference lastConsumed = new AtomicReference<>();
+ try (var consumer = new QueueToConsumer<>(q, value -> {
+ if (shouldReject.get()) {
+ lastConsumed.set(null);
+ return false;
+ } else {
+ lastConsumed.set(value);
+ return true;
+ }
+ })) {
+ for (int i = 0; i < maxQueueSize; i++) {
+ var s = Integer.toString(i);
+ consumer.add(s);
+ Assertions.assertNull(lastConsumed.get());
+ }
+ shouldReject.set(false);
+ for (int i = 0; i < maxQueueSize; i++) {
+ var s = Integer.toString(i);
+ consumer.runStep(false);
+ Assertions.assertEquals(s, lastConsumed.get());
+ Assertions.assertEquals(maxQueueSize - i - 1, q.size());
+ }
+ Assertions.assertEquals(0, q.size());
+ for (int i = maxQueueSize; i < 10_000; i++) {
+ var s = Integer.toString(i);
+ consumer.add(s);
+
+ Assertions.assertEquals(s, lastConsumed.get());
+ Assertions.assertEquals(0, q.size());
+ }
+ }
+ }
+}
diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java
new file mode 100644
index 0000000..b5229a2
--- /dev/null
+++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerMemorySegment.java
@@ -0,0 +1,54 @@
+package it.cavallium.filequeue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+public class TestQueueToConsumerMemorySegment extends TestQueueToConsumer {
+
+ private static final long SEGMENT_SIZE = 24;
+ private Path dir;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ dir = Files.createTempDirectory("queue");
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ Path pathToBeDeleted = dir;
+ try (var fs = Files.walk(pathToBeDeleted)) {
+ fs.sorted(Comparator.reverseOrder())
+ .forEach(f -> {
+ try {
+ Files.deleteIfExists(f);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ @Override
+ public SimpleQueue getSimpleQueue() {
+ try {
+ return new SimpleQueueMemorySegment<>(dir, "test", new Serializer<>() {
+ @Override
+ public byte[] serialize(String data) {
+ return data.getBytes(StandardCharsets.UTF_8);
+ }
+ }, new Deserializer<>() {
+ @Override
+ public String deserialize(byte[] data) {
+ return new String(data, StandardCharsets.UTF_8);
+ }
+ }, SEGMENT_SIZE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}