From 9db950fe1c08c92ff46b0b2e7b60902b41705bf7 Mon Sep 17 00:00:00 2001 From: Cavallium Date: Tue, 27 Nov 2018 17:47:19 +0100 Subject: [PATCH] Cleanable --- src/main/java/org/warp/jcwdb/Cleanable.java | 9 ++ src/main/java/org/warp/jcwdb/Cleaner.java | 53 +++++++ .../java/org/warp/jcwdb/FileIndexManager.java | 145 ++++++++++++++---- .../java/org/warp/jcwdb/IndexManager.java | 2 +- src/main/java/org/warp/jcwdb/JCWDatabase.java | 53 ++++++- .../org/warp/jcwdb/MixedIndexDatabase.java | 6 + .../java/org/warp/jcwdb/exampleimpl/App.java | 2 +- 7 files changed, 232 insertions(+), 38 deletions(-) create mode 100644 src/main/java/org/warp/jcwdb/Cleanable.java create mode 100644 src/main/java/org/warp/jcwdb/Cleaner.java diff --git a/src/main/java/org/warp/jcwdb/Cleanable.java b/src/main/java/org/warp/jcwdb/Cleanable.java new file mode 100644 index 0000000..57af24c --- /dev/null +++ b/src/main/java/org/warp/jcwdb/Cleanable.java @@ -0,0 +1,9 @@ +package org.warp.jcwdb; + +public interface Cleanable { + /** + * Clean the object + * @return the approximated number of cleaned items + */ + public long clean(); +} diff --git a/src/main/java/org/warp/jcwdb/Cleaner.java b/src/main/java/org/warp/jcwdb/Cleaner.java new file mode 100644 index 0000000..d264cd8 --- /dev/null +++ b/src/main/java/org/warp/jcwdb/Cleaner.java @@ -0,0 +1,53 @@ +package org.warp.jcwdb; + +import java.lang.ref.WeakReference; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; + +public class Cleaner { + + private Cleanable[] objectsToClean; + private Thread cleanerThread; + private int sleepInterval; + + public Cleaner(Cleanable... objectsToClean) { + this.objectsToClean = objectsToClean; + } + + public void start() { + this.cleanerThread = new Thread(new CleanLoop()); + } + + /** + * Clean + * @return number of removed items + */ + private long clean() { + long cleanedItems = 0; + for (Cleanable cleanable : objectsToClean) { + cleanedItems += cleanable.clean(); + } + return cleanedItems; + } + + public void stop() { + if (cleanerThread != null) { + cleanerThread.interrupt(); + } + } + + private class CleanLoop implements Runnable { + + @Override + public void run() { + while(!cleanerThread.isInterrupted()) { + try { + Thread.sleep(sleepInterval); + System.out.println("Cleaned " + clean() + " items."); + } catch (InterruptedException e) { + } + } + } + + } +} diff --git a/src/main/java/org/warp/jcwdb/FileIndexManager.java b/src/main/java/org/warp/jcwdb/FileIndexManager.java index de0e6db..161cec8 100644 --- a/src/main/java/org/warp/jcwdb/FileIndexManager.java +++ b/src/main/java/org/warp/jcwdb/FileIndexManager.java @@ -7,8 +7,10 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; import java.io.IOException; +import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; @@ -25,6 +27,7 @@ public class FileIndexManager implements IndexManager { private final Object closeLock = new Object(); private final Object metadataByteBufferLock = new Object(); private final Object maskByteBufferLock = new Object(); + private final Object indicesMapsAccessLock = new Object(); /** * Edit this using editIndex() @@ -132,9 +135,39 @@ public class FileIndexManager implements IndexManager { if (indexDetails != null) { fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize()); } - dirtyLoadedIndices.remove(index); - loadedIndices.remove(index); - removedIndices.add(index); + synchronized(indicesMapsAccessLock) { + dirtyLoadedIndices.remove(index); + loadedIndices.remove(index); + removedIndices.add(index); + } + } + + public void flushAndUnload(long index) throws IOException { + if (removedIndices.contains(index)) { + synchronized(indicesMapsAccessLock) { + removedIndices.remove(index); + dirtyLoadedIndices.remove(index); + loadedIndices.remove(index); + } + // Update indices metadata + SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); + eraseIndexDetails(metadata); + } + if (dirtyLoadedIndices.contains(index)) { + synchronized(indicesMapsAccessLock) { + dirtyLoadedIndices.remove(index); + } + // Update indices metadata + SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); + IndexDetails indexDetails; + synchronized(indicesMapsAccessLock) { + indexDetails = loadedIndices.get(index); + } + writeIndexDetails(metadata, indexDetails); + } + synchronized(indicesMapsAccessLock) { + loadedIndices.remove(index); + } } @Override @@ -155,21 +188,28 @@ public class FileIndexManager implements IndexManager { } private void editIndex(long index, IndexDetails details) { - loadedIndices.put(index, details); - dirtyLoadedIndices.add(index); + synchronized(indicesMapsAccessLock) { + loadedIndices.put(index, details); + dirtyLoadedIndices.add(index); + } } private long createIndexMetadata(IndexDetails indexDetails) { - long newIndex = firstAllocableIndex++; - loadedIndices.put(newIndex, indexDetails); - dirtyLoadedIndices.add(newIndex); - removedIndices.remove(newIndex); - return newIndex; + synchronized(indicesMapsAccessLock) { + long newIndex = firstAllocableIndex++; + loadedIndices.put(newIndex, indexDetails); + dirtyLoadedIndices.add(newIndex); + removedIndices.remove(newIndex); + return newIndex; + } } private IndexDetails getIndexMetadataUnsafe(long index) throws IOException { // Return index details if loaded - IndexDetails details = loadedIndices.getOrDefault(index, null); + IndexDetails details; + synchronized(indicesMapsAccessLock) { + details = loadedIndices.getOrDefault(index, null); + } if (details != null) return details; // Try to load the details from file @@ -221,39 +261,78 @@ public class FileIndexManager implements IndexManager { // Update indices metadata SeekableByteChannel metadata = metadataFileChannel; long lastIndex = -2; - for (long index : dirtyLoadedIndices) { - IndexDetails indexDetails = loadedIndices.get(index); - if (index - lastIndex != 1) { - metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); + synchronized(indicesMapsAccessLock) { + for (long index : dirtyLoadedIndices) { + IndexDetails indexDetails = loadedIndices.get(index); + if (index - lastIndex != 1) { + metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); + } + writeIndexDetails(metadata, indexDetails); + lastIndex = index; } - synchronized (metadataByteBufferLock) { - metadataByteBuffer.rewind(); - metadataByteBuffer.putInt(0); - metadataByteBuffer.putLong(indexDetails.getOffset()); - metadataByteBuffer.putInt(indexDetails.getSize()); - metadataByteBuffer.putInt(indexDetails.getType()); - metadataByteBuffer.rewind(); - metadata.write(metadataByteBuffer); - } - lastIndex = index; } // Remove removed indices - for (long index : removedIndices) { - metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); - synchronized (maskByteBufferLock) { - maskByteBuffer.rewind(); - maskByteBuffer.putInt(IndexDetails.MASK_DELETED); - maskByteBuffer.rewind(); - metadata.write(maskByteBuffer); - } + synchronized(indicesMapsAccessLock) { + for (long index : removedIndices) { + metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); + eraseIndexDetails(metadata); + } } fileAllocator.close(); } + + private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException { + synchronized (metadataByteBufferLock) { + metadataByteBuffer.rewind(); + metadataByteBuffer.putInt(0); + metadataByteBuffer.putLong(indexDetails.getOffset()); + metadataByteBuffer.putInt(indexDetails.getSize()); + metadataByteBuffer.putInt(indexDetails.getType()); + metadataByteBuffer.rewind(); + position.write(metadataByteBuffer); + } + } + + private void eraseIndexDetails(SeekableByteChannel position) throws IOException { + synchronized (maskByteBufferLock) { + maskByteBuffer.rewind(); + maskByteBuffer.putInt(IndexDetails.MASK_DELETED); + maskByteBuffer.rewind(); + position.write(maskByteBuffer); + } + } private void checkClosed() { if (closed) { throw new RuntimeException("Index Manager is closed."); } } + + @Override + public long clean() { + return cleanExtraIndices(); + } + + private long cleanExtraIndices() { + long removedIndices = 0; + synchronized(indicesMapsAccessLock) { + if (loadedIndices.size() > JCWDatabase.MAX_LOADED_REFERENCES) { + long count = loadedIndices.size(); + for (Entry loadedIndex : loadedIndices.long2ObjectEntrySet()) { + if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) { + break; + } + try { + flushAndUnload(loadedIndex.getLongKey()); + } catch (IOException e) { + e.printStackTrace(); + } + removedIndices++; + count--; + } + } + } + return removedIndices; + } } diff --git a/src/main/java/org/warp/jcwdb/IndexManager.java b/src/main/java/org/warp/jcwdb/IndexManager.java index 290b0ad..9c7ee80 100644 --- a/src/main/java/org/warp/jcwdb/IndexManager.java +++ b/src/main/java/org/warp/jcwdb/IndexManager.java @@ -2,7 +2,7 @@ package org.warp.jcwdb; import java.io.IOException; -public interface IndexManager { +public interface IndexManager extends Cleanable { T get(long index, DBReader reader) throws IOException; int getType(long index) throws IOException; long add(DBDataOutput writer) throws IOException; diff --git a/src/main/java/org/warp/jcwdb/JCWDatabase.java b/src/main/java/org/warp/jcwdb/JCWDatabase.java index 0cff920..f57d473 100644 --- a/src/main/java/org/warp/jcwdb/JCWDatabase.java +++ b/src/main/java/org/warp/jcwdb/JCWDatabase.java @@ -7,10 +7,15 @@ import java.util.ArrayList; import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; -public class JCWDatabase implements AutoCloseable { - protected final TypesManager typesManager; - protected final MixedIndexDatabase indices; +public class JCWDatabase implements AutoCloseable, Cleanable { + public final static long MAX_LOADED_REFERENCES = 10; + public final static long MAX_LOADED_INDICES = 1000; + + private final TypesManager typesManager; + private final MixedIndexDatabase indices; + private final Cleaner databaseCleaner; private final Long2ObjectMap>> references; private volatile boolean closed; private final Object closeLock = new Object(); @@ -28,6 +33,9 @@ public class JCWDatabase implements AutoCloseable { e.printStackTrace(); } })); + this.databaseCleaner = new Cleaner(this); + + this.databaseCleaner.start(); } public EntryReference> getRoot() throws IOException { @@ -140,4 +148,43 @@ public class JCWDatabase implements AutoCloseable { throw new RuntimeException("Index Manager is closed."); } } + + @Override + public long clean() { + return cleanEmptyReferences() + + cleanExtraReferences() + + indices.clean(); + } + + + private long cleanEmptyReferences() { + long removed = 0; + synchronized(referencesAccessLock) { + for (Entry>> entry : references.long2ObjectEntrySet()) { + if (entry.getValue().get() == null) { + references.remove(entry.getLongKey()); + removed++; + } + } + } + return removed; + } + + private long cleanExtraReferences() { + long removedReferences = 0; + synchronized(referencesAccessLock) { + if (references.size() > MAX_LOADED_REFERENCES) { + long count = 0; + for (Entry>> entry : references.long2ObjectEntrySet()) { + if (count > MAX_LOADED_REFERENCES * 3l / 2l) { + references.remove(entry.getLongKey()); + removedReferences++; + } else { + count++; + } + } + } + } + return removedReferences; + } } diff --git a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java index 287fd0f..5e525d9 100644 --- a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java +++ b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java @@ -71,4 +71,10 @@ public class MixedIndexDatabase implements IndexManager { this.cacheIndices.close(); this.fileIndices.close(); } + + @Override + public long clean() { + return fileIndices.clean() + + cacheIndices.clean(); + } } diff --git a/src/main/java/org/warp/jcwdb/exampleimpl/App.java b/src/main/java/org/warp/jcwdb/exampleimpl/App.java index 389c6eb..e76a69b 100644 --- a/src/main/java/org/warp/jcwdb/exampleimpl/App.java +++ b/src/main/java/org/warp/jcwdb/exampleimpl/App.java @@ -27,7 +27,7 @@ public class App { // System.out.println(" - " + root.get(i)); // } long prectime = System.currentTimeMillis(); - for (int i = 0; i < 2000000; i++) { + for (int i = 0; i < 2/*2000000*/; i++) { root.add("Test " + i); if (i > 0 && i % 200000 == 0) { long precprectime = prectime;