Remove dependencies, modularize, add bytebuffer implementation

This commit is contained in:
Andrea Cavalli 2023-10-24 15:42:25 +02:00
parent 4a41699551
commit fbaf2586a2
13 changed files with 393 additions and 340 deletions

10
pom.xml
View File

@ -40,16 +40,6 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.squareup.tape2</groupId>
<artifactId>tape</artifactId>
<version>2.0.0-beta1</version>
</dependency>
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.8.3</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>

View File

@ -1,60 +0,0 @@
package it.cavallium.filequeue;
import com.squareup.tape2.QueueFile;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public final class DiskQueueToConsumer<T> implements IQueueToConsumer<T> {
private final QueueToConsumer<T> queue;
private final QueueFile queueFile;
public DiskQueueToConsumer(Path file,
boolean clear,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer) throws IOException {
QueueFile queueFile;
try {
queueFile = new QueueFile.Builder(file.toFile()).zero(false).build();
if (clear) {
queueFile.clear();
}
} catch (Throwable ex) {
try {
Files.deleteIfExists(file);
} catch (Throwable ex2) {
}
queueFile = new QueueFile.Builder(file.toFile()).zero(false).build();
}
this.queueFile = queueFile;
this.queue = new QueueToConsumer<>(new SimpleQueueFile<>(queueFile, serializer, deserializer), consumer);
}
@Override
public void add(T value) {
queue.add(value);
}
@Override
public void close() {
queue.close();
try {
queueFile.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -1,58 +0,0 @@
package it.cavallium.filequeue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.lmdbjava.ByteArrayProxy;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
public class LMDBEnvManager {
private static final Map<Path, Env<byte[]>> ENVS = new HashMap<>();
public synchronized static Env<byte[]> ofPath(Path path) {
return ENVS.compute(path, (p, env) -> {
if (env != null) {
if (env.isClosed()) {
env = null;
}
}
if (env != null) {
return env;
}
if (Files.notExists(path)) {
try {
Files.createDirectories(path);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
try {
return open(path.toFile());
} catch (Throwable ex) {
try {
Files.walk(path)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
throw new RuntimeException(e);
}
return open(path.toFile());
}
});
}
private static Env<byte[]> open(File path) {
return Env.create(ByteArrayProxy.PROXY_BA)
// 128GiB
.setMapSize(Integer.parseInt(System.getProperty("fq.writemap.size.gb", "32")) * 1024L * 1024 * 1024)
.setMaxReaders(1024)
.setMaxDbs(1024)
.open(path, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP, EnvFlags.MDB_NOMETASYNC);
}
}

View File

@ -1,48 +0,0 @@
package it.cavallium.filequeue;
import java.nio.file.Path;
import org.lmdbjava.Env;
public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
private final Env<byte[]> env;
private final SimpleQueueLMDB<T> queueLMDB;
private final QueueToConsumer<T> queue;
public LMDBQueueToConsumer(Path file,
String dbName,
boolean clear,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer) {
this.env = LMDBEnvManager.ofPath(file);
this.queueLMDB = new SimpleQueueLMDB<>(env, dbName, serializer, deserializer);
this.queue = new QueueToConsumer<>(queueLMDB, consumer) {
@Override
protected String getQueueManagerName() {
return super.getQueueManagerName() + "-" + dbName;
}
};
}
@Override
public void add(T value) {
queue.add(value);
}
@Override
public void close() {
queue.close();
queueLMDB.close();
}
@Override
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -0,0 +1,63 @@
package it.cavallium.filequeue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
public final class QueueToConsumerFileChannel<T> implements IQueueToConsumer<T> {
private final SimpleQueueMemorySegment<T> queueMemorySegment;
private final QueueToConsumer<T> queue;
private final Path dir;
public QueueToConsumerFileChannel(Path file,
String dbName,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer,
long segmentSize) {
try {
this.dir = file.resolve(dbName);
this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir,
dbName,
serializer,
deserializer,
segmentSize
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.queue = new QueueToConsumer<>(queueMemorySegment, consumer) {
@Override
protected String getQueueManagerName() {
return super.getQueueManagerName() + "-" + dbName;
}
};
}
@Override
public void add(T value) {
queue.add(value);
}
@Override
public void close() {
queue.close();
queueMemorySegment.close();
try {
FileUtils.deleteDirectory(dir);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void startQueue() {
queue.startQueue();
}
@Override
public void runStep(boolean park) {
queue.runStep(park);
}
}

View File

@ -4,18 +4,18 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
public final class MemorySegmentQueueToConsumer<T> implements IQueueToConsumer<T> {
public final class QueueToConsumerMemorySegment<T> implements IQueueToConsumer<T> {
private final SimpleQueueMemorySegment<T> queueMemorySegment;
private final QueueToConsumer<T> queue;
private final Path dir;
public MemorySegmentQueueToConsumer(Path file,
String dbName,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer,
long segmentSize) {
public QueueToConsumerMemorySegment(Path file,
String dbName,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer,
long segmentSize) {
try {
this.dir = file.resolve(dbName);
this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir,

View File

@ -1,47 +0,0 @@
package it.cavallium.filequeue;
import com.squareup.tape2.QueueFile;
import java.io.IOException;
import java.util.NoSuchElementException;
class SimpleQueueFile<T> implements SimpleQueue<T> {
private final QueueFile queueFile;
private final Serializer<T> ser;
private final Deserializer<T> des;
public SimpleQueueFile(QueueFile queueFile, Serializer<T> serializer, Deserializer<T> deserializer) {
this.queueFile = queueFile;
this.ser = serializer;
this.des = deserializer;
}
@Override
public synchronized void add(T element) {
try {
queueFile.add(ser.serialize(element));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized T remove() {
try {
byte[] element = queueFile.peek();
if (element == null) {
throw new NoSuchElementException("Queue is empty");
}
var deserialized = des.deserialize(element);
queueFile.remove();
return deserialized;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized int size() {
return queueFile.size();
}
}

View File

@ -0,0 +1,164 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
/**
* Thread safe queue
*/
public class SimpleQueueFileChannel<T> implements SimpleQueue<T>, Closeable {
private static final Set<String> REGISTRY = new HashSet<>();
private final ArrayDeque<SimpleQueueFileChannelFixedSize> queueSegments = new ArrayDeque<>();
private final Path directory;
private final String name;
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
private final long segmentSize;
public SimpleQueueFileChannel(Path directory,
String name,
Serializer<T> serializer,
Deserializer<T> deserializer,
long segmentSize) throws IOException {
this.name = name;
this.directory = directory;
this.serializer = serializer;
this.deserializer = deserializer;
this.segmentSize = segmentSize;
synchronized (REGISTRY) {
if (!REGISTRY.add(name)) {
throw new IllegalArgumentException("Database already registered: " + name);
}
}
if (Files.notExists(directory)) {
Files.createDirectories(directory);
}
}
private Path generateQueuePath() {
return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime()));
}
@Override
public void add(T element) {
byte[] serialized;
try {
serialized = serializer.serialize(element);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
var memorySegment = ByteBuffer.wrap(serialized);
try {
if (!getQueueSegmentForWrite().offer(memorySegment)) {
expandQueueSegmentForWrite().add(memorySegment);
}
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
private SimpleQueueFileChannelFixedSize expandQueueSegmentForWrite() throws IOException {
synchronized (queueSegments) {
var queueMemorySegment = new SimpleQueueFileChannelFixedSize(segmentSize, generateQueuePath());
// Hibernate the middle segments, if there will be 3 or more segments after the expansion
if (queueSegments.size() >= 2) {
queueSegments.getLast().hibernate();
}
queueSegments.add(queueMemorySegment);
return queueMemorySegment;
}
}
private SimpleQueueFileChannelFixedSize getQueueSegmentForWrite() throws IOException {
synchronized (queueSegments) {
var first = queueSegments.peekLast();
if (first == null) {
first = expandQueueSegmentForWrite();
}
return first;
}
}
private SimpleQueueFileChannelFixedSize getQueueSegmentForRead() throws IOException {
synchronized (queueSegments) {
var last = queueSegments.peekFirst();
if (last == null) {
last = expandQueueSegmentForWrite();
}
return last;
}
}
private boolean popQueueSegmentForRead() throws IOException {
synchronized (queueSegments) {
if (queueSegments.size() > 1) {
var polled = queueSegments.poll();
if (polled != null) {
polled.close();
return true;
}
}
}
return false;
}
@Override
public T remove() {
ByteBuffer removed;
try {
removed = getQueueSegmentForRead().poll();
if (removed == null) {
if (popQueueSegmentForRead()) {
removed = getQueueSegmentForRead().poll();
}
}
if (removed == null) throw new NoSuchElementException();
byte[] data = new byte[removed.remaining()];
removed.get(data);
return readObject(data);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
private T readObject(byte[] byteBuffer) {
try {
return deserializer.deserialize(byteBuffer);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public int size() {
return this.queueSegments.stream().mapToInt(SimpleQueueFileChannelFixedSize::size).sum();
}
@Override
public void close() {
try {
this.queueSegments.forEach(qs -> {
try {
qs.close();
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
synchronized (REGISTRY) {
REGISTRY.remove(name);
}
}
}
}

View File

@ -0,0 +1,94 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
public class SimpleQueueFileChannelFixedSize implements Closeable {
private final Path filePath;
private final FileChannel ch;
private final MappedByteBuffer writer;
private final long fixedSize;
private int readPosition = 0;
private int writePosition = 0;
private final AtomicInteger size = new AtomicInteger();
public SimpleQueueFileChannelFixedSize(long fixedSize, Path filePath) throws IOException {
this.filePath = filePath;
this.fixedSize = fixedSize;
Files.deleteIfExists(filePath);
this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
ch.truncate(fixedSize);
this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize);
}
@Override
public void close() throws IOException {
ch.close();
Files.deleteIfExists(filePath);
}
public void add(ByteBuffer element) {
if (!offer(element)) {
throw new IllegalStateException("Can't add the element, not enough space");
}
}
/**
* @return true if there is enough space, false otherwise
*/
public boolean offer(ByteBuffer element) {
int size = Math.toIntExact(element.remaining());
if (writePosition + size + Integer.BYTES > fixedSize) {
if (writePosition == 0) throw new UnsupportedOperationException("Entity too large! " + size + " bytes");
return false;
}
writer.slice(writePosition, Integer.BYTES).putInt(size);
writer.put(writePosition + Integer.BYTES, element, 0, size);
writePosition += size + Integer.BYTES;
this.size.incrementAndGet();
return true;
}
public ByteBuffer poll() {
if (readPosition >= fixedSize) {
return null;
}
if (size.decrementAndGet() < 0) {
size.incrementAndGet();
return null;
}
int size = writer.slice(readPosition, Integer.BYTES).getInt();
var segment = ByteBuffer.allocate(size);
segment.put(0, writer, readPosition + Integer.BYTES, size);
readPosition += size + Integer.BYTES;
return segment;
}
public ByteBuffer remove() {
var polled = poll();
if (polled == null) throw new NoSuchElementException();
return polled;
}
public int size() {
return Math.max(0, size.get());
}
public void hibernate() {
try {
this.ch.force(false);
} catch (IOException e) {
e.printStackTrace();
}
}
}

View File

@ -1,108 +0,0 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
/**
* Thread safe queue
*/
public class SimpleQueueLMDB<T> implements SimpleQueue<T>, Closeable {
private static final Set<String> REGISTRY = new HashSet<>();
private final Env<byte[]> env;
private final String name;
private final Dbi<byte[]> dbi;
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
/**
* The First index.
*/
private int firstIndex;
/**
* The Last index.
*/
private int lastIndex;
public SimpleQueueLMDB(Env<byte[]> env,
String name,
Serializer<T> serializer,
Deserializer<T> deserializer) {
this.env = env;
this.name = name;
this.serializer = serializer;
this.deserializer = deserializer;
synchronized (REGISTRY) {
if (!REGISTRY.add(name)) {
throw new IllegalArgumentException("Database already registered: " + name);
}
}
this.dbi = env.openDbi(name, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY);
}
@Override
public void add(T element) {
int last = lastIndex++;
try {
dbi.put(serializeKey(last), serializer.serialize(element));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public T remove() {
byte[] byteBuffer;
try (Txn<byte[]> txnWrite = env.txnWrite()) {
var bb = serializeKey(firstIndex);
byteBuffer = dbi.get(txnWrite, bb);
if (byteBuffer == null) {
txnWrite.commit();
return null;
}
boolean isDel = dbi.delete(txnWrite, bb);
firstIndex++;
txnWrite.commit();
}
return readObject(byteBuffer);
}
private T readObject(byte[] byteBuffer) {
try {
return deserializer.deserialize(byteBuffer);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static byte[] serializeKey(int value) {
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(value);
return buffer.array();
}
@Override
public int size() {
return 0;
}
@Override
public void close() {
try {
dbi.close();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
synchronized (REGISTRY) {
REGISTRY.remove(name);
}
}
}
}

View File

@ -0,0 +1,2 @@
module filequeue {
}

View File

@ -13,7 +13,7 @@ public abstract class TestQueueToConsumer {
public abstract SimpleQueue<String> getSimpleQueue();
@Test
public void test() {
public void test() throws Exception {
var q = getSimpleQueue();
AtomicBoolean ab = new AtomicBoolean();
try (var qtc = new QueueToConsumer<>(q, value -> {
@ -35,10 +35,13 @@ public abstract class TestQueueToConsumer {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (q instanceof AutoCloseable autoCloseable) {
autoCloseable.close();
}
}
@Test
public void testConcurrency() {
public void testConcurrency() throws Exception {
var q = getSimpleQueue();
AtomicBoolean ab = new AtomicBoolean();
CountDownLatch cdl = new CountDownLatch(1);
@ -77,5 +80,8 @@ public abstract class TestQueueToConsumer {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (q instanceof AutoCloseable autoCloseable) {
autoCloseable.close();
}
}
}

View File

@ -0,0 +1,55 @@
package it.cavallium.filequeue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
public class TestQueueToConsumerFileChannel extends TestQueueToConsumer {
private static final long SEGMENT_SIZE = 24;
private Path dir;
@BeforeEach
public void setup() throws IOException {
dir = Files.createTempDirectory("queue");
}
@AfterEach
public void tearDown() throws IOException {
Path pathToBeDeleted = dir;
try (var fs = Files.walk(pathToBeDeleted)) {
fs.sorted(Comparator.reverseOrder())
.forEach(f -> {
try {
Files.deleteIfExists(f);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}
@Override
public SimpleQueue<String> getSimpleQueue() {
try {
return new SimpleQueueMemorySegment<>(dir, "test", new Serializer<>() {
@Override
public byte[] serialize(String data) {
return data.getBytes(StandardCharsets.UTF_8);
}
}, new Deserializer<>() {
@Override
public String deserialize(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}, SEGMENT_SIZE);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}