From cc02c053ddcb38a63d8c82cb39c39ed4f6a9adfc Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 10 Oct 2022 01:10:05 +0200 Subject: [PATCH] first commit --- .gitignore | 60 ++++++++++++ pom.xml | 49 ++++++++++ .../it/cavallium/filequeue/Deserializer.java | 22 +++++ .../filequeue/DiskQueueToConsumer.java | 40 ++++++++ .../cavallium/filequeue/IQueueToConsumer.java | 13 +++ .../it/cavallium/filequeue/QueueConsumer.java | 6 ++ .../cavallium/filequeue/QueueToConsumer.java | 93 +++++++++++++++++++ .../it/cavallium/filequeue/Serializer.java | 22 +++++ .../it/cavallium/filequeue/SimpleQueue.java | 10 ++ .../cavallium/filequeue/SimpleQueueFile.java | 47 ++++++++++ .../cavallium/filequeue/SimpleQueueJava.java | 27 ++++++ 11 files changed, 389 insertions(+) create mode 100644 .gitignore create mode 100644 pom.xml create mode 100644 src/main/java/it/cavallium/filequeue/Deserializer.java create mode 100644 src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java create mode 100644 src/main/java/it/cavallium/filequeue/IQueueToConsumer.java create mode 100644 src/main/java/it/cavallium/filequeue/QueueConsumer.java create mode 100644 src/main/java/it/cavallium/filequeue/QueueToConsumer.java create mode 100644 src/main/java/it/cavallium/filequeue/Serializer.java create mode 100644 src/main/java/it/cavallium/filequeue/SimpleQueue.java create mode 100644 src/main/java/it/cavallium/filequeue/SimpleQueueFile.java create mode 100644 src/main/java/it/cavallium/filequeue/SimpleQueueJava.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cc6d95c --- /dev/null +++ b/.gitignore @@ -0,0 +1,60 @@ +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/*.* +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/dictionaries +.idea/**/shelf + +# Sensitive or high-churn files +.idea/workspace.xml +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-debug/ +cmake-build-release/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ +filequeue.iml + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests +/.idea/ +target/ +.DS_Store +/.idea/ +/.idea/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c262a01 --- /dev/null +++ b/pom.xml @@ -0,0 +1,49 @@ + + + it.cavallium + filequeue + file queue project + 3.0.0 + jar + Light weight, high performance, simple, reliable and persistent queue + 4.0.0 + + UTF-8 + UTF-8 + + + + mchv-release-distribution + MCHV Release Apache Maven Packages Distribution + https://mvn.mchv.eu/repository/mchv + + + mchv-snapshot-distribution + MCHV Snapshot Apache Maven Packages Distribution + https://mvn.mchv.eu/repository/mchv-snapshot + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.7.0 + + 11 + 11 + + + + + + + com.squareup.tape2 + tape + 2.0.0-beta1 + + + + diff --git a/src/main/java/it/cavallium/filequeue/Deserializer.java b/src/main/java/it/cavallium/filequeue/Deserializer.java new file mode 100644 index 0000000..08f5ae2 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/Deserializer.java @@ -0,0 +1,22 @@ +package it.cavallium.filequeue; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map; + +public interface Deserializer { + + default T deserialize(byte[] data) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + return deserialize(data.length, new DataInputStream(bais)); + } + + default T deserialize(int length, DataInput dataInput) throws IOException { + byte[] data = new byte[length]; + dataInput.readFully(data); + return deserialize(data); + } +} diff --git a/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java new file mode 100644 index 0000000..fdd61bd --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/DiskQueueToConsumer.java @@ -0,0 +1,40 @@ +package it.cavallium.filequeue; + +import com.squareup.tape2.QueueFile; +import java.io.IOException; +import java.nio.file.Path; + +public final class DiskQueueToConsumer implements IQueueToConsumer { + + private final QueueToConsumer queue; + private final QueueFile queueFile; + + public DiskQueueToConsumer(Path file, + Serializer serializer, + Deserializer deserializer, + QueueConsumer consumer) throws IOException { + QueueFile queueFile = new QueueFile.Builder(file.toFile()).build(); + this.queueFile = queueFile; + this.queue = new QueueToConsumer<>(new SimpleQueueFile<>(queueFile, serializer, deserializer), consumer); + } + + @Override + public void add(T value) { + queue.add(value); + } + + @Override + public void close() { + queue.close(); + try { + queueFile.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void startQueue() { + queue.startQueue(); + } +} diff --git a/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java new file mode 100644 index 0000000..fb5d9a2 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/IQueueToConsumer.java @@ -0,0 +1,13 @@ +package it.cavallium.filequeue; + +import java.io.Closeable; + +public interface IQueueToConsumer extends Closeable { + + void add(T value); + + @Override + void close(); + + void startQueue(); +} diff --git a/src/main/java/it/cavallium/filequeue/QueueConsumer.java b/src/main/java/it/cavallium/filequeue/QueueConsumer.java new file mode 100644 index 0000000..191aa36 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/QueueConsumer.java @@ -0,0 +1,6 @@ +package it.cavallium.filequeue; + +public interface QueueConsumer { + + boolean tryConsume(T value); +} diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java new file mode 100644 index 0000000..623a294 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -0,0 +1,93 @@ +package it.cavallium.filequeue; + +import java.time.Duration; +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.LockSupport; + +class QueueToConsumer implements IQueueToConsumer { + + private final long BACKOFF_NS = Duration.ofMillis(2).toNanos(); + private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos(); + + private final Object lock = new Object(); + private final Semaphore semaphore = new Semaphore(0); + private long queued; + private final SimpleQueue queue; + private final QueueConsumer consumer; + private Manager manager; + private volatile boolean closed; + + public QueueToConsumer(SimpleQueue queue, QueueConsumer consumer) { + this.queue = queue; + this.consumer = consumer; + queued = queue.size(); + } + + public synchronized void startQueue() { + if (manager == null) { + this.manager = new Manager(); + manager.setName("queue-manager"); + manager.start(); + } + } + + @Override + public void add(T value) { + boolean shouldAdd = true; + synchronized (lock) { + if (queued == 0 && consumer.tryConsume(value)) { + shouldAdd = false; + } else { + queued++; + } + } + if (shouldAdd && !closed) { + synchronized (queue) { + queue.add(value); + } + semaphore.release(); + } + } + + @Override + public void close() { + closed = true; + semaphore.release(); + } + + private class Manager extends Thread { + + @Override + public void run() { + try { + while (!closed) { + boolean shouldRemove = false; + T element; + synchronized (lock) { + if (queued > 0) { + queued--; + shouldRemove = true; + } + } + semaphore.acquire(); + if (!closed && shouldRemove) { + synchronized (queue) { + element = queue.remove(); + } + long nextDelay = BACKOFF_NS; + while (!closed && !consumer.tryConsume(element)) { + LockSupport.parkNanos(nextDelay); + if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { + nextDelay += BACKOFF_NS; + } else if (nextDelay < MAX_BACKOFF_NS) { + nextDelay = MAX_BACKOFF_NS; + } + } + } + } + } catch (InterruptedException ex) { + closed = true; + } + } + } +} diff --git a/src/main/java/it/cavallium/filequeue/Serializer.java b/src/main/java/it/cavallium/filequeue/Serializer.java new file mode 100644 index 0000000..194db83 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/Serializer.java @@ -0,0 +1,22 @@ +package it.cavallium.filequeue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +public interface Serializer { + + default byte[] serialize(T data) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (DataOutputStream daos = new DataOutputStream(baos)) { + serialize(data, daos); + return baos.toByteArray(); + } + } + } + + default void serialize(T data, DataOutput output) throws IOException { + output.write(serialize(data)); + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueue.java b/src/main/java/it/cavallium/filequeue/SimpleQueue.java new file mode 100644 index 0000000..cd4903b --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueue.java @@ -0,0 +1,10 @@ +package it.cavallium.filequeue; + +interface SimpleQueue { + + void add(T element); + + T remove(); + + int size(); +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java b/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java new file mode 100644 index 0000000..b1d5bb2 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java @@ -0,0 +1,47 @@ +package it.cavallium.filequeue; + +import com.squareup.tape2.QueueFile; +import java.io.IOException; +import java.util.NoSuchElementException; + +class SimpleQueueFile implements SimpleQueue { + + private final QueueFile queueFile; + private final Serializer ser; + private final Deserializer des; + + public SimpleQueueFile(QueueFile queueFile, Serializer serializer, Deserializer deserializer) { + this.queueFile = queueFile; + this.ser = serializer; + this.des = deserializer; + } + + @Override + public void add(T element) { + try { + queueFile.add(ser.serialize(element)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public T remove() { + try { + byte[] element = queueFile.peek(); + if (element == null) { + throw new NoSuchElementException("Queue is empty"); + } + var deserialized = des.deserialize(element); + queueFile.remove(); + return deserialized; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int size() { + return queueFile.size(); + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java b/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java new file mode 100644 index 0000000..ccf2f57 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java @@ -0,0 +1,27 @@ +package it.cavallium.filequeue; + +import java.util.Queue; + +class SimpleQueueJava implements SimpleQueue { + + private final Queue queue; + + public SimpleQueueJava(Queue queue) { + this.queue = queue; + } + + @Override + public void add(T element) { + queue.add(element); + } + + @Override + public T remove() { + return queue.remove(); + } + + @Override + public int size() { + return queue.size(); + } +}