Add memory segments preview

This commit is contained in:
Andrea Cavalli 2023-10-20 12:52:18 +02:00
parent 90b6c62da8
commit c91507381c
15 changed files with 781 additions and 40 deletions

11
pom.xml
View File

@ -4,7 +4,7 @@
<groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId>
<name>file queue project</name>
<version>3.1.5</version>
<version>3.1.6</version>
<packaging>jar</packaging>
<description>Light weight, high performance, simple, reliable and persistent queue</description>
<modelVersion>4.0.0</modelVersion>
@ -32,8 +32,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>11</source>
<target>11</target>
<source>21</source>
<target>21</target>
<compilerArgs>--enable-preview</compilerArgs>
</configuration>
</plugin>
</plugins>
@ -47,12 +48,12 @@
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.8.2</version>
<version>0.8.3</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.0</version>
<version>5.9.2</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -1,11 +1,9 @@
package it.cavallium.filequeue;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
public interface Deserializer<T> {

View File

@ -52,4 +52,9 @@ public final class DiskQueueToConsumer<T> implements IQueueToConsumer<T> {
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -0,0 +1,24 @@
package it.cavallium.filequeue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
class FileUtils {
public static void deleteDirectory(Path dir) throws IOException {
try (var fs = Files.walk(dir)) {
fs.sorted(Comparator.reverseOrder()).forEach(f -> {
try {
Files.deleteIfExists(f);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} catch (UncheckedIOException ex) {
throw ex.getCause();
}
}
}

View File

@ -10,4 +10,6 @@ public interface IQueueToConsumer<T> extends Closeable {
void close();
void startQueue();
void runStep(boolean park);
}

View File

@ -35,4 +35,9 @@ public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -0,0 +1,58 @@
package it.cavallium.filequeue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
public final class MemorySegmentQueueToConsumer<T> implements IQueueToConsumer<T> {
private final SimpleQueueMemorySegment<T> queueMemorySegment;
private final QueueToConsumer<T> queue;
private final Path dir;
public MemorySegmentQueueToConsumer(Path file,
String dbName,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer,
long segmentSize) {
try {
this.dir = file.resolve(dbName);
this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir,
dbName,
serializer,
deserializer,
segmentSize
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.queue = new QueueToConsumer<>(queueMemorySegment, consumer);
}
@Override
public void add(T value) {
queue.add(value);
}
@Override
public void close() {
queue.close();
queueMemorySegment.close();
try {
FileUtils.deleteDirectory(dir);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -70,27 +70,40 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
public void run() {
try {
while (!closed) {
T element;
boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
if (shouldRemove) {
element = queue.remove();
long nextDelay = BACKOFF_NS;
while (!closed && !consumer.tryConsume(element)) {
LockSupport.parkNanos(nextDelay);
if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
nextDelay += BACKOFF_NS;
} else if (nextDelay < MAX_BACKOFF_NS) {
nextDelay = MAX_BACKOFF_NS;
}
}
preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0);
} else {
LockSupport.parkNanos(HALF_SECOND_NS);
}
runStepInternal(true);
}
} finally {
closed = true;
}
}
}
public void runStep(boolean park) {
if (manager != null) throw new UnsupportedOperationException("This queue is managed");
runStepInternal(false);
}
private void runStepInternal(boolean park) {
T element;
boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
if (shouldRemove) {
element = queue.remove();
long nextDelay = BACKOFF_NS;
while (!closed && !consumer.tryConsume(element)) {
if (park) {
LockSupport.parkNanos(nextDelay);
}
if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
nextDelay += BACKOFF_NS;
} else if (nextDelay < MAX_BACKOFF_NS) {
nextDelay = MAX_BACKOFF_NS;
}
}
preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0);
} else {
if (park) {
LockSupport.parkNanos(HALF_SECOND_NS);
}
}
}
}

View File

@ -3,7 +3,7 @@ package it.cavallium.filequeue;
/**
* The queue must be thread-safe
*/
interface SimpleQueue<T> {
public interface SimpleQueue<T> {
void add(T element);

View File

@ -6,8 +6,6 @@ import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;

View File

@ -0,0 +1,163 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
/**
* Thread safe queue
*/
public class SimpleQueueMemorySegment<T> implements SimpleQueue<T>, Closeable {
private static final Set<String> REGISTRY = new HashSet<>();
private final ArrayDeque<SimpleQueueMemorySegmentFixedSize> queueSegments = new ArrayDeque<>();
private final Path directory;
private final String name;
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
private final Arena arena;
private final long segmentSize;
public SimpleQueueMemorySegment(Path directory,
String name,
Serializer<T> serializer,
Deserializer<T> deserializer,
long segmentSize) throws IOException {
this.arena = Arena.ofShared();
this.name = name;
this.directory = directory;
this.serializer = serializer;
this.deserializer = deserializer;
this.segmentSize = segmentSize;
synchronized (REGISTRY) {
if (!REGISTRY.add(name)) {
throw new IllegalArgumentException("Database already registered: " + name);
}
}
if (Files.notExists(directory)) {
Files.createDirectories(directory);
}
}
private Path generateQueuePath() {
return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime()));
}
@Override
public void add(T element) {
byte[] serialized;
try {
serialized = serializer.serialize(element);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
var memorySegment = arena.allocateArray(ValueLayout.JAVA_BYTE, serialized);
try {
if (!getQueueSegmentForWrite().offer(memorySegment)) {
expandQueueSegmentForWrite().add(memorySegment);
}
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException {
synchronized (queueSegments) {
var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(arena, segmentSize, generateQueuePath());
queueSegments.add(queueMemorySegment);
return queueMemorySegment;
}
}
private SimpleQueueMemorySegmentFixedSize getQueueSegmentForWrite() throws IOException {
synchronized (queueSegments) {
var first = queueSegments.peekLast();
if (first == null) {
first = expandQueueSegmentForWrite();
}
return first;
}
}
private SimpleQueueMemorySegmentFixedSize getQueueSegmentForRead() throws IOException {
synchronized (queueSegments) {
var last = queueSegments.peekFirst();
if (last == null) {
last = expandQueueSegmentForWrite();
}
return last;
}
}
private boolean popQueueSegmentForRead() throws IOException {
synchronized (queueSegments) {
if (queueSegments.size() > 1) {
var polled = queueSegments.poll();
if (polled != null) {
polled.close();
return true;
}
}
}
return false;
}
@Override
public T remove() {
MemorySegment removed;
try {
removed = getQueueSegmentForRead().poll();
if (removed == null) {
if (popQueueSegmentForRead()) {
removed = getQueueSegmentForRead().poll();
}
}
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
if (removed == null) throw new NoSuchElementException();
return readObject(removed.toArray(ValueLayout.JAVA_BYTE));
}
private T readObject(byte[] byteBuffer) {
try {
return deserializer.deserialize(byteBuffer);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public int size() {
return 0;
}
@Override
public void close() {
try {
this.queueSegments.forEach(qs -> {
try {
qs.close();
} catch (IOException e) {
e.printStackTrace();
}
});
this.arena.close();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
synchronized (REGISTRY) {
REGISTRY.remove(name);
}
}
}
}

View File

@ -0,0 +1,89 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
public class SimpleQueueMemorySegmentFixedSize implements Closeable {
private final Path filePath;
private final FileChannel ch;
private final MemorySegment writer;
private final Arena arena;
private final long fixedSize;
private long readPosition = 0;
private long writePosition = 0;
private final AtomicInteger size = new AtomicInteger();
public SimpleQueueMemorySegmentFixedSize(Arena arena, long fixedSize, Path filePath) throws IOException {
this.arena = arena;
this.filePath = filePath;
this.fixedSize = fixedSize;
Files.deleteIfExists(filePath);
this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
ch.truncate(fixedSize);
this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize, arena);
}
@Override
public void close() throws IOException {
ch.close();
Files.deleteIfExists(filePath);
}
public void add(MemorySegment element) {
if (!offer(element)) {
throw new IllegalStateException("Can't add the element, not enough space");
}
}
/**
* @return true if there is enough space, false otherwise
*/
public boolean offer(MemorySegment element) {
int size = Math.toIntExact(element.byteSize());
if (writePosition + size + Integer.BYTES > fixedSize) {
if (writePosition == 0) throw new UnsupportedOperationException("Entity too large! " + size + " bytes");
return false;
}
writer.set(ValueLayout.JAVA_INT_UNALIGNED, writePosition, size);
MemorySegment.copy(element, 0, writer, writePosition + Integer.BYTES, size);
writePosition += size + Integer.BYTES;
this.size.incrementAndGet();
return true;
}
public MemorySegment poll() {
if (readPosition >= fixedSize) {
return null;
}
if (size.decrementAndGet() < 0) {
size.incrementAndGet();
return null;
}
int size = writer.get(ValueLayout.JAVA_INT_UNALIGNED, readPosition);
var segment = arena.allocate(size);
MemorySegment.copy(writer, readPosition + Integer.BYTES, segment, 0, size);
readPosition += size + Integer.BYTES;
return segment;
}
public MemorySegment remove() {
var polled = poll();
if (polled == null) throw new NoSuchElementException();
return polled;
}
public int size() {
return Math.max(0, size.get());
}
}

View File

@ -1,26 +1,28 @@
package it.cavallium.filequeue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestQueueToConsumer {
public abstract class TestQueueToConsumer {
public abstract SimpleQueue<String> getSimpleQueue();
@Test
public void test() {
var q = new SimpleQueueJava<String>(new ConcurrentLinkedDeque<>());
var q = getSimpleQueue();
AtomicBoolean ab = new AtomicBoolean();
try (var qtc = new QueueToConsumer<String>(q, new QueueConsumer<String>() {
@Override
public boolean tryConsume(String value) {
System.out.println("value:" + value + " thread: " + Thread.currentThread());
if (ab.get()) {
ab.set(false);
return false;
} else {
ab.set(true);
return true;
}
try (var qtc = new QueueToConsumer<>(q, value -> {
if (ab.get()) {
ab.set(false);
return false;
} else {
ab.set(true);
return true;
}
})) {
qtc.startQueue();
@ -34,4 +36,46 @@ public class TestQueueToConsumer {
throw new RuntimeException(e);
}
}
@Test
public void testConcurrency() {
var q = getSimpleQueue();
AtomicBoolean ab = new AtomicBoolean();
CountDownLatch cdl = new CountDownLatch(1);
var referenceQueue = IntStream
.range(0, 1000)
.mapToObj(Integer::toString)
.collect(Collectors.toCollection(ConcurrentLinkedDeque::new));
try (var qtc = new QueueToConsumer<>(q, value -> {
if (ab.get()) {
ab.set(false);
return false;
} else {
ab.set(true);
Assertions.assertEquals(referenceQueue.pop(), value);
if (referenceQueue.isEmpty()) {
cdl.countDown();
}
return true;
}
})) {
var t = new Thread(() -> {
while (!Thread.interrupted()) {
qtc.runStep(false);
}
});
t.setDaemon(true);
t.setName("step");
t.start();
for (String s : referenceQueue.toArray(String[]::new)) {
qtc.add(s);
}
cdl.await();
t.interrupt();
t.join();
Assertions.assertEquals(0, referenceQueue.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,287 @@
package it.cavallium.filequeue;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestQueueToConsumerJava extends TestQueueToConsumer {
@Override
public SimpleQueue<String> getSimpleQueue() {
return new SimpleQueueJava<>(new ConcurrentLinkedDeque<>());
}
@Test
public void testNoQueue() {
var q = new SimpleQueueJava<>(new Queue<String>() {
@Override
public boolean add(String s) {
Assertions.fail();
return false;
}
@Override
public boolean offer(String s) {
Assertions.fail();
return false;
}
@Override
public String remove() {
Assertions.fail();
return null;
}
@Override
public String poll() {
return null;
}
@Override
public String element() {
Assertions.fail();
return null;
}
@Override
public String peek() {
return null;
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean contains(Object o) {
Assertions.fail();
return false;
}
@Override
public Iterator<String> iterator() {
Assertions.fail();
return null;
}
@Override
public Object[] toArray() {
Assertions.fail();
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
Assertions.fail();
return null;
}
@Override
public boolean remove(Object o) {
Assertions.fail();
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public boolean addAll(Collection<? extends String> c) {
Assertions.fail();
return false;
}
@Override
public boolean removeAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public void clear() {
}
});
AtomicReference<String> lastConsumed = new AtomicReference<>();
try (var consumer = new QueueToConsumer<>(q, value -> {
lastConsumed.set(value);
return true;
})) {
for (int i = 0; i < 10_000; i++) {
var s = Integer.toString(i);
consumer.add(s);
Assertions.assertEquals(s, lastConsumed.get());
}
}
}
@Test
public void testSmallQueueThenNone() {
var maxQueueSize = 10;
var shouldReject = new AtomicBoolean(true);
var q = new SimpleQueueJava<>(new Queue<String>() {
final Queue<String> smallQueue = new ArrayDeque<>(maxQueueSize);
@Override
public boolean add(String s) {
var added = smallQueue.add(s);
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return added;
}
@Override
public boolean offer(String s) {
var added = smallQueue.offer(s);
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return added;
}
@Override
public String remove() {
var removed = smallQueue.remove();
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return removed;
}
@Override
public String poll() {
var removed = smallQueue.poll();
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return removed;
}
@Override
public String element() {
var element = smallQueue.element();
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return element;
}
@Override
public String peek() {
var element = smallQueue.peek();
Assertions.assertTrue(smallQueue.size() <= maxQueueSize);
return element;
}
@Override
public int size() {
return smallQueue.size();
}
@Override
public boolean isEmpty() {
return smallQueue.isEmpty();
}
@Override
public boolean contains(Object o) {
Assertions.fail();
return false;
}
@Override
public Iterator<String> iterator() {
Assertions.fail();
return null;
}
@Override
public Object[] toArray() {
Assertions.fail();
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
Assertions.fail();
return null;
}
@Override
public boolean remove(Object o) {
Assertions.fail();
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public boolean addAll(Collection<? extends String> c) {
Assertions.fail();
return false;
}
@Override
public boolean removeAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
Assertions.fail();
return false;
}
@Override
public void clear() {
smallQueue.clear();
}
});
AtomicReference<String> lastConsumed = new AtomicReference<>();
try (var consumer = new QueueToConsumer<>(q, value -> {
if (shouldReject.get()) {
lastConsumed.set(null);
return false;
} else {
lastConsumed.set(value);
return true;
}
})) {
for (int i = 0; i < maxQueueSize; i++) {
var s = Integer.toString(i);
consumer.add(s);
Assertions.assertNull(lastConsumed.get());
}
shouldReject.set(false);
for (int i = 0; i < maxQueueSize; i++) {
var s = Integer.toString(i);
consumer.runStep(false);
Assertions.assertEquals(s, lastConsumed.get());
Assertions.assertEquals(maxQueueSize - i - 1, q.size());
}
Assertions.assertEquals(0, q.size());
for (int i = maxQueueSize; i < 10_000; i++) {
var s = Integer.toString(i);
consumer.add(s);
Assertions.assertEquals(s, lastConsumed.get());
Assertions.assertEquals(0, q.size());
}
}
}
}

View File

@ -0,0 +1,54 @@
package it.cavallium.filequeue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
public class TestQueueToConsumerMemorySegment extends TestQueueToConsumer {
private static final long SEGMENT_SIZE = 24;
private Path dir;
@BeforeEach
public void setup() throws IOException {
dir = Files.createTempDirectory("queue");
}
@AfterEach
public void tearDown() throws IOException {
Path pathToBeDeleted = dir;
try (var fs = Files.walk(pathToBeDeleted)) {
fs.sorted(Comparator.reverseOrder())
.forEach(f -> {
try {
Files.deleteIfExists(f);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}
@Override
public SimpleQueue<String> getSimpleQueue() {
try {
return new SimpleQueueMemorySegment<>(dir, "test", new Serializer<>() {
@Override
public byte[] serialize(String data) {
return data.getBytes(StandardCharsets.UTF_8);
}
}, new Deserializer<>() {
@Override
public String deserialize(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}, SEGMENT_SIZE);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}