New Cleaner Logic

This commit is contained in:
Andrea Cavalli 2018-12-05 02:39:41 +01:00
parent 9db950fe1c
commit cbfa8584fe
7 changed files with 118 additions and 45 deletions

View File

@ -45,4 +45,9 @@ public class CacheIndexManager implements IndexManager {
public void close() {
// TODO: implement
}
@Override
public long clean() {
return 0;
}
}

View File

@ -6,16 +6,24 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
public class Cleaner {
private Cleanable[] objectsToClean;
private Thread cleanerThread;
private int sleepInterval;
private static final double MAXIMUM_SLEEP_INTERVAL = 20d * 1000d; // 20 minutes
private static final double MINIMUM_SLEEP_INTERVAL = 1d * 1000d; // 1 second
private static final double NORMAL_REMOVED_ITEMS = 1000l;
private static final double REMOVED_ITEMS_RATIO = 2.5d; // 250%
private final Cleanable[] objectsToClean;
private final Thread cleanerThread;
private int sleepInterval = (int) MINIMUM_SLEEP_INTERVAL;
public Cleaner(Cleanable... objectsToClean) {
this.objectsToClean = objectsToClean;
this.cleanerThread = new Thread(new CleanLoop());
this.cleanerThread.setName("Cleaner thread");
this.cleanerThread.setDaemon(true);
}
public void start() {
this.cleanerThread = new Thread(new CleanLoop());
this.cleanerThread.start();
}
/**
@ -33,6 +41,13 @@ public class Cleaner {
public void stop() {
if (cleanerThread != null) {
cleanerThread.interrupt();
while (cleanerThread.isAlive()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@ -42,8 +57,35 @@ public class Cleaner {
public void run() {
while(!cleanerThread.isInterrupted()) {
try {
System.out.println("[CLEANER] Waiting " + sleepInterval + "ms.");
Thread.sleep(sleepInterval);
System.out.println("Cleaned " + clean() + " items.");
final double removedItems = clean();
double suggestedExecutionTimeByItemsCalculations = sleepInterval;
System.out.println("[CLEANER] REMOVED_ITEMS: " + removedItems);
if (removedItems > 0) {
final double removedItemsRatio = removedItems / NORMAL_REMOVED_ITEMS;
System.out.println("[CLEANER] REMOVED_ITEMS_RATIO: " + removedItemsRatio);
if (removedItemsRatio < 1d / REMOVED_ITEMS_RATIO || removedItemsRatio > REMOVED_ITEMS_RATIO) {
suggestedExecutionTimeByItemsCalculations = sleepInterval / removedItemsRatio;
}
}
System.out.println("[CLEANER] Items: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + suggestedExecutionTimeByItemsCalculations + "ms");
double newSleepInterval = suggestedExecutionTimeByItemsCalculations;
System.out.println("[CLEANER] Total: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms");
if (newSleepInterval > MAXIMUM_SLEEP_INTERVAL) {
sleepInterval = (int) MAXIMUM_SLEEP_INTERVAL;
} else if (newSleepInterval < MINIMUM_SLEEP_INTERVAL) {
sleepInterval = (int) MINIMUM_SLEEP_INTERVAL;
} else {
System.out.println("[CLEANER] CHANGED SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms");
sleepInterval = (int) newSleepInterval;
}
System.out.println("[CLEANER] Cleaned " + removedItems + " items.");
} catch (InterruptedException e) {
}
}

View File

@ -19,4 +19,4 @@ public class DBStandardTypes {
typesManager.registerType(LightList.class, LIGHT_LIST, new DBLightListParser(db));
typesManager.registerTypeFallback(new DBGenericObjectParser());
}
}
}

View File

@ -6,22 +6,22 @@ import java.io.IOException;
* You must have only a maximum of 1 reference for each index
* @param <T>
*/
public class EntryReference<T> implements Castable, AutoCloseable {
private final JCWDatabase db;
public class EntryReference<T> implements Castable {
private final JCWDatabase.EntryReferenceTools db;
private final long entryIndex;
private final DBTypeParser<T> parser;
public T value;
private volatile boolean closed;
private final Object closeLock = new Object();
public EntryReference(JCWDatabase db, long entryId, DBTypeParser<T> parser) throws IOException {
public EntryReference(JCWDatabase.EntryReferenceTools db, long entryId, DBTypeParser<T> parser) throws IOException {
this.db = db;
this.entryIndex = entryId;
this.parser = parser;
this.value = db.indices.get(entryId, parser.getReader());
this.value = db.read(entryId, parser.getReader());
}
public EntryReference(JCWDatabase db, long entryId, DBTypeParser<T> parser, T value) {
public EntryReference(JCWDatabase.EntryReferenceTools db, long entryId, DBTypeParser<T> parser, T value) {
this.db = db;
this.entryIndex = entryId;
this.parser = parser;
@ -38,7 +38,7 @@ public class EntryReference<T> implements Castable, AutoCloseable {
public void save() throws IOException {
if (!closed) {
db.indices.set(entryIndex, parser.getWriter(value));
db.write(entryIndex, parser.getWriter(value));
}
}
@ -47,8 +47,7 @@ public class EntryReference<T> implements Castable, AutoCloseable {
return (T) this;
}
@Override
public void close() throws IOException {
protected void close() throws IOException {
if (closed) {
return;
}
@ -57,7 +56,6 @@ public class EntryReference<T> implements Castable, AutoCloseable {
return;
}
db.removeEntryReference(entryIndex);
save();
closed = true;

View File

@ -3,10 +3,7 @@ package org.warp.jcwdb;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
import java.io.IOException;
@ -17,6 +14,7 @@ import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
public class FileIndexManager implements IndexManager {
private final SeekableByteChannel dataFileChannel, metadataFileChannel;
@ -42,9 +40,9 @@ public class FileIndexManager implements IndexManager {
private long firstAllocableIndex;
public FileIndexManager(Path dataFile, Path metadataFile) throws IOException {
loadedIndices = new Long2ObjectAVLTreeMap<>();
dirtyLoadedIndices = new LongAVLTreeSet();
removedIndices = new LongAVLTreeSet();
loadedIndices = new Long2ObjectOpenHashMap<>();
dirtyLoadedIndices = new LongOpenHashSet();
removedIndices = new LongOpenHashSet();
if (Files.notExists(dataFile)) {
Files.createFile(dataFile);
}
@ -319,12 +317,14 @@ public class FileIndexManager implements IndexManager {
synchronized(indicesMapsAccessLock) {
if (loadedIndices.size() > JCWDatabase.MAX_LOADED_REFERENCES) {
long count = loadedIndices.size();
for (Entry<IndexDetails> loadedIndex : loadedIndices.long2ObjectEntrySet()) {
if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) {
LongIterator it = loadedIndices.keySet().iterator();
while(it.hasNext()) {
long loadedIndex = it.nextLong();
if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) {
break;
}
try {
flushAndUnload(loadedIndex.getLongKey());
flushAndUnload(loadedIndex);
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -4,10 +4,12 @@ import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
public class JCWDatabase implements AutoCloseable, Cleanable {
public final static long MAX_LOADED_REFERENCES = 10;
@ -16,6 +18,7 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
private final TypesManager typesManager;
private final MixedIndexDatabase indices;
private final Cleaner databaseCleaner;
private final EntryReferenceTools entryReferenceTools = new EntryReferenceTools();
private final Long2ObjectMap<WeakReference<EntryReference<?>>> references;
private volatile boolean closed;
private final Object closeLock = new Object();
@ -59,8 +62,8 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
type = this.indices.getType(index);
}
DBTypeParser<T> typeParser = this.typesManager.get(type);
ref = new EntryReference<>(this, index, typeParser);
refRef = new WeakReference<EntryReference<?>>(ref);
ref = new EntryReference<>(entryReferenceTools, index, typeParser);
refRef = new WeakReference<>(ref);
this.references.put(index, refRef);
}
return ref;
@ -76,8 +79,8 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
synchronized (indicesAccessLock) {
index = indices.add(typeParser.getWriter(value));
}
ref = new EntryReference<>(this, index, typeParser, value);
this.references.put(index, new WeakReference<EntryReference<?>>(ref));
ref = new EntryReference<>(entryReferenceTools, index, typeParser, value);
this.references.put(index, new WeakReference<>(ref));
return ref;
}
}
@ -104,19 +107,13 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
synchronized (indicesAccessLock) {
indices.set(index, typeParser.getWriter(value));
}
ref = new EntryReference<>(this, index, typeParser);
ref = new EntryReference<>(entryReferenceTools, index, typeParser);
this.references.put(index, new WeakReference<EntryReference<?>>(ref));
return ref;
}
}
}
protected void removeEntryReference(long index) {
synchronized (referencesAccessLock) {
this.references.remove(index);
}
}
@Override
public void close() throws IOException {
if (closed) {
@ -129,12 +126,18 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
closed = true;
}
this.databaseCleaner.stop();
synchronized (referencesAccessLock) {
for (WeakReference<EntryReference<?>> referenceRef : references.values()) {
ObjectIterator<WeakReference<EntryReference<?>>> iterator = references.values().iterator();
while (iterator.hasNext()) {
WeakReference<EntryReference<?>> referenceRef = iterator.next();
EntryReference<?> reference = referenceRef.get();
if (reference != null) {
reference.close();
iterator.remove();
}
}
}
synchronized (indicesAccessLock) {
@ -151,18 +154,21 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
@Override
public long clean() {
return cleanEmptyReferences()
long removedItems = cleanEmptyReferences()
+ cleanExtraReferences()
+ indices.clean();
return removedItems;
}
private long cleanEmptyReferences() {
long removed = 0;
synchronized(referencesAccessLock) {
for (Entry<WeakReference<EntryReference<?>>> entry : references.long2ObjectEntrySet()) {
ObjectIterator<Entry<WeakReference<EntryReference<?>>>> iterator = references.long2ObjectEntrySet().iterator();
while (iterator.hasNext()) {
Entry<WeakReference<EntryReference<?>>> entry = iterator.next();
if (entry.getValue().get() == null) {
references.remove(entry.getLongKey());
iterator.remove();
removed++;
}
}
@ -175,9 +181,11 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
synchronized(referencesAccessLock) {
if (references.size() > MAX_LOADED_REFERENCES) {
long count = 0;
for (Entry<WeakReference<EntryReference<?>>> entry : references.long2ObjectEntrySet()) {
ObjectIterator<Entry<WeakReference<EntryReference<?>>>> iterator = references.long2ObjectEntrySet().iterator();
while (iterator.hasNext()) {
Entry<WeakReference<EntryReference<?>>> entry = iterator.next();
if (count > MAX_LOADED_REFERENCES * 3l / 2l) {
references.remove(entry.getLongKey());
iterator.remove();
removedReferences++;
} else {
count++;
@ -187,4 +195,18 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
}
return removedReferences;
}
public class EntryReferenceTools {
private EntryReferenceTools() {
}
public <T> T read(long index, DBReader<T> reader) throws IOException {
return indices.get(index, reader);
}
public <T> void write(long index, DBDataOutput<T> writer) throws IOException {
indices.set(index, writer);
}
}
}

View File

@ -27,7 +27,7 @@ public class App {
// System.out.println(" - " + root.get(i));
// }
long prectime = System.currentTimeMillis();
for (int i = 0; i < 2/*2000000*/; i++) {
for (int i = 0; i < 2000000/*2000000*/; i++) {
root.add("Test " + i);
if (i > 0 && i % 200000 == 0) {
long precprectime = prectime;
@ -39,9 +39,15 @@ public class App {
System.out.println("Root size: "+root.size());
System.out.println("Time elapsed: " + (time2 - time1));
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)...");
long removedItems = db.clean();
long time3 = System.currentTimeMillis();
System.out.println("Removed items: " + removedItems);
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Time elapsed: " + (time3 - time2));
System.out.println("Saving database...");
db.close();
long time3 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time3 - time2));
long time4 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time4 - time3));
}
}