Fixed cleaner crash

This commit is contained in:
Andrea Cavalli 2018-12-06 12:30:04 +01:00
parent aa5cf24add
commit b00cd8d6b5
5 changed files with 113 additions and 93 deletions

View File

@ -1,6 +1,8 @@
package org.warp.jcwdb;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.channels.ClosedChannelException;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
@ -14,6 +16,7 @@ public class Cleaner {
private final Cleanable[] objectsToClean;
private final Thread cleanerThread;
private int sleepInterval = (int) MINIMUM_SLEEP_INTERVAL;
private volatile boolean stopRequest = false;
public Cleaner(Cleanable... objectsToClean) {
this.objectsToClean = objectsToClean;
@ -35,12 +38,13 @@ public class Cleaner {
for (Cleanable cleanable : objectsToClean) {
cleanedItems += cleanable.clean();
}
System.gc();
return cleanedItems;
}
public void stop() {
if (cleanerThread != null) {
cleanerThread.interrupt();
stopRequest = true;
while (cleanerThread.isAlive()) {
try {
Thread.sleep(100);
@ -55,7 +59,7 @@ public class Cleaner {
@Override
public void run() {
while(!cleanerThread.isInterrupted()) {
while(!stopRequest) {
try {
System.out.println("[CLEANER] Waiting " + sleepInterval + "ms.");
Thread.sleep(sleepInterval);
@ -86,7 +90,8 @@ public class Cleaner {
System.out.println("[CLEANER] Cleaned " + removedItems + " items.");
} catch (InterruptedException e) {
}catch (InterruptedException e) {
}
}
}

View File

@ -79,8 +79,6 @@ public class EntryReference<T> implements Castable {
return;
}
db.removeEntryReference(entryIndex);
closed = true;
}
}

View File

@ -2,19 +2,15 @@ package org.warp.jcwdb;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
public class FileIndexManager implements IndexManager {
private final SeekableByteChannel dataFileChannel, metadataFileChannel;
@ -81,7 +77,7 @@ public class FileIndexManager implements IndexManager {
} else {
if (indexDetails.getSize() > data.getSize()) {
editIndex(index, indexDetails.getOffset(), data.getSize(), indexDetails.getType(), data.calculateHash());
fileAllocator.markFree(indexDetails.getOffset()+data.getSize(), data.getSize());
fileAllocator.markFree(indexDetails.getOffset() + data.getSize(), data.getSize());
}
writeExact(indexDetails, data);
}
@ -103,6 +99,7 @@ public class FileIndexManager implements IndexManager {
/**
* Write the data at index.
* The input size must be equal to the index size!
*
* @param indexDetails
* @param data
* @throws IOException
@ -135,16 +132,16 @@ public class FileIndexManager implements IndexManager {
if (indexDetails != null) {
fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
}
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
dirtyLoadedIndices.remove(index);
loadedIndices.remove(index);
removedIndices.add(index);
}
}
public void flushAndUnload(long index) throws IOException {
if (removedIndices.contains(index)) {
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
removedIndices.remove(index);
dirtyLoadedIndices.remove(index);
loadedIndices.remove(index);
@ -154,18 +151,18 @@ public class FileIndexManager implements IndexManager {
eraseIndexDetails(metadata);
}
if (dirtyLoadedIndices.contains(index)) {
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
dirtyLoadedIndices.remove(index);
}
// Update indices metadata
SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES);
IndexDetails indexDetails;
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
indexDetails = loadedIndices.get(index);
}
writeIndexDetails(metadata, indexDetails);
}
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
loadedIndices.remove(index);
}
}
@ -188,14 +185,14 @@ public class FileIndexManager implements IndexManager {
}
private void editIndex(long index, IndexDetails details) {
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
loadedIndices.put(index, details);
dirtyLoadedIndices.add(index);
}
}
private long createIndexMetadata(IndexDetails indexDetails) {
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
long newIndex = firstAllocableIndex++;
loadedIndices.put(newIndex, indexDetails);
dirtyLoadedIndices.add(newIndex);
@ -207,7 +204,7 @@ public class FileIndexManager implements IndexManager {
private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
// Return index details if loaded
IndexDetails details;
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
details = loadedIndices.getOrDefault(index, null);
}
if (details != null) return details;
@ -265,7 +262,7 @@ public class FileIndexManager implements IndexManager {
// Update indices metadata
SeekableByteChannel metadata = metadataFileChannel;
long lastIndex = -2;
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
for (long index : dirtyLoadedIndices) {
IndexDetails indexDetails = loadedIndices.get(index);
if (index - lastIndex != 1) {
@ -277,15 +274,15 @@ public class FileIndexManager implements IndexManager {
}
// Remove removed indices
synchronized(indicesMapsAccessLock) {
synchronized (indicesMapsAccessLock) {
for (long index : removedIndices) {
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
eraseIndexDetails(metadata);
}
}
}
fileAllocator.close();
}
private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException {
synchronized (metadataByteBufferLock) {
final int size = indexDetails.getSize();
@ -303,7 +300,7 @@ public class FileIndexManager implements IndexManager {
position.write(metadataByteBuffer);
}
}
private void eraseIndexDetails(SeekableByteChannel position) throws IOException {
synchronized (maskByteBufferLock) {
maskByteBuffer.rewind();
@ -323,28 +320,32 @@ public class FileIndexManager implements IndexManager {
public long clean() {
return cleanExtraIndices();
}
private long cleanExtraIndices() {
long removedIndices = 0;
synchronized(indicesMapsAccessLock) {
LongArrayList toUnload = new LongArrayList();
synchronized (indicesMapsAccessLock) {
if (loadedIndices.size() > JCWDatabase.MAX_LOADED_REFERENCES) {
long count = loadedIndices.size();
LongIterator it = loadedIndices.keySet().iterator();
while(it.hasNext()) {
while (it.hasNext()) {
long loadedIndex = it.nextLong();
if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) {
break;
}
try {
flushAndUnload(loadedIndex);
} catch (IOException e) {
e.printStackTrace();
}
toUnload.add(loadedIndex);
removedIndices++;
count--;
}
}
}
for (long index : toUnload.elements()) {
try {
flushAndUnload(index);
} catch (IOException e) {
e.printStackTrace();
}
}
return removedIndices;
}
}

View File

@ -13,8 +13,8 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
public class JCWDatabase implements AutoCloseable, Cleanable {
public final static long MAX_LOADED_REFERENCES = 10;
public final static long MAX_LOADED_INDICES = 1000;
public final static long MAX_LOADED_REFERENCES = 1000;
public final static long MAX_LOADED_INDICES = 10000;
private final TypesManager typesManager;
private final MixedIndexDatabase indices;
@ -119,6 +119,10 @@ public class JCWDatabase implements AutoCloseable, Cleanable {
}
}
public boolean isOpen() {
return !closed;
}
@Override
public void close() throws IOException {
if (closed) {

View File

@ -12,70 +12,82 @@ import java.nio.file.Paths;
import java.util.function.Predicate;
public class App {
public static void main(String[] args) throws Exception {
if (args.length > 2 && Boolean.parseBoolean(args[2])) {
Files.delete(Paths.get(args[0]));
Files.delete(Paths.get(args[1]));
}
System.out.println("Loading database...");
long time0 = System.currentTimeMillis();
JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1]));
long time01 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time01 - time0));
System.out.println("Loading root...");
EntryReference<LightList<Animal>> rootRef = db.getRoot(Animal.class);
LightList<Animal> root = rootRef.value;
long time1 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time1 - time01));
System.out.println("Root size: " + root.size());
System.out.println("Root:");
public static void main(String[] args) {
try {
if (args.length > 2 && Boolean.parseBoolean(args[2])) {
Files.delete(Paths.get(args[0]));
Files.delete(Paths.get(args[1]));
}
System.out.println("Loading database...");
long time0 = System.currentTimeMillis();
JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1]));
try {
long time01 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time01 - time0));
System.out.println("Loading root...");
EntryReference<LightList<Animal>> rootRef = db.getRoot(Animal.class);
LightList<Animal> root = rootRef.value;
long time1 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time1 - time01));
System.out.println("Root size: " + root.size());
System.out.println("Root:");
// for (int i = 0; i < root.size(); i++) {
// System.out.println(" - " + root.get(i));
// }
long prectime = System.currentTimeMillis();
for (int i = 0; i < 2000000000/* 2000000 */; i++) {
Animal animal = new StrangeAnimal(i % 40);
root.add(animal);
if (i > 0 && i % 200000 == 0) {
long precprectime = prectime;
prectime = System.currentTimeMillis();
System.out.println("Element " + i + " (" + (prectime - precprectime) + "ms)");
long prectime = System.currentTimeMillis();
for (int i = 0; i < 2000000/* 2000000 */; i++) {
Animal animal = new StrangeAnimal(i % 40);
root.add(animal);
if (i > 0 && i % 200000 == 0) {
long precprectime = prectime;
prectime = System.currentTimeMillis();
System.out.println("Element " + i + " (" + (prectime - precprectime) + "ms)");
}
}
long time2 = System.currentTimeMillis();
System.out.println("Root size: " + root.size());
System.out.println("Time elapsed: " + (time2 - time1));
System.out.println("Used memory: "
+ ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
long time2_0 = System.currentTimeMillis();
System.out.println("Filtering strings...");
//root.removeIf(Animal::hasFourLegs);
long time2_1 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time2_1 - time2_0));
ObjectList<Animal> results = new ObjectArrayList<>();
root.forEach((value) -> {
if (Animal.hasFourLegs(value)) {
results.add(value);
}
//System.out.println("val:" + value);
});
long time2_2 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time2_2 - time2_1));
System.out.println("Used memory: "
+ ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)...");
long removedItems = db.clean();
long time3 = System.currentTimeMillis();
System.out.println("Removed items: " + removedItems);
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Time elapsed: " + (time3 - time2_2));
System.out.println("Saving database...");
System.out.println("Root size: " + root.size());
db.close();
long time4 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time4 - time3));
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (db.isOpen()) {
db.close();
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
long time2 = System.currentTimeMillis();
System.out.println("Root size: " + root.size());
System.out.println("Time elapsed: " + (time2 - time1));
System.out.println("Used memory: "
+ ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
long time2_0 = System.currentTimeMillis();
System.out.println("Filtering strings...");
//root.removeIf(Animal::hasFourLegs);
long time2_1 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time2_1 - time2_0));
ObjectList<Animal> results = new ObjectArrayList<>();
root.forEach((value) -> {
if (Animal.hasFourLegs(value)) {
results.add(value);
}
//System.out.println("val:" + value);
});
long time2_2 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time2_2 - time2_1));
System.out.println("Used memory: "
+ ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)...");
long removedItems = db.clean();
long time3 = System.currentTimeMillis();
System.out.println("Removed items: " + removedItems);
System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB");
System.out.println("Time elapsed: " + (time3 - time2));
System.out.println("Saving database...");
System.out.println("Root size: " + root.size());
db.close();
long time3 = System.currentTimeMillis();
System.out.println("Time elapsed: " + (time3 - time2_2));
}
public static <T> Predicate<T> not(Predicate<T> t) {