package it.cavallium.filequeue; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayDeque; import java.util.HashSet; import java.util.NoSuchElementException; import java.util.Set; /** * Thread safe queue */ public class SimpleQueueMemorySegment implements SimpleQueue, Closeable { private static final Set REGISTRY = new HashSet<>(); private final ArrayDeque queueSegments = new ArrayDeque<>(); private final Path directory; private final String name; private final Serializer serializer; private final Deserializer deserializer; private final long segmentSize; public SimpleQueueMemorySegment(Path directory, String name, Serializer serializer, Deserializer deserializer, long segmentSize) throws IOException { this.name = name; this.directory = directory; this.serializer = serializer; this.deserializer = deserializer; this.segmentSize = segmentSize; synchronized (REGISTRY) { if (!REGISTRY.add(name)) { throw new IllegalArgumentException("Database already registered: " + name); } } if (Files.notExists(directory)) { Files.createDirectories(directory); } } private Path generateQueuePath() { return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime())); } @Override public void add(T element) { byte[] serialized; try { serialized = serializer.serialize(element); } catch (IOException e) { throw new UncheckedIOException(e); } try (var arena = Arena.ofConfined()) { var memorySegment = arena.allocateFrom(ValueLayout.JAVA_BYTE, serialized); try { if (!getQueueSegmentForWrite().offer(memorySegment)) { expandQueueSegmentForWrite().add(memorySegment); } } catch (IOException ex) { throw new UncheckedIOException(ex); } } } private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException { synchronized (queueSegments) { var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(segmentSize, generateQueuePath()); // Hibernate the middle segments, if there will be 3 or more segments after the expansion if (queueSegments.size() >= 2) { queueSegments.getLast().hibernate(); } queueSegments.add(queueMemorySegment); return queueMemorySegment; } } private SimpleQueueMemorySegmentFixedSize getQueueSegmentForWrite() throws IOException { synchronized (queueSegments) { var first = queueSegments.peekLast(); if (first == null) { first = expandQueueSegmentForWrite(); } return first; } } private SimpleQueueMemorySegmentFixedSize getQueueSegmentForRead() throws IOException { synchronized (queueSegments) { var last = queueSegments.peekFirst(); if (last == null) { last = expandQueueSegmentForWrite(); } return last; } } private boolean popQueueSegmentForRead() throws IOException { synchronized (queueSegments) { if (queueSegments.size() > 1) { var polled = queueSegments.poll(); if (polled != null) { polled.close(); return true; } } } return false; } @Override public T remove() { MemorySegment removed; try (var arena = Arena.ofConfined()) { removed = getQueueSegmentForRead().poll(arena); if (removed == null) { if (popQueueSegmentForRead()) { removed = getQueueSegmentForRead().poll(arena); } } if (removed == null) throw new NoSuchElementException(); return readObject(removed.toArray(ValueLayout.JAVA_BYTE)); } catch (IOException ex) { throw new UncheckedIOException(ex); } } private T readObject(byte[] byteBuffer) { try { return deserializer.deserialize(byteBuffer); } catch (IOException e) { throw new UncheckedIOException(e); } } @Override public int size() { return this.queueSegments.stream().mapToInt(SimpleQueueMemorySegmentFixedSize::size).sum(); } @Override public void close() { try { this.queueSegments.forEach(qs -> { try { qs.close(); } catch (IOException e) { e.printStackTrace(); } }); } catch (Exception e) { throw new RuntimeException(e); } finally { synchronized (REGISTRY) { REGISTRY.remove(name); } } } }