diff --git a/pom.xml b/pom.xml index 7c4f88f..4465956 100644 --- a/pom.xml +++ b/pom.xml @@ -40,16 +40,6 @@ - - com.squareup.tape2 - tape - 2.0.0-beta1 - - - org.lmdbjava - lmdbjava - 0.8.3 - org.junit.jupiter junit-jupiter-api diff --git a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java deleted file mode 100644 index 222007a..0000000 --- a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java +++ /dev/null @@ -1,60 +0,0 @@ -package it.cavallium.filequeue; - -import com.squareup.tape2.QueueFile; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -public final class DiskQueueToConsumer implements IQueueToConsumer { - - private final QueueToConsumer queue; - private final QueueFile queueFile; - - public DiskQueueToConsumer(Path file, - boolean clear, - Serializer serializer, - Deserializer deserializer, - QueueConsumer consumer) throws IOException { - QueueFile queueFile; - try { - queueFile = new QueueFile.Builder(file.toFile()).zero(false).build(); - if (clear) { - queueFile.clear(); - } - } catch (Throwable ex) { - try { - Files.deleteIfExists(file); - } catch (Throwable ex2) { - - } - queueFile = new QueueFile.Builder(file.toFile()).zero(false).build(); - } - this.queueFile = queueFile; - this.queue = new QueueToConsumer<>(new SimpleQueueFile<>(queueFile, serializer, deserializer), consumer); - } - - @Override - public void add(T value) { - queue.add(value); - } - - @Override - public void close() { - queue.close(); - try { - queueFile.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void startQueue() { - queue.startQueue(); - } - - @Override - public void runStep(boolean park) { - queue.runStep(park); - } -} diff --git a/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java b/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java deleted file mode 100644 index 8ee2081..0000000 --- a/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java +++ /dev/null @@ -1,58 +0,0 @@ -package it.cavallium.filequeue; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; -import org.lmdbjava.ByteArrayProxy; -import org.lmdbjava.Env; -import org.lmdbjava.EnvFlags; - -public class LMDBEnvManager { - private static final Map> ENVS = new HashMap<>(); - - public synchronized static Env ofPath(Path path) { - return ENVS.compute(path, (p, env) -> { - if (env != null) { - if (env.isClosed()) { - env = null; - } - } - if (env != null) { - return env; - } - if (Files.notExists(path)) { - try { - Files.createDirectories(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - try { - return open(path.toFile()); - } catch (Throwable ex) { - try { - Files.walk(path) - .sorted(Comparator.reverseOrder()) - .map(Path::toFile) - .forEach(File::delete); - } catch (IOException e) { - throw new RuntimeException(e); - } - return open(path.toFile()); - } - }); - } - - private static Env open(File path) { - return Env.create(ByteArrayProxy.PROXY_BA) - // 128GiB - .setMapSize(Integer.parseInt(System.getProperty("fq.writemap.size.gb", "32")) * 1024L * 1024 * 1024) - .setMaxReaders(1024) - .setMaxDbs(1024) - .open(path, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP, EnvFlags.MDB_NOMETASYNC); - } -} diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java deleted file mode 100644 index 7aad744..0000000 --- a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java +++ /dev/null @@ -1,48 +0,0 @@ -package it.cavallium.filequeue; - -import java.nio.file.Path; -import org.lmdbjava.Env; - -public final class LMDBQueueToConsumer implements IQueueToConsumer { - - private final Env env; - private final SimpleQueueLMDB queueLMDB; - private final QueueToConsumer queue; - - public LMDBQueueToConsumer(Path file, - String dbName, - boolean clear, - Serializer serializer, - Deserializer deserializer, - QueueConsumer consumer) { - this.env = LMDBEnvManager.ofPath(file); - this.queueLMDB = new SimpleQueueLMDB<>(env, dbName, serializer, deserializer); - this.queue = new QueueToConsumer<>(queueLMDB, consumer) { - @Override - protected String getQueueManagerName() { - return super.getQueueManagerName() + "-" + dbName; - } - }; - } - - @Override - public void add(T value) { - queue.add(value); - } - - @Override - public void close() { - queue.close(); - queueLMDB.close(); - } - - @Override - public void startQueue() { - queue.startQueue(); - } - - @Override - public void runStep(boolean park) { - queue.runStep(park); - } -} diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumerFileChannel.java b/src/main/java/it/cavallium/filequeue/QueueToConsumerFileChannel.java new file mode 100644 index 0000000..751ac5a --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumerFileChannel.java @@ -0,0 +1,63 @@ +package it.cavallium.filequeue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; + +public final class QueueToConsumerFileChannel implements IQueueToConsumer { + + private final SimpleQueueMemorySegment queueMemorySegment; + private final QueueToConsumer queue; + private final Path dir; + + public QueueToConsumerFileChannel(Path file, + String dbName, + Serializer serializer, + Deserializer deserializer, + QueueConsumer consumer, + long segmentSize) { + try { + this.dir = file.resolve(dbName); + this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir, + dbName, + serializer, + deserializer, + segmentSize + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + this.queue = new QueueToConsumer<>(queueMemorySegment, consumer) { + @Override + protected String getQueueManagerName() { + return super.getQueueManagerName() + "-" + dbName; + } + }; + } + + @Override + public void add(T value) { + queue.add(value); + } + + @Override + public void close() { + queue.close(); + queueMemorySegment.close(); + try { + FileUtils.deleteDirectory(dir); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void startQueue() { + queue.startQueue(); + } + + @Override + public void runStep(boolean park) { + queue.runStep(park); + } +} diff --git a/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumerMemorySegment.java similarity index 79% rename from src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java rename to src/main/java/it/cavallium/filequeue/QueueToConsumerMemorySegment.java index 2d3f1de..80cd846 100644 --- a/src/main/java/it/cavallium/filequeue/MemorySegmentQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumerMemorySegment.java @@ -4,18 +4,18 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; -public final class MemorySegmentQueueToConsumer implements IQueueToConsumer { +public final class QueueToConsumerMemorySegment implements IQueueToConsumer { private final SimpleQueueMemorySegment queueMemorySegment; private final QueueToConsumer queue; private final Path dir; - public MemorySegmentQueueToConsumer(Path file, - String dbName, - Serializer serializer, - Deserializer deserializer, - QueueConsumer consumer, - long segmentSize) { + public QueueToConsumerMemorySegment(Path file, + String dbName, + Serializer serializer, + Deserializer deserializer, + QueueConsumer consumer, + long segmentSize) { try { this.dir = file.resolve(dbName); this.queueMemorySegment = new SimpleQueueMemorySegment<>(dir, diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java b/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java deleted file mode 100644 index a2c074a..0000000 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java +++ /dev/null @@ -1,47 +0,0 @@ -package it.cavallium.filequeue; - -import com.squareup.tape2.QueueFile; -import java.io.IOException; -import java.util.NoSuchElementException; - -class SimpleQueueFile implements SimpleQueue { - - private final QueueFile queueFile; - private final Serializer ser; - private final Deserializer des; - - public SimpleQueueFile(QueueFile queueFile, Serializer serializer, Deserializer deserializer) { - this.queueFile = queueFile; - this.ser = serializer; - this.des = deserializer; - } - - @Override - public synchronized void add(T element) { - try { - queueFile.add(ser.serialize(element)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public synchronized T remove() { - try { - byte[] element = queueFile.peek(); - if (element == null) { - throw new NoSuchElementException("Queue is empty"); - } - var deserialized = des.deserialize(element); - queueFile.remove(); - return deserialized; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public synchronized int size() { - return queueFile.size(); - } -} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannel.java b/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannel.java new file mode 100644 index 0000000..80a78f8 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannel.java @@ -0,0 +1,164 @@ +package it.cavallium.filequeue; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * Thread safe queue + */ +public class SimpleQueueFileChannel implements SimpleQueue, Closeable { + + private static final Set REGISTRY = new HashSet<>(); + private final ArrayDeque queueSegments = new ArrayDeque<>(); + private final Path directory; + private final String name; + private final Serializer serializer; + private final Deserializer deserializer; + private final long segmentSize; + + public SimpleQueueFileChannel(Path directory, + String name, + Serializer serializer, + Deserializer deserializer, + long segmentSize) throws IOException { + this.name = name; + this.directory = directory; + this.serializer = serializer; + this.deserializer = deserializer; + this.segmentSize = segmentSize; + synchronized (REGISTRY) { + if (!REGISTRY.add(name)) { + throw new IllegalArgumentException("Database already registered: " + name); + } + } + if (Files.notExists(directory)) { + Files.createDirectories(directory); + } + } + + private Path generateQueuePath() { + return directory.resolve("queue_%s_%d.queue.bin".formatted(name, System.nanoTime())); + } + + @Override + public void add(T element) { + byte[] serialized; + try { + serialized = serializer.serialize(element); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + var memorySegment = ByteBuffer.wrap(serialized); + try { + if (!getQueueSegmentForWrite().offer(memorySegment)) { + expandQueueSegmentForWrite().add(memorySegment); + } + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + private SimpleQueueFileChannelFixedSize expandQueueSegmentForWrite() throws IOException { + synchronized (queueSegments) { + var queueMemorySegment = new SimpleQueueFileChannelFixedSize(segmentSize, generateQueuePath()); + // Hibernate the middle segments, if there will be 3 or more segments after the expansion + if (queueSegments.size() >= 2) { + queueSegments.getLast().hibernate(); + } + queueSegments.add(queueMemorySegment); + return queueMemorySegment; + } + } + + private SimpleQueueFileChannelFixedSize getQueueSegmentForWrite() throws IOException { + synchronized (queueSegments) { + var first = queueSegments.peekLast(); + if (first == null) { + first = expandQueueSegmentForWrite(); + } + return first; + } + } + + private SimpleQueueFileChannelFixedSize getQueueSegmentForRead() throws IOException { + synchronized (queueSegments) { + var last = queueSegments.peekFirst(); + if (last == null) { + last = expandQueueSegmentForWrite(); + } + return last; + } + } + + private boolean popQueueSegmentForRead() throws IOException { + synchronized (queueSegments) { + if (queueSegments.size() > 1) { + var polled = queueSegments.poll(); + if (polled != null) { + polled.close(); + return true; + } + } + } + return false; + } + + @Override + public T remove() { + ByteBuffer removed; + try { + removed = getQueueSegmentForRead().poll(); + if (removed == null) { + if (popQueueSegmentForRead()) { + removed = getQueueSegmentForRead().poll(); + } + } + if (removed == null) throw new NoSuchElementException(); + byte[] data = new byte[removed.remaining()]; + removed.get(data); + return readObject(data); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + private T readObject(byte[] byteBuffer) { + try { + return deserializer.deserialize(byteBuffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int size() { + return this.queueSegments.stream().mapToInt(SimpleQueueFileChannelFixedSize::size).sum(); + } + + @Override + public void close() { + try { + this.queueSegments.forEach(qs -> { + try { + qs.close(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + synchronized (REGISTRY) { + REGISTRY.remove(name); + } + } + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannelFixedSize.java b/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannelFixedSize.java new file mode 100644 index 0000000..761cf6b --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueFileChannelFixedSize.java @@ -0,0 +1,94 @@ +package it.cavallium.filequeue; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; + +public class SimpleQueueFileChannelFixedSize implements Closeable { + + private final Path filePath; + private final FileChannel ch; + private final MappedByteBuffer writer; + private final long fixedSize; + private int readPosition = 0; + private int writePosition = 0; + private final AtomicInteger size = new AtomicInteger(); + + public SimpleQueueFileChannelFixedSize(long fixedSize, Path filePath) throws IOException { + this.filePath = filePath; + this.fixedSize = fixedSize; + Files.deleteIfExists(filePath); + this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ); + ch.truncate(fixedSize); + this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize); + } + + @Override + public void close() throws IOException { + ch.close(); + Files.deleteIfExists(filePath); + } + + public void add(ByteBuffer element) { + if (!offer(element)) { + throw new IllegalStateException("Can't add the element, not enough space"); + } + } + + /** + * @return true if there is enough space, false otherwise + */ + public boolean offer(ByteBuffer element) { + int size = Math.toIntExact(element.remaining()); + if (writePosition + size + Integer.BYTES > fixedSize) { + if (writePosition == 0) throw new UnsupportedOperationException("Entity too large! " + size + " bytes"); + return false; + } + writer.slice(writePosition, Integer.BYTES).putInt(size); + writer.put(writePosition + Integer.BYTES, element, 0, size); + writePosition += size + Integer.BYTES; + this.size.incrementAndGet(); + return true; + } + + public ByteBuffer poll() { + if (readPosition >= fixedSize) { + return null; + } + if (size.decrementAndGet() < 0) { + size.incrementAndGet(); + return null; + } + int size = writer.slice(readPosition, Integer.BYTES).getInt(); + var segment = ByteBuffer.allocate(size); + segment.put(0, writer, readPosition + Integer.BYTES, size); + readPosition += size + Integer.BYTES; + return segment; + } + + public ByteBuffer remove() { + var polled = poll(); + if (polled == null) throw new NoSuchElementException(); + return polled; + } + + public int size() { + return Math.max(0, size.get()); + } + + public void hibernate() { + try { + this.ch.force(false); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java deleted file mode 100644 index 9bbdfcc..0000000 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java +++ /dev/null @@ -1,108 +0,0 @@ -package it.cavallium.filequeue; - -import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Set; -import org.lmdbjava.Dbi; -import org.lmdbjava.DbiFlags; -import org.lmdbjava.Env; -import org.lmdbjava.Txn; - -/** - * Thread safe queue - */ -public class SimpleQueueLMDB implements SimpleQueue, Closeable { - - private static final Set REGISTRY = new HashSet<>(); - private final Env env; - private final String name; - private final Dbi dbi; - private final Serializer serializer; - private final Deserializer deserializer; - /** - * The First index. - */ - private int firstIndex; - /** - * The Last index. - */ - private int lastIndex; - - public SimpleQueueLMDB(Env env, - String name, - Serializer serializer, - Deserializer deserializer) { - this.env = env; - this.name = name; - this.serializer = serializer; - this.deserializer = deserializer; - synchronized (REGISTRY) { - if (!REGISTRY.add(name)) { - throw new IllegalArgumentException("Database already registered: " + name); - } - } - this.dbi = env.openDbi(name, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY); - } - - @Override - public void add(T element) { - int last = lastIndex++; - try { - dbi.put(serializeKey(last), serializer.serialize(element)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public T remove() { - byte[] byteBuffer; - try (Txn txnWrite = env.txnWrite()) { - var bb = serializeKey(firstIndex); - byteBuffer = dbi.get(txnWrite, bb); - if (byteBuffer == null) { - txnWrite.commit(); - return null; - } - boolean isDel = dbi.delete(txnWrite, bb); - firstIndex++; - txnWrite.commit(); - } - return readObject(byteBuffer); - } - - private T readObject(byte[] byteBuffer) { - try { - return deserializer.deserialize(byteBuffer); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private static byte[] serializeKey(int value) { - ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); - buffer.putInt(value); - return buffer.array(); - } - - @Override - public int size() { - return 0; - } - - @Override - public void close() { - try { - dbi.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - synchronized (REGISTRY) { - REGISTRY.remove(name); - } - } - } -} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 0000000..4783c7c --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,2 @@ +module filequeue { +} \ No newline at end of file diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java index 8ef78b5..77d3e77 100644 --- a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java @@ -13,7 +13,7 @@ public abstract class TestQueueToConsumer { public abstract SimpleQueue getSimpleQueue(); @Test - public void test() { + public void test() throws Exception { var q = getSimpleQueue(); AtomicBoolean ab = new AtomicBoolean(); try (var qtc = new QueueToConsumer<>(q, value -> { @@ -35,10 +35,13 @@ public abstract class TestQueueToConsumer { } catch (InterruptedException e) { throw new RuntimeException(e); } + if (q instanceof AutoCloseable autoCloseable) { + autoCloseable.close(); + } } @Test - public void testConcurrency() { + public void testConcurrency() throws Exception { var q = getSimpleQueue(); AtomicBoolean ab = new AtomicBoolean(); CountDownLatch cdl = new CountDownLatch(1); @@ -77,5 +80,8 @@ public abstract class TestQueueToConsumer { } catch (InterruptedException e) { throw new RuntimeException(e); } + if (q instanceof AutoCloseable autoCloseable) { + autoCloseable.close(); + } } } diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumerFileChannel.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerFileChannel.java new file mode 100644 index 0000000..0bed5ee --- /dev/null +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumerFileChannel.java @@ -0,0 +1,55 @@ +package it.cavallium.filequeue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; + +public class TestQueueToConsumerFileChannel extends TestQueueToConsumer { + + private static final long SEGMENT_SIZE = 24; + private Path dir; + + @BeforeEach + public void setup() throws IOException { + dir = Files.createTempDirectory("queue"); + } + + @AfterEach + public void tearDown() throws IOException { + Path pathToBeDeleted = dir; + try (var fs = Files.walk(pathToBeDeleted)) { + fs.sorted(Comparator.reverseOrder()) + .forEach(f -> { + try { + Files.deleteIfExists(f); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + + @Override + public SimpleQueue getSimpleQueue() { + try { + return new SimpleQueueMemorySegment<>(dir, "test", new Serializer<>() { + @Override + public byte[] serialize(String data) { + return data.getBytes(StandardCharsets.UTF_8); + } + }, new Deserializer<>() { + @Override + public String deserialize(byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + }, SEGMENT_SIZE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +}