memorysegments per-file scope
This commit is contained in:
parent
fe33243258
commit
67402c3ef6
2
pom.xml
2
pom.xml
@ -4,7 +4,7 @@
|
|||||||
<groupId>it.cavallium</groupId>
|
<groupId>it.cavallium</groupId>
|
||||||
<artifactId>filequeue</artifactId>
|
<artifactId>filequeue</artifactId>
|
||||||
<name>file queue project</name>
|
<name>file queue project</name>
|
||||||
<version>3.1.8</version>
|
<version>3.1.9</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<description>Light weight, high performance, simple, reliable and persistent queue</description>
|
<description>Light weight, high performance, simple, reliable and persistent queue</description>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
@ -24,7 +24,6 @@ public class SimpleQueueMemorySegment<T> implements SimpleQueue<T>, Closeable {
|
|||||||
private final String name;
|
private final String name;
|
||||||
private final Serializer<T> serializer;
|
private final Serializer<T> serializer;
|
||||||
private final Deserializer<T> deserializer;
|
private final Deserializer<T> deserializer;
|
||||||
private final Arena arena;
|
|
||||||
private final long segmentSize;
|
private final long segmentSize;
|
||||||
|
|
||||||
public SimpleQueueMemorySegment(Path directory,
|
public SimpleQueueMemorySegment(Path directory,
|
||||||
@ -32,7 +31,6 @@ public class SimpleQueueMemorySegment<T> implements SimpleQueue<T>, Closeable {
|
|||||||
Serializer<T> serializer,
|
Serializer<T> serializer,
|
||||||
Deserializer<T> deserializer,
|
Deserializer<T> deserializer,
|
||||||
long segmentSize) throws IOException {
|
long segmentSize) throws IOException {
|
||||||
this.arena = Arena.ofShared();
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.directory = directory;
|
this.directory = directory;
|
||||||
this.serializer = serializer;
|
this.serializer = serializer;
|
||||||
@ -74,7 +72,7 @@ public class SimpleQueueMemorySegment<T> implements SimpleQueue<T>, Closeable {
|
|||||||
|
|
||||||
private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException {
|
private SimpleQueueMemorySegmentFixedSize expandQueueSegmentForWrite() throws IOException {
|
||||||
synchronized (queueSegments) {
|
synchronized (queueSegments) {
|
||||||
var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(arena, segmentSize, generateQueuePath());
|
var queueMemorySegment = new SimpleQueueMemorySegmentFixedSize(segmentSize, generateQueuePath());
|
||||||
// Hibernate the middle segments, if there will be 3 or more segments after the expansion
|
// Hibernate the middle segments, if there will be 3 or more segments after the expansion
|
||||||
if (queueSegments.size() >= 2) {
|
if (queueSegments.size() >= 2) {
|
||||||
queueSegments.getLast().hibernate();
|
queueSegments.getLast().hibernate();
|
||||||
@ -157,7 +155,6 @@ public class SimpleQueueMemorySegment<T> implements SimpleQueue<T>, Closeable {
|
|||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
this.arena.close();
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -18,26 +18,30 @@ public class SimpleQueueMemorySegmentFixedSize implements Closeable {
|
|||||||
private final Path filePath;
|
private final Path filePath;
|
||||||
private final FileChannel ch;
|
private final FileChannel ch;
|
||||||
private final MemorySegment writer;
|
private final MemorySegment writer;
|
||||||
private final Arena arena;
|
private final Arena mmapArena;
|
||||||
private final long fixedSize;
|
private final long fixedSize;
|
||||||
private long readPosition = 0;
|
private long readPosition = 0;
|
||||||
private long writePosition = 0;
|
private long writePosition = 0;
|
||||||
private final AtomicInteger size = new AtomicInteger();
|
private final AtomicInteger size = new AtomicInteger();
|
||||||
|
|
||||||
public SimpleQueueMemorySegmentFixedSize(Arena arena, long fixedSize, Path filePath) throws IOException {
|
public SimpleQueueMemorySegmentFixedSize(long fixedSize, Path filePath) throws IOException {
|
||||||
this.arena = arena;
|
this.mmapArena = Arena.ofShared();
|
||||||
this.filePath = filePath;
|
this.filePath = filePath;
|
||||||
this.fixedSize = fixedSize;
|
this.fixedSize = fixedSize;
|
||||||
Files.deleteIfExists(filePath);
|
Files.deleteIfExists(filePath);
|
||||||
this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
|
this.ch = FileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
|
||||||
ch.truncate(fixedSize);
|
ch.truncate(fixedSize);
|
||||||
this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize, arena);
|
this.writer = ch.map(MapMode.READ_WRITE, 0L, fixedSize, mmapArena);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
try {
|
||||||
ch.close();
|
ch.close();
|
||||||
Files.deleteIfExists(filePath);
|
Files.deleteIfExists(filePath);
|
||||||
|
} finally {
|
||||||
|
mmapArena.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void add(MemorySegment element) {
|
public void add(MemorySegment element) {
|
||||||
|
Loading…
Reference in New Issue
Block a user