diff --git a/src/main/java/org/warp/jcwdb/CacheIndexManager.java b/src/main/java/org/warp/jcwdb/CacheIndexManager.java index 31c9cb9..015defc 100644 --- a/src/main/java/org/warp/jcwdb/CacheIndexManager.java +++ b/src/main/java/org/warp/jcwdb/CacheIndexManager.java @@ -45,4 +45,9 @@ public class CacheIndexManager implements IndexManager { public void close() { // TODO: implement } + + @Override + public long clean() { + return 0; + } } diff --git a/src/main/java/org/warp/jcwdb/Cleaner.java b/src/main/java/org/warp/jcwdb/Cleaner.java index d264cd8..33cb045 100644 --- a/src/main/java/org/warp/jcwdb/Cleaner.java +++ b/src/main/java/org/warp/jcwdb/Cleaner.java @@ -6,16 +6,24 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; public class Cleaner { - private Cleanable[] objectsToClean; - private Thread cleanerThread; - private int sleepInterval; + private static final double MAXIMUM_SLEEP_INTERVAL = 20d * 1000d; // 20 minutes + private static final double MINIMUM_SLEEP_INTERVAL = 1d * 1000d; // 1 second + private static final double NORMAL_REMOVED_ITEMS = 1000l; + private static final double REMOVED_ITEMS_RATIO = 2.5d; // 250% + + private final Cleanable[] objectsToClean; + private final Thread cleanerThread; + private int sleepInterval = (int) MINIMUM_SLEEP_INTERVAL; public Cleaner(Cleanable... objectsToClean) { this.objectsToClean = objectsToClean; + this.cleanerThread = new Thread(new CleanLoop()); + this.cleanerThread.setName("Cleaner thread"); + this.cleanerThread.setDaemon(true); } public void start() { - this.cleanerThread = new Thread(new CleanLoop()); + this.cleanerThread.start(); } /** @@ -33,6 +41,13 @@ public class Cleaner { public void stop() { if (cleanerThread != null) { cleanerThread.interrupt(); + while (cleanerThread.isAlive()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } } @@ -42,8 +57,35 @@ public class Cleaner { public void run() { while(!cleanerThread.isInterrupted()) { try { + System.out.println("[CLEANER] Waiting " + sleepInterval + "ms."); Thread.sleep(sleepInterval); - System.out.println("Cleaned " + clean() + " items."); + final double removedItems = clean(); + double suggestedExecutionTimeByItemsCalculations = sleepInterval; + + System.out.println("[CLEANER] REMOVED_ITEMS: " + removedItems); + if (removedItems > 0) { + final double removedItemsRatio = removedItems / NORMAL_REMOVED_ITEMS; + System.out.println("[CLEANER] REMOVED_ITEMS_RATIO: " + removedItemsRatio); + if (removedItemsRatio < 1d / REMOVED_ITEMS_RATIO || removedItemsRatio > REMOVED_ITEMS_RATIO) { + suggestedExecutionTimeByItemsCalculations = sleepInterval / removedItemsRatio; + } + } + + System.out.println("[CLEANER] Items: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + suggestedExecutionTimeByItemsCalculations + "ms"); + + double newSleepInterval = suggestedExecutionTimeByItemsCalculations; + System.out.println("[CLEANER] Total: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); + if (newSleepInterval > MAXIMUM_SLEEP_INTERVAL) { + sleepInterval = (int) MAXIMUM_SLEEP_INTERVAL; + } else if (newSleepInterval < MINIMUM_SLEEP_INTERVAL) { + sleepInterval = (int) MINIMUM_SLEEP_INTERVAL; + } else { + System.out.println("[CLEANER] CHANGED SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); + sleepInterval = (int) newSleepInterval; + } + + + System.out.println("[CLEANER] Cleaned " + removedItems + " items."); } catch (InterruptedException e) { } } diff --git a/src/main/java/org/warp/jcwdb/DBStandardTypes.java b/src/main/java/org/warp/jcwdb/DBStandardTypes.java index 02f2611..23d8f3f 100644 --- a/src/main/java/org/warp/jcwdb/DBStandardTypes.java +++ b/src/main/java/org/warp/jcwdb/DBStandardTypes.java @@ -19,4 +19,4 @@ public class DBStandardTypes { typesManager.registerType(LightList.class, LIGHT_LIST, new DBLightListParser(db)); typesManager.registerTypeFallback(new DBGenericObjectParser()); } -} +} \ No newline at end of file diff --git a/src/main/java/org/warp/jcwdb/EntryReference.java b/src/main/java/org/warp/jcwdb/EntryReference.java index 4bc3a94..2cdf849 100644 --- a/src/main/java/org/warp/jcwdb/EntryReference.java +++ b/src/main/java/org/warp/jcwdb/EntryReference.java @@ -6,22 +6,22 @@ import java.io.IOException; * You must have only a maximum of 1 reference for each index * @param */ -public class EntryReference implements Castable, AutoCloseable { - private final JCWDatabase db; +public class EntryReference implements Castable { + private final JCWDatabase.EntryReferenceTools db; private final long entryIndex; private final DBTypeParser parser; public T value; private volatile boolean closed; private final Object closeLock = new Object(); - public EntryReference(JCWDatabase db, long entryId, DBTypeParser parser) throws IOException { + public EntryReference(JCWDatabase.EntryReferenceTools db, long entryId, DBTypeParser parser) throws IOException { this.db = db; this.entryIndex = entryId; this.parser = parser; - this.value = db.indices.get(entryId, parser.getReader()); + this.value = db.read(entryId, parser.getReader()); } - public EntryReference(JCWDatabase db, long entryId, DBTypeParser parser, T value) { + public EntryReference(JCWDatabase.EntryReferenceTools db, long entryId, DBTypeParser parser, T value) { this.db = db; this.entryIndex = entryId; this.parser = parser; @@ -38,7 +38,7 @@ public class EntryReference implements Castable, AutoCloseable { public void save() throws IOException { if (!closed) { - db.indices.set(entryIndex, parser.getWriter(value)); + db.write(entryIndex, parser.getWriter(value)); } } @@ -47,8 +47,7 @@ public class EntryReference implements Castable, AutoCloseable { return (T) this; } - @Override - public void close() throws IOException { + protected void close() throws IOException { if (closed) { return; } @@ -57,7 +56,6 @@ public class EntryReference implements Castable, AutoCloseable { return; } - db.removeEntryReference(entryIndex); save(); closed = true; diff --git a/src/main/java/org/warp/jcwdb/FileIndexManager.java b/src/main/java/org/warp/jcwdb/FileIndexManager.java index 161cec8..6c75054 100644 --- a/src/main/java/org/warp/jcwdb/FileIndexManager.java +++ b/src/main/java/org/warp/jcwdb/FileIndexManager.java @@ -3,10 +3,7 @@ package org.warp.jcwdb; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; -import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; import java.io.IOException; @@ -17,6 +14,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Iterator; public class FileIndexManager implements IndexManager { private final SeekableByteChannel dataFileChannel, metadataFileChannel; @@ -42,9 +40,9 @@ public class FileIndexManager implements IndexManager { private long firstAllocableIndex; public FileIndexManager(Path dataFile, Path metadataFile) throws IOException { - loadedIndices = new Long2ObjectAVLTreeMap<>(); - dirtyLoadedIndices = new LongAVLTreeSet(); - removedIndices = new LongAVLTreeSet(); + loadedIndices = new Long2ObjectOpenHashMap<>(); + dirtyLoadedIndices = new LongOpenHashSet(); + removedIndices = new LongOpenHashSet(); if (Files.notExists(dataFile)) { Files.createFile(dataFile); } @@ -319,12 +317,14 @@ public class FileIndexManager implements IndexManager { synchronized(indicesMapsAccessLock) { if (loadedIndices.size() > JCWDatabase.MAX_LOADED_REFERENCES) { long count = loadedIndices.size(); - for (Entry loadedIndex : loadedIndices.long2ObjectEntrySet()) { - if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) { + LongIterator it = loadedIndices.keySet().iterator(); + while(it.hasNext()) { + long loadedIndex = it.nextLong(); + if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) { break; } try { - flushAndUnload(loadedIndex.getLongKey()); + flushAndUnload(loadedIndex); } catch (IOException e) { e.printStackTrace(); } diff --git a/src/main/java/org/warp/jcwdb/JCWDatabase.java b/src/main/java/org/warp/jcwdb/JCWDatabase.java index f57d473..f775d5c 100644 --- a/src/main/java/org/warp/jcwdb/JCWDatabase.java +++ b/src/main/java/org/warp/jcwdb/JCWDatabase.java @@ -4,10 +4,12 @@ import java.io.IOException; import java.lang.ref.WeakReference; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Iterator; import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import it.unimi.dsi.fastutil.objects.ObjectIterator; public class JCWDatabase implements AutoCloseable, Cleanable { public final static long MAX_LOADED_REFERENCES = 10; @@ -16,6 +18,7 @@ public class JCWDatabase implements AutoCloseable, Cleanable { private final TypesManager typesManager; private final MixedIndexDatabase indices; private final Cleaner databaseCleaner; + private final EntryReferenceTools entryReferenceTools = new EntryReferenceTools(); private final Long2ObjectMap>> references; private volatile boolean closed; private final Object closeLock = new Object(); @@ -59,8 +62,8 @@ public class JCWDatabase implements AutoCloseable, Cleanable { type = this.indices.getType(index); } DBTypeParser typeParser = this.typesManager.get(type); - ref = new EntryReference<>(this, index, typeParser); - refRef = new WeakReference>(ref); + ref = new EntryReference<>(entryReferenceTools, index, typeParser); + refRef = new WeakReference<>(ref); this.references.put(index, refRef); } return ref; @@ -76,8 +79,8 @@ public class JCWDatabase implements AutoCloseable, Cleanable { synchronized (indicesAccessLock) { index = indices.add(typeParser.getWriter(value)); } - ref = new EntryReference<>(this, index, typeParser, value); - this.references.put(index, new WeakReference>(ref)); + ref = new EntryReference<>(entryReferenceTools, index, typeParser, value); + this.references.put(index, new WeakReference<>(ref)); return ref; } } @@ -104,19 +107,13 @@ public class JCWDatabase implements AutoCloseable, Cleanable { synchronized (indicesAccessLock) { indices.set(index, typeParser.getWriter(value)); } - ref = new EntryReference<>(this, index, typeParser); + ref = new EntryReference<>(entryReferenceTools, index, typeParser); this.references.put(index, new WeakReference>(ref)); return ref; } } } - protected void removeEntryReference(long index) { - synchronized (referencesAccessLock) { - this.references.remove(index); - } - } - @Override public void close() throws IOException { if (closed) { @@ -129,12 +126,18 @@ public class JCWDatabase implements AutoCloseable, Cleanable { closed = true; } + this.databaseCleaner.stop(); + synchronized (referencesAccessLock) { - for (WeakReference> referenceRef : references.values()) { + ObjectIterator>> iterator = references.values().iterator(); + while (iterator.hasNext()) { + WeakReference> referenceRef = iterator.next(); EntryReference reference = referenceRef.get(); if (reference != null) { reference.close(); + iterator.remove(); } + } } synchronized (indicesAccessLock) { @@ -151,18 +154,21 @@ public class JCWDatabase implements AutoCloseable, Cleanable { @Override public long clean() { - return cleanEmptyReferences() + long removedItems = cleanEmptyReferences() + cleanExtraReferences() + indices.clean(); + return removedItems; } private long cleanEmptyReferences() { long removed = 0; synchronized(referencesAccessLock) { - for (Entry>> entry : references.long2ObjectEntrySet()) { + ObjectIterator>>> iterator = references.long2ObjectEntrySet().iterator(); + while (iterator.hasNext()) { + Entry>> entry = iterator.next(); if (entry.getValue().get() == null) { - references.remove(entry.getLongKey()); + iterator.remove(); removed++; } } @@ -175,9 +181,11 @@ public class JCWDatabase implements AutoCloseable, Cleanable { synchronized(referencesAccessLock) { if (references.size() > MAX_LOADED_REFERENCES) { long count = 0; - for (Entry>> entry : references.long2ObjectEntrySet()) { + ObjectIterator>>> iterator = references.long2ObjectEntrySet().iterator(); + while (iterator.hasNext()) { + Entry>> entry = iterator.next(); if (count > MAX_LOADED_REFERENCES * 3l / 2l) { - references.remove(entry.getLongKey()); + iterator.remove(); removedReferences++; } else { count++; @@ -187,4 +195,18 @@ public class JCWDatabase implements AutoCloseable, Cleanable { } return removedReferences; } + + public class EntryReferenceTools { + private EntryReferenceTools() { + + } + + public T read(long index, DBReader reader) throws IOException { + return indices.get(index, reader); + } + + public void write(long index, DBDataOutput writer) throws IOException { + indices.set(index, writer); + } + } } diff --git a/src/main/java/org/warp/jcwdb/exampleimpl/App.java b/src/main/java/org/warp/jcwdb/exampleimpl/App.java index e76a69b..85c11d1 100644 --- a/src/main/java/org/warp/jcwdb/exampleimpl/App.java +++ b/src/main/java/org/warp/jcwdb/exampleimpl/App.java @@ -27,7 +27,7 @@ public class App { // System.out.println(" - " + root.get(i)); // } long prectime = System.currentTimeMillis(); - for (int i = 0; i < 2/*2000000*/; i++) { + for (int i = 0; i < 2000000/*2000000*/; i++) { root.add("Test " + i); if (i > 0 && i % 200000 == 0) { long precprectime = prectime; @@ -39,9 +39,15 @@ public class App { System.out.println("Root size: "+root.size()); System.out.println("Time elapsed: " + (time2 - time1)); System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)..."); + long removedItems = db.clean(); + long time3 = System.currentTimeMillis(); + System.out.println("Removed items: " + removedItems); + System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + System.out.println("Time elapsed: " + (time3 - time2)); System.out.println("Saving database..."); db.close(); - long time3 = System.currentTimeMillis(); - System.out.println("Time elapsed: " + (time3 - time2)); + long time4 = System.currentTimeMillis(); + System.out.println("Time elapsed: " + (time4 - time3)); } }