From a4a981ad1c5dee072197163a1dd9341c07b0f93f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 21 Dec 2018 10:03:30 +0100 Subject: [PATCH 1/2] Chunked list works --- .../org/warp/jcwdb/CacheIndexManager.java | 7 ++ src/main/java/org/warp/jcwdb/Cleaner.java | 6 +- .../warp/jcwdb/DBLightArrayListParser.java | 7 ++ .../org/warp/jcwdb/DBLightBigListParser.java | 4 +- .../java/org/warp/jcwdb/DBStandardTypes.java | 26 +++--- .../java/org/warp/jcwdb/EntryReference.java | 3 + .../java/org/warp/jcwdb/FileAllocator.java | 65 +++++++++----- .../java/org/warp/jcwdb/FileIndexManager.java | 80 ++++++++++++++--- .../java/org/warp/jcwdb/FullIndexDetails.java | 14 +++ .../java/org/warp/jcwdb/IndexManager.java | 3 + src/main/java/org/warp/jcwdb/JCWDatabase.java | 9 +- .../java/org/warp/jcwdb/LightArrayList.java | 13 ++- .../java/org/warp/jcwdb/LightBigList.java | 90 ++++++++++++++++--- .../org/warp/jcwdb/MixedIndexDatabase.java | 6 ++ .../java/org/warp/jcwdb/exampleimpl/App.java | 2 +- src/test/java/org/warp/jcwdb/AppTest.java | 31 +++---- .../org/warp/jcwdb/FileAllocatorTest.java | 51 +++++++++++ 17 files changed, 325 insertions(+), 92 deletions(-) create mode 100644 src/main/java/org/warp/jcwdb/FullIndexDetails.java create mode 100644 src/test/java/org/warp/jcwdb/FileAllocatorTest.java diff --git a/src/main/java/org/warp/jcwdb/CacheIndexManager.java b/src/main/java/org/warp/jcwdb/CacheIndexManager.java index 03e1681..92e3294 100644 --- a/src/main/java/org/warp/jcwdb/CacheIndexManager.java +++ b/src/main/java/org/warp/jcwdb/CacheIndexManager.java @@ -1,6 +1,7 @@ package org.warp.jcwdb; import java.io.IOException; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class CacheIndexManager implements IndexManager { @@ -32,6 +33,12 @@ public class CacheIndexManager implements IndexManager { return 0; } + @Override + public FullIndexDetails addAndGetDetails(DBDataOutput writer) { + // TODO: implement + return null; + } + @Override public IndexDetails set(long index, DBDataOutput writer) { // TODO: implement diff --git a/src/main/java/org/warp/jcwdb/Cleaner.java b/src/main/java/org/warp/jcwdb/Cleaner.java index 08ccf4d..7cd1007 100644 --- a/src/main/java/org/warp/jcwdb/Cleaner.java +++ b/src/main/java/org/warp/jcwdb/Cleaner.java @@ -10,7 +10,7 @@ public class Cleaner { private static final double MAXIMUM_SLEEP_INTERVAL = 8d * 1000d; // 8 seconds private static final double MINIMUM_SLEEP_INTERVAL = 1d * 1000d; // 1 second - private static final double NORMAL_REMOVED_ITEMS = 1000l; + private static final double NORMAL_REMOVED_ITEMS = 2500l; private static final double REMOVED_ITEMS_RATIO = 2.5d; // 250% private final Cleanable[] objectsToClean; @@ -26,7 +26,7 @@ public class Cleaner { } public void start() { - this.cleanerThread.start(); + //this.cleanerThread.start(); } /** @@ -73,7 +73,7 @@ public class Cleaner { if (removedItems > 0) { final double removedItemsRatio = removedItems / NORMAL_REMOVED_ITEMS; System.out.println("[CLEANER] REMOVED_ITEMS_RATIO: " + removedItemsRatio); - if (removedItemsRatio < 1d / REMOVED_ITEMS_RATIO && removedItemsRatio >= REMOVED_ITEMS_RATIO) { + if (removedItemsRatio < 1d / REMOVED_ITEMS_RATIO || removedItemsRatio >= REMOVED_ITEMS_RATIO) { suggestedExecutionTimeByItemsCalculations = sleepInterval / removedItemsRatio; } } diff --git a/src/main/java/org/warp/jcwdb/DBLightArrayListParser.java b/src/main/java/org/warp/jcwdb/DBLightArrayListParser.java index 6a980fa..3608ca4 100644 --- a/src/main/java/org/warp/jcwdb/DBLightArrayListParser.java +++ b/src/main/java/org/warp/jcwdb/DBLightArrayListParser.java @@ -35,4 +35,11 @@ public class DBLightArrayListParser extends DBTypeParserImpl value) { return value.internalList.hashCode(); } + + @Override + public String toString() { + return "DBLightArrayListParser{" + + "db=" + db + + '}'; + } } diff --git a/src/main/java/org/warp/jcwdb/DBLightBigListParser.java b/src/main/java/org/warp/jcwdb/DBLightBigListParser.java index 876fd13..c30eab3 100644 --- a/src/main/java/org/warp/jcwdb/DBLightBigListParser.java +++ b/src/main/java/org/warp/jcwdb/DBLightBigListParser.java @@ -21,7 +21,7 @@ public class DBLightBigListParser extends DBTypeParserImpl> { chunks.add(itm); chunkSizes.add(itm2); } - return new LightBigList(db, chunks, chunkSizes); + return new LightBigList<>(db, chunks, chunkSizes); }; } @@ -39,6 +39,6 @@ public class DBLightBigListParser extends DBTypeParserImpl> { @Override public long calculateHash(LightBigList value) { - return value.chunks.hashCode(); + return (((long)value.chunks.hashCode()) << 32) | (value.chunkSizes.hashCode() & 0xffffffffL); } } diff --git a/src/main/java/org/warp/jcwdb/DBStandardTypes.java b/src/main/java/org/warp/jcwdb/DBStandardTypes.java index 23c6796..eb71a70 100644 --- a/src/main/java/org/warp/jcwdb/DBStandardTypes.java +++ b/src/main/java/org/warp/jcwdb/DBStandardTypes.java @@ -1,19 +1,19 @@ package org.warp.jcwdb; public class DBStandardTypes { - private static final int STD = 0xFFFFF000; - public static final int BOOLEAN = STD| 0; - public static final int BYTE = STD| 1; - public static final int SHORT = STD| 2; - public static final int CHAR = STD| 3; - public static final int INTEGER = STD| 4; - public static final int FLOAT = STD| 5; - public static final int DOUBLE = STD| 6; - public static final int STRING = STD| 7; - public static final int BYTE_ARRAY = STD| 8; - public static final int LIGHT_LIST_ARRAY = STD| 9; - public static final int LIGHT_LIST_BIG = STD| 10; - public static final int GENERIC_OBJECT = STD| 11; + private static final int STD = 0xFFFFF000; + public static final int BOOLEAN = STD| 0x000; + public static final int BYTE = STD| 0x001; + public static final int SHORT = STD| 0x002; + public static final int CHAR = STD| 0x003; + public static final int INTEGER = STD| 0x004; + public static final int FLOAT = STD| 0x005; + public static final int DOUBLE = STD| 0x006; + public static final int STRING = STD| 0x007; + public static final int BYTE_ARRAY = STD| 0x008; + public static final int LIGHT_LIST_ARRAY = STD| 0x009; + public static final int LIGHT_LIST_BIG = STD| 0x00A; + public static final int GENERIC_OBJECT = STD| 0x00B; public static void registerStandardTypes(JCWDatabase db, TypesManager typesManager) { typesManager.registerType(String.class, STRING, new DBStringParser()); diff --git a/src/main/java/org/warp/jcwdb/EntryReference.java b/src/main/java/org/warp/jcwdb/EntryReference.java index 977299c..a089d5e 100644 --- a/src/main/java/org/warp/jcwdb/EntryReference.java +++ b/src/main/java/org/warp/jcwdb/EntryReference.java @@ -69,6 +69,9 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { if (loaded && !closed) { try { + if (value instanceof Saveable) { + ((Saveable)value).save(); + } IndexDetails returnedDetails = db.write(entryIndex, parser.getWriter(value)); synchronized(hashCacheLock) { this.cachedHash = returnedDetails.getHash(); diff --git a/src/main/java/org/warp/jcwdb/FileAllocator.java b/src/main/java/org/warp/jcwdb/FileAllocator.java index 94eb771..88f04d5 100644 --- a/src/main/java/org/warp/jcwdb/FileAllocator.java +++ b/src/main/java/org/warp/jcwdb/FileAllocator.java @@ -1,14 +1,18 @@ package org.warp.jcwdb; -import it.unimi.dsi.fastutil.longs.Long2IntMap; -import it.unimi.dsi.fastutil.longs.Long2IntRBTreeMap; +import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator; import java.io.IOException; import java.nio.channels.SeekableByteChannel; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; public class FileAllocator implements AutoCloseable { - private static final int MAXIMUM_UNALLOCATED_ENTRIES = 500000; + private static final int MAXIMUM_UNALLOCATED_ENTRIES = 50000; private final SeekableByteChannel dataFileChannel; private volatile long fileSize; @@ -18,13 +22,19 @@ public class FileAllocator implements AutoCloseable { /** * index -> free space size */ - private final Long2IntRBTreeMap freeBytes = new Long2IntRBTreeMap((a, b) -> (int) (a - b)); + private final Long2IntMap freeBytes = new Long2IntLinkedOpenHashMap(); public FileAllocator(SeekableByteChannel dataFileChannel) throws IOException { this.dataFileChannel = dataFileChannel; this.fileSize = this.dataFileChannel.size(); } + public FileAllocator(SeekableByteChannel dataFileChannel, long fileSize, Long2IntMap freeBytes) throws IOException { + this.dataFileChannel = dataFileChannel; + this.fileSize = fileSize; + this.freeBytes.putAll(freeBytes); + } + /** * TODO: not implemented * @@ -44,23 +54,27 @@ public class FileAllocator implements AutoCloseable { } private long allocateIntoUnusedParts(int size) { - ObjectBidirectionalIterator it = freeBytes.long2IntEntrySet().iterator(); - long holeOffset = -1; - int holeSize = 0; - while (it.hasNext()) { - Long2IntMap.Entry entry = it.next(); - int currentHoleSize = entry.getIntValue(); + Stream> sorted = + freeBytes.entrySet().stream() + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())); + final VariableWrapper holeOffset = new VariableWrapper<>(-1L); + final VariableWrapper holeSize = new VariableWrapper<>(0); + sorted.anyMatch((entry) -> { + int currentHoleSize = entry.getValue(); if (currentHoleSize < size) { - freeBytes.remove(holeOffset); - if (holeSize > size) { - freeBytes.put(holeOffset + size, holeSize - size); - } - break; + return true; + } + holeOffset.var = entry.getKey(); + holeSize.var = currentHoleSize; + return false; + }); + if (holeOffset.var != -1L) { + freeBytes.remove(holeOffset.var); + if (holeSize.var > size) { + freeBytes.put(holeOffset.var + size, holeSize.var - size); } - holeOffset = entry.getLongKey(); - holeSize = currentHoleSize; } - return holeOffset; + return holeOffset.var; } private long allocateToEnd(int size) { @@ -103,13 +117,24 @@ public class FileAllocator implements AutoCloseable { break; } } - if (!addedToList) { + if (!addedToList && length > 0) { freeBytes.put(startPosition, length); } } + if (startPosition + length >= fileSize) { + fileSize = startPosition; + } + + // Remove the smallest hole in the file if (freeBytes.size() > MAXIMUM_UNALLOCATED_ENTRIES) { - freeBytes.remove(freeBytes.lastLongKey()); + Stream> sorted = + freeBytes.entrySet().stream() + .sorted(Map.Entry.comparingByValue()); + Optional> first = sorted.findFirst(); + if (first.isPresent()) { + freeBytes.remove(first.get().getKey()); + } } } diff --git a/src/main/java/org/warp/jcwdb/FileIndexManager.java b/src/main/java/org/warp/jcwdb/FileIndexManager.java index fe87653..16a2ee3 100644 --- a/src/main/java/org/warp/jcwdb/FileIndexManager.java +++ b/src/main/java/org/warp/jcwdb/FileIndexManager.java @@ -3,6 +3,7 @@ package org.warp.jcwdb; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import it.unimi.dsi.fastutil.longs.*; +import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; import java.nio.ByteBuffer; @@ -11,7 +12,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Iterator; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class FileIndexManager implements IndexManager { @@ -49,13 +50,50 @@ public class FileIndexManager implements IndexManager { } dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); - fileAllocator = new FileAllocator(dataFileChannel); + fileAllocator = createFileAllocator(dataFileChannel, metadataFileChannel.position(0)); firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES; if (firstAllocableIndex == 0) { firstAllocableIndex = 1; } } + private FileAllocator createFileAllocator(final SeekableByteChannel dataFileChannel, final SeekableByteChannel metadataFileChannel) throws IOException { + Long2IntMap freeBytes = new Long2IntRBTreeMap(); + Long2IntMap usedBytes = new Long2IntRBTreeMap(); + long firstOffset = 0; + while (metadataFileChannel.position() + IndexDetails.TOTAL_BYTES <= metadataFileChannel.size()) { + IndexDetails indexDetails = readIndexDetailsAt(metadataFileChannel); + if (indexDetails != null) { + long offset = indexDetails.getOffset(); + usedBytes.put(offset, indexDetails.getSize()); + if (offset < firstOffset) { + firstOffset = offset; + } + } + } + + long previousEntryOffset = 0; + long previousEntrySize = 0; + ObjectIterator it = usedBytes.long2IntEntrySet().iterator(); + while (it.hasNext()) { + final Long2IntMap.Entry entry = it.next(); + final long entryOffset = entry.getLongKey(); + final long entrySize = entry.getIntValue(); + it.remove(); + + if (previousEntryOffset + previousEntrySize < entryOffset) { + freeBytes.put(previousEntryOffset + previousEntrySize, (int) (entryOffset - (previousEntryOffset + previousEntrySize))); + } + + previousEntryOffset = entryOffset; + previousEntrySize = entrySize; + } + + final long fileSize = previousEntryOffset + previousEntrySize; + + return new FileAllocator(dataFileChannel, fileSize, freeBytes); + } + @Override public T get(long index, DBReader reader) throws IOException { checkClosed(); @@ -116,6 +154,19 @@ public class FileIndexManager implements IndexManager { return index; } + @Override + public FullIndexDetails addAndGetDetails(DBDataOutput data) throws IOException { + checkClosed(); + final int size = data.getSize(); + final long offset = fileAllocator.allocate(size); + final int type = data.getType(); + final long hash = data.calculateHash(); + final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash); + final long index = createIndexMetadata(indexDetails); + writeExact(indexDetails, data); + return new FullIndexDetails(index, indexDetails); + } + /** * Write the data at index. * The input size must be equal to the index size! @@ -239,7 +290,7 @@ public class FileIndexManager implements IndexManager { * @param details */ private void editIndex(long index, IndexDetails details) { - synchronized (indicesMapsAccessLock) {// FIXXXX main3 + synchronized (indicesMapsAccessLock) { loadedIndices.put(index, details); dirtyLoadedIndices.add(index); } @@ -270,8 +321,20 @@ public class FileIndexManager implements IndexManager { return null; } SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition); + IndexDetails indexDetails = readIndexDetailsAt(currentMetadataFileChannel); + + if (indexDetails != null) { + editIndex(index, indexDetails); + return indexDetails; + } + + // No results found. Returning null + return null; + } + + private IndexDetails readIndexDetailsAt(SeekableByteChannel currentMetadataFileChannel) throws IOException { IndexDetails indexDetails = null; - synchronized (metadataByteBufferLock) {// FIXXXX main2 + synchronized (metadataByteBufferLock) { metadataByteBuffer.rewind(); currentMetadataFileChannel.read(metadataByteBuffer); metadataByteBuffer.rewind(); @@ -287,14 +350,7 @@ public class FileIndexManager implements IndexManager { indexDetails = new IndexDetails(offset, size, type, hash); } } - - if (indexDetails != null) { - editIndex(index, indexDetails); - return indexDetails; - } - - // No results found. Returning null - return null; + return indexDetails; } private IndexDetails getIndexMetadata(long index) throws IOException { diff --git a/src/main/java/org/warp/jcwdb/FullIndexDetails.java b/src/main/java/org/warp/jcwdb/FullIndexDetails.java new file mode 100644 index 0000000..35cc35e --- /dev/null +++ b/src/main/java/org/warp/jcwdb/FullIndexDetails.java @@ -0,0 +1,14 @@ +package org.warp.jcwdb; + +public class FullIndexDetails extends IndexDetails { + private final long index; + + public FullIndexDetails(long index, IndexDetails details) { + super(details); + this.index = index; + } + + public long getIndex() { + return index; + } +} diff --git a/src/main/java/org/warp/jcwdb/IndexManager.java b/src/main/java/org/warp/jcwdb/IndexManager.java index f08b237..b663b90 100644 --- a/src/main/java/org/warp/jcwdb/IndexManager.java +++ b/src/main/java/org/warp/jcwdb/IndexManager.java @@ -1,6 +1,8 @@ package org.warp.jcwdb; import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.BiPredicate; import java.util.function.Consumer; public interface IndexManager extends Cleanable { @@ -8,6 +10,7 @@ public interface IndexManager extends Cleanable { int getType(long index) throws IOException; long getHash(long index) throws IOException; long add(DBDataOutput writer) throws IOException; + FullIndexDetails addAndGetDetails(DBDataOutput writer) throws IOException; IndexDetails set(long index, DBDataOutput writer) throws IOException; void delete(long index) throws IOException; boolean has(long index); diff --git a/src/main/java/org/warp/jcwdb/JCWDatabase.java b/src/main/java/org/warp/jcwdb/JCWDatabase.java index 49425ec..a64dee8 100644 --- a/src/main/java/org/warp/jcwdb/JCWDatabase.java +++ b/src/main/java/org/warp/jcwdb/JCWDatabase.java @@ -6,7 +6,7 @@ import java.io.IOException; import java.nio.file.Path; public class JCWDatabase implements AutoCloseable, Cleanable { - public final static long MAX_LOADED_INDICES = 10000; + public final static long MAX_LOADED_INDICES = 1000; private final TypesManager typesManager; private final MixedIndexDatabase indices; @@ -36,7 +36,7 @@ public class JCWDatabase implements AutoCloseable, Cleanable { if (exists(0)) { return get(0); } else { - LightList newRoot = new LightArrayList<>(this); + LightList newRoot = new LightBigList<>(this); return set(0, newRoot); } } @@ -63,8 +63,9 @@ public class JCWDatabase implements AutoCloseable, Cleanable { long index; long hash; synchronized (indicesAccessLock) { - index = indices.add(typeParser.getWriter(value)); - hash = indices.getHash(index); + FullIndexDetails fullIndexDetails = indices.addAndGetDetails(typeParser.getWriter(value)); + index = fullIndexDetails.getIndex(); + hash = fullIndexDetails.getHash(); } return new EntryReference<>(entryReferenceTools, index, hash, typeParser, value); } diff --git a/src/main/java/org/warp/jcwdb/LightArrayList.java b/src/main/java/org/warp/jcwdb/LightArrayList.java index 8cbe1ef..eccbbf8 100644 --- a/src/main/java/org/warp/jcwdb/LightArrayList.java +++ b/src/main/java/org/warp/jcwdb/LightArrayList.java @@ -121,10 +121,11 @@ public class LightArrayList implements LightList { @SuppressWarnings("unchecked") @Override public T[] toArray() { - final T[] elements = (T[]) new Objects[internalList.size()]; + final T[] elements = (T[]) new Object[internalList.size()]; for (int i = 0; i < elements.length; i++) { try { - elements[i] = (T) db.get(internalList.getLong(i)).getValueReadOnly(); + T element = (T) db.get(internalList.getLong(i)).getValueReadOnly(); + elements[i] = element; } catch (IOException e) { e.printStackTrace(); } @@ -442,4 +443,12 @@ public class LightArrayList implements LightList { } return removed; } + + @Override + public String toString() { + return "LightArrayList{" + + "internalList=" + internalList + + ", db=" + db + + '}'; + } } diff --git a/src/main/java/org/warp/jcwdb/LightBigList.java b/src/main/java/org/warp/jcwdb/LightBigList.java index 53859ad..6d67666 100644 --- a/src/main/java/org/warp/jcwdb/LightBigList.java +++ b/src/main/java/org/warp/jcwdb/LightBigList.java @@ -9,13 +9,18 @@ import java.util.*; import java.util.function.Consumer; import java.util.function.Predicate; -public class LightBigList implements LightList { +public class LightBigList implements LightList, Saveable { - public static final int MAX_ELEMENTS_PER_CHUNK = 10000; + public static final int MAX_ELEMENTS_PER_CHUNK = 200000; public final LongArrayList chunks; public final IntArrayList chunkSizes; - private final transient JCWDatabase db; + private final JCWDatabase db; + private LightList cachedChunk; + private EntryReference> cachedChunkRef; + private long cachedChunkIndex = -1; + private int cachedChunkNumber = -1; + private final Object cachedChunkLock = new Object(); /** * @param db Database reference @@ -43,6 +48,23 @@ public class LightBigList implements LightList { this.db = db; this.chunks = chunks; this.chunkSizes = chunkSizes; + if (this.chunks.size() > 0) { + prepareAccessToChunk(0); + } + } + + private void prepareAccessToChunk(int chunkNumber) { + if (this.cachedChunkRef != null) { + this.cachedChunkRef.save(); + } + this.cachedChunkNumber = chunkNumber; + this.cachedChunkIndex = this.chunks.getLong(chunkNumber); + try { + this.cachedChunkRef = db.get(this.cachedChunkIndex); + } catch (IOException ex) { + throw (NullPointerException) new NullPointerException().initCause(ex); + } + this.cachedChunk = this.cachedChunkRef.getValueReadOnly(); } /** @@ -53,16 +75,13 @@ public class LightBigList implements LightList { for (int i = 0; i < chunks.size(); i++) { final int chunkNumber = i; if (MAX_ELEMENTS_PER_CHUNK - chunkSizes.getInt(i) > 0) { - try { - final long chunkIndex = chunks.getLong(i); - final EntryReference> chunkRef = db.get(chunkIndex); - chunkRef.editValue((chunk) -> { - chunk.appendIndex(elementIndex); - chunkSizes.set(chunkNumber, chunkSizes.getInt(chunkNumber) + 1); - }); + synchronized (cachedChunkLock) { + if (cachedChunkNumber != i) { + prepareAccessToChunk(i); + } + cachedChunk.appendIndex(elementIndex); + chunkSizes.set(chunkNumber, cachedChunk.size()); return; - } catch (IOException ex) { - throw (NullPointerException) new NullPointerException().initCause(ex); } } } @@ -131,7 +150,8 @@ public class LightBigList implements LightList { */ @Deprecated @Override - public Iterator iterator() { + public Iterator iterator() + { throw new RuntimeException("iterator() isn't implemented!"); } @@ -166,10 +186,36 @@ public class LightBigList implements LightList { } } + /** + * toArray() isn't implemented! DO NOT USE IT. + * @return + */ @SuppressWarnings("unchecked") + @Deprecated @Override public T[] toArray() { - throw new RuntimeException("toArray() isn't implemented!"); + T[] result = (T[]) new Object[this.size()]; + + long currentOffset = 0; + // Iterate through all chunks + for (int i = 0; i < chunks.size(); i++) { + final int currentChunkSize = chunkSizes.getInt(i); + final long chunkStartOffset = currentOffset; + currentOffset += currentChunkSize; + // Get chunk index + final long chunkIndex = chunks.getLong(i); + + try { + EntryReference> chunkRef = db.get(chunkIndex); + LightList chunk = chunkRef.getValueReadOnly(); + for (int i1 = 0; i1 < chunk.size(); i1++) { + result[(int)(chunkStartOffset + i1)] = chunk.get(i); + } + } catch (IOException ex) { + throw (NullPointerException) new NullPointerException().initCause(ex); + } + } + return result; } @SuppressWarnings("unchecked") @@ -518,4 +564,20 @@ public class LightBigList implements LightList { } return result.var; } + + @Override + public String toString() { + return "LightBigList{" + + "chunks=" + chunks + + ", chunkSizes=" + chunkSizes + + ", db=" + db + + '}'; + } + + @Override + public void save() { + if (this.cachedChunkRef != null) { + this.cachedChunkRef.save(); + } + } } diff --git a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java index 3f4e255..bfbea65 100644 --- a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java +++ b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java @@ -5,6 +5,7 @@ import it.unimi.dsi.fastutil.longs.Long2LongMap; import java.io.IOException; import java.nio.file.Path; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class MixedIndexDatabase implements IndexManager { @@ -48,6 +49,11 @@ public class MixedIndexDatabase implements IndexManager { return fileIndices.add(writer); } + + @Override + public FullIndexDetails addAndGetDetails(DBDataOutput writer) throws IOException { + return fileIndices.addAndGetDetails(writer); + } @Override public IndexDetails set(long index, DBDataOutput writer) throws IOException { if (cacheIndices.has(index)) { diff --git a/src/main/java/org/warp/jcwdb/exampleimpl/App.java b/src/main/java/org/warp/jcwdb/exampleimpl/App.java index 4e0f70a..96bbb65 100644 --- a/src/main/java/org/warp/jcwdb/exampleimpl/App.java +++ b/src/main/java/org/warp/jcwdb/exampleimpl/App.java @@ -37,7 +37,7 @@ public class App { // System.out.println(" - " + root.get(i)); // } long prectime = System.currentTimeMillis(); - for (int i = 0; i < 20000/* 2000000 */; i++) { + for (int i = 0; i < 20000000/* 2000000 */; i++) { Animal animal = new StrangeAnimal(i % 40); root.add(animal); if (i > 0 && i % 200000 == 0) { diff --git a/src/test/java/org/warp/jcwdb/AppTest.java b/src/test/java/org/warp/jcwdb/AppTest.java index d632087..662c139 100644 --- a/src/test/java/org/warp/jcwdb/AppTest.java +++ b/src/test/java/org/warp/jcwdb/AppTest.java @@ -1,29 +1,18 @@ package org.warp.jcwdb; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; - import org.junit.Test; -import org.warp.jcwdb.EntryReference; -import org.warp.jcwdb.JCWDatabase; -import org.warp.jcwdb.LightList; + +import static org.junit.Assert.assertTrue; /** * Unit test for simple App. */ -public class AppTest -{ - /** - * Rigorous Test :-) - */ - @Test - public void shouldAnswerWithTrue() - { - assertTrue( true ); - } - +public class AppTest { + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() { + assertTrue(true); + } } diff --git a/src/test/java/org/warp/jcwdb/FileAllocatorTest.java b/src/test/java/org/warp/jcwdb/FileAllocatorTest.java new file mode 100644 index 0000000..91e4792 --- /dev/null +++ b/src/test/java/org/warp/jcwdb/FileAllocatorTest.java @@ -0,0 +1,51 @@ +package org.warp.jcwdb; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class FileAllocatorTest { + + @Test + public void shouldAllocateAtZero() throws IOException { + Path tempFile = Files.createTempFile("", ""); + SeekableByteChannel byteCh = Files.newByteChannel(tempFile); + FileAllocator allocator = new FileAllocator(byteCh); + long offset1 = allocator.allocate(512); + assertEquals(0, offset1); + } + + @Test + public void shouldAllocateAt512() throws IOException { + Path tempFile = Files.createTempFile("", ""); + SeekableByteChannel byteCh = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + byteCh.write(ByteBuffer.wrap(new byte[512])); + FileAllocator allocator = new FileAllocator(byteCh); + long offset1 = allocator.allocate(512); + assertEquals(512, offset1); + } + + @Test + public void shouldAllocateUnusedSpace() throws IOException { + Path tempFile = Files.createTempFile("", ""); + SeekableByteChannel byteCh = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + FileAllocator allocator = new FileAllocator(byteCh); + long offset1 = allocator.allocate(512); + allocator.markFree(offset1, 512); + long offset2 = allocator.allocate(128); + long offset3 = allocator.allocate(512-128); + long offset4 = allocator.allocate(128); + assertEquals(0, offset2); + assertEquals(128, offset3); + assertEquals(512, offset4); + } + +} From 71a8148bdc942b5ed27d7f799857c615cd56bced Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 6 Jan 2019 00:31:52 +0100 Subject: [PATCH 2/2] Fixed BigList and Cleaner --- .../java/org/warp/jcwdb/AdvancedSaveable.java | 5 + .../org/warp/jcwdb/CacheIndexManager.java | 5 + src/main/java/org/warp/jcwdb/Cleaner.java | 24 +-- .../java/org/warp/jcwdb/EntryReference.java | 35 ++-- .../java/org/warp/jcwdb/FileIndexManager.java | 88 ++++++++--- .../java/org/warp/jcwdb/IndexManager.java | 1 + src/main/java/org/warp/jcwdb/JCWDatabase.java | 6 +- .../java/org/warp/jcwdb/LightArrayList.java | 3 +- .../java/org/warp/jcwdb/LightBigList.java | 64 ++++---- .../org/warp/jcwdb/MixedIndexDatabase.java | 13 +- .../java/org/warp/jcwdb/exampleimpl/App.java | 149 +++++++++--------- 11 files changed, 237 insertions(+), 156 deletions(-) create mode 100644 src/main/java/org/warp/jcwdb/AdvancedSaveable.java diff --git a/src/main/java/org/warp/jcwdb/AdvancedSaveable.java b/src/main/java/org/warp/jcwdb/AdvancedSaveable.java new file mode 100644 index 0000000..991edc2 --- /dev/null +++ b/src/main/java/org/warp/jcwdb/AdvancedSaveable.java @@ -0,0 +1,5 @@ +package org.warp.jcwdb; + +public interface AdvancedSaveable extends Saveable { + public void save(boolean isEditFinished); +} diff --git a/src/main/java/org/warp/jcwdb/CacheIndexManager.java b/src/main/java/org/warp/jcwdb/CacheIndexManager.java index 92e3294..6ab7543 100644 --- a/src/main/java/org/warp/jcwdb/CacheIndexManager.java +++ b/src/main/java/org/warp/jcwdb/CacheIndexManager.java @@ -45,6 +45,11 @@ public class CacheIndexManager implements IndexManager { return null; } + @Override + public void setFlushingAllowed(long index, boolean isUnloadingAllowed) { + // TODO: implement + } + @Override public void delete(long index) { // TODO: implement diff --git a/src/main/java/org/warp/jcwdb/Cleaner.java b/src/main/java/org/warp/jcwdb/Cleaner.java index 7cd1007..321e40a 100644 --- a/src/main/java/org/warp/jcwdb/Cleaner.java +++ b/src/main/java/org/warp/jcwdb/Cleaner.java @@ -8,6 +8,8 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; public class Cleaner { + public static final boolean DISABLE_CLEANER = false; + public static final boolean ENABLE_CLEANER_LOGGING = false; private static final double MAXIMUM_SLEEP_INTERVAL = 8d * 1000d; // 8 seconds private static final double MINIMUM_SLEEP_INTERVAL = 1d * 1000d; // 1 second private static final double NORMAL_REMOVED_ITEMS = 2500l; @@ -26,7 +28,9 @@ public class Cleaner { } public void start() { - //this.cleanerThread.start(); + if (!DISABLE_CLEANER) { + this.cleanerThread.start(); + } } /** @@ -38,7 +42,7 @@ public class Cleaner { for (Cleanable cleanable : objectsToClean) { cleanedItems += cleanable.clean(); } - System.gc(); + //System.gc(); return cleanedItems; } @@ -61,38 +65,38 @@ public class Cleaner { public void run() { while(!stopRequest) { try { - System.out.println("[CLEANER] Waiting " + sleepInterval + "ms."); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] Waiting " + sleepInterval + "ms."); sleepFor(sleepInterval); final long time1 = System.currentTimeMillis(); final double removedItems = clean(); final long time2 = System.currentTimeMillis(); - System.out.println("[CLEANER] CLEAN_TIME " + (time2 - time1)); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] CLEAN_TIME " + (time2 - time1)); double suggestedExecutionTimeByItemsCalculations = (sleepInterval + MAXIMUM_SLEEP_INTERVAL) / 2; - System.out.println("[CLEANER] REMOVED_ITEMS: " + removedItems); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] REMOVED_ITEMS: " + removedItems); if (removedItems > 0) { final double removedItemsRatio = removedItems / NORMAL_REMOVED_ITEMS; - System.out.println("[CLEANER] REMOVED_ITEMS_RATIO: " + removedItemsRatio); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] REMOVED_ITEMS_RATIO: " + removedItemsRatio); if (removedItemsRatio < 1d / REMOVED_ITEMS_RATIO || removedItemsRatio >= REMOVED_ITEMS_RATIO) { suggestedExecutionTimeByItemsCalculations = sleepInterval / removedItemsRatio; } } - System.out.println("[CLEANER] Items: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + suggestedExecutionTimeByItemsCalculations + "ms"); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] Items: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + suggestedExecutionTimeByItemsCalculations + "ms"); double newSleepInterval = suggestedExecutionTimeByItemsCalculations; - System.out.println("[CLEANER] Total: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] Total: SUGGESTING SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); if (newSleepInterval > MAXIMUM_SLEEP_INTERVAL) { sleepInterval = (int) MAXIMUM_SLEEP_INTERVAL; } else if (newSleepInterval < MINIMUM_SLEEP_INTERVAL) { sleepInterval = (int) MINIMUM_SLEEP_INTERVAL; } else { - System.out.println("[CLEANER] CHANGED SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] CHANGED SLEEP_INTERVAL FROM " + sleepInterval + "ms TO " + newSleepInterval + "ms"); sleepInterval = (int) newSleepInterval; } - System.out.println("[CLEANER] Cleaned " + removedItems + " items."); + if (ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] Cleaned " + removedItems + " items."); }catch (InterruptedException e) { } diff --git a/src/main/java/org/warp/jcwdb/EntryReference.java b/src/main/java/org/warp/jcwdb/EntryReference.java index a089d5e..6828080 100644 --- a/src/main/java/org/warp/jcwdb/EntryReference.java +++ b/src/main/java/org/warp/jcwdb/EntryReference.java @@ -10,7 +10,7 @@ import java.util.function.Function; * You must have only a maximum of 1 reference for each index * @param */ -public class EntryReference implements Castable, Saveable { +public class EntryReference implements Castable, AdvancedSaveable { private final JCWDatabase.EntryReferenceTools db; private final long entryIndex; private final DBTypeParser parser; @@ -19,6 +19,7 @@ public class EntryReference implements Castable, Saveable { private volatile boolean isHashCached; private volatile boolean loaded; private volatile boolean closed; + private volatile boolean isFlushingAllowed; private final Object hashCacheLock = new Object(); private final Object accessLock = new Object(); private final Object closeLock = new Object(); @@ -66,17 +67,29 @@ public class EntryReference implements Castable, Saveable { * Note that this method won't be called when closing without saving */ public void save() { + this.save(false); + } + + public void save(boolean isEditFinished) { synchronized(accessLock) { if (loaded && !closed) { try { - if (value instanceof Saveable) { + if (value instanceof AdvancedSaveable) { + ((AdvancedSaveable)value).save(isEditFinished); + } else if (value instanceof Saveable) { ((Saveable)value).save(); } - IndexDetails returnedDetails = db.write(entryIndex, parser.getWriter(value)); + IndexDetails returnedDetails = this.db.write(entryIndex, parser.getWriter(value)); synchronized(hashCacheLock) { this.cachedHash = returnedDetails.getHash(); this.isHashCached = true; } + if (isEditFinished) { + if (!isFlushingAllowed) { + this.db.setFlushingAllowed(entryIndex, true); + this.isFlushingAllowed = true; + } + } } catch (IOException e) { e.printStackTrace(); } @@ -93,7 +106,7 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { load(); this.value = editFunction.apply(this.value, this); - this.save(); + this.save(true); } } @@ -106,7 +119,7 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { load(); this.value = editFunction.apply(this.value); - this.save(); + this.save(true); } } @@ -119,7 +132,7 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { load(); editFunction.accept(this.value, this); - this.save(); + this.save(true); } } @@ -132,7 +145,7 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { load(); editFunction.accept(this.value); - this.save(); + this.save(true); } } @@ -148,7 +161,7 @@ public class EntryReference implements Castable, Saveable { synchronized(hashCacheLock) { this.isHashCached = false; } - this.save(); + this.save(true); } } @@ -177,6 +190,10 @@ public class EntryReference implements Castable, Saveable { synchronized(accessLock) { if (!loaded) { try { + if (this.isFlushingAllowed) { + this.db.setFlushingAllowed(entryIndex, false); + this.isFlushingAllowed = false; + } this.value = db.read(entryIndex, parser.getReader()); this.loaded = true; } catch (IOException e) { @@ -201,7 +218,7 @@ public class EntryReference implements Castable, Saveable { return; } - save(); + save(true); closed = true; } diff --git a/src/main/java/org/warp/jcwdb/FileIndexManager.java b/src/main/java/org/warp/jcwdb/FileIndexManager.java index 16a2ee3..59b3710 100644 --- a/src/main/java/org/warp/jcwdb/FileIndexManager.java +++ b/src/main/java/org/warp/jcwdb/FileIndexManager.java @@ -12,11 +12,10 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.function.BiConsumer; -import java.util.function.Consumer; public class FileIndexManager implements IndexManager { private final SeekableByteChannel dataFileChannel, metadataFileChannel; + private volatile long metadataFileChannelSize; private final FileAllocator fileAllocator; private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES); private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Integer.BYTES); @@ -35,13 +34,21 @@ public class FileIndexManager implements IndexManager { /** * Edit this using editIndex() */ - private final LongSet dirtyLoadedIndices, removedIndices; + private final LongSet dirtyLoadedIndices, flushingAllowedIndices, removedIndices; private long firstAllocableIndex; public FileIndexManager(Path dataFile, Path metadataFile) throws IOException { - loadedIndices = new Long2ObjectOpenHashMap<>(); - dirtyLoadedIndices = new LongOpenHashSet(); - removedIndices = new LongOpenHashSet(); + if (Cleaner.DISABLE_CLEANER) { + loadedIndices = new Long2ObjectOpenHashMap<>(); + dirtyLoadedIndices = new LongOpenHashSet(); + flushingAllowedIndices = new LongOpenHashSet(); + removedIndices = new LongOpenHashSet(); + } else { + loadedIndices = new Long2ObjectLinkedOpenHashMap<>(); + dirtyLoadedIndices = new LongLinkedOpenHashSet(); + flushingAllowedIndices = new LongLinkedOpenHashSet(); + removedIndices = new LongLinkedOpenHashSet(); + } if (Files.notExists(dataFile)) { Files.createFile(dataFile); } @@ -51,17 +58,22 @@ public class FileIndexManager implements IndexManager { dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); fileAllocator = createFileAllocator(dataFileChannel, metadataFileChannel.position(0)); - firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES; + metadataFileChannelSize = metadataFileChannel.size(); + firstAllocableIndex = getMetadataFileChannelSize() / (long) IndexDetails.TOTAL_BYTES; if (firstAllocableIndex == 0) { firstAllocableIndex = 1; } } + private long getMetadataFileChannelSize() throws IOException { + return metadataFileChannelSize; + } + private FileAllocator createFileAllocator(final SeekableByteChannel dataFileChannel, final SeekableByteChannel metadataFileChannel) throws IOException { Long2IntMap freeBytes = new Long2IntRBTreeMap(); Long2IntMap usedBytes = new Long2IntRBTreeMap(); long firstOffset = 0; - while (metadataFileChannel.position() + IndexDetails.TOTAL_BYTES <= metadataFileChannel.size()) { + while (metadataFileChannel.position() + IndexDetails.TOTAL_BYTES <= getMetadataFileChannelSize()) { IndexDetails indexDetails = readIndexDetailsAt(metadataFileChannel); if (indexDetails != null) { long offset = indexDetails.getOffset(); @@ -117,7 +129,7 @@ public class FileIndexManager implements IndexManager { public IndexDetails set(long index, DBDataOutput data) throws IOException { checkClosed(); final int dataSize = data.getSize(); - final IndexDetails indexDetails = getIndexMetadataUnsafe(index); + IndexDetails indexDetails = getIndexMetadataUnsafe(index); if (indexDetails == null || indexDetails.getSize() < dataSize) { // Allocate new space IndexDetails newDetails = allocateAndWrite(index, data); @@ -133,7 +145,7 @@ public class FileIndexManager implements IndexManager { fileAllocator.markFree(indexDetails.getOffset() + dataSize, dataSize); } // Update index details - editIndex(index, indexDetails, indexDetails.getOffset(), dataSize, indexDetails.getType(), data.calculateHash()); + indexDetails = editIndex(index, indexDetails, indexDetails.getOffset(), dataSize, indexDetails.getType(), data.calculateHash()); // Write data writeExact(indexDetails, data); // Before returning, return IndexDetails @@ -141,6 +153,16 @@ public class FileIndexManager implements IndexManager { } } + @Override + public void setFlushingAllowed(long index, boolean isUnloadingAllowed) { + checkClosed(); + if (isUnloadingAllowed) { + flushingAllowedIndices.add(index); + } else { + flushingAllowedIndices.remove(index); + } + } + @Override public long add(DBDataOutput data) throws IOException { checkClosed(); @@ -206,6 +228,7 @@ public class FileIndexManager implements IndexManager { } synchronized (indicesMapsAccessLock) { dirtyLoadedIndices.remove(index); + flushingAllowedIndices.remove(index); loadedIndices.remove(index); removedIndices.add(index); } @@ -216,6 +239,7 @@ public class FileIndexManager implements IndexManager { synchronized (indicesMapsAccessLock) { removedIndices.remove(index); dirtyLoadedIndices.remove(index); + flushingAllowedIndices.remove(index); loadedIndices.remove(index); } // Update indices metadata @@ -228,11 +252,14 @@ public class FileIndexManager implements IndexManager { if (dirtyLoadedIndices.contains(index)) { indexDetails = loadedIndices.get(index); dirtyLoadedIndices.remove(index); + flushingAllowedIndices.remove(index); } } if (isDirty) { // Update indices metadata - SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); + long position = index * IndexDetails.TOTAL_BYTES; + resizeMetadataFileChannel(position); + SeekableByteChannel metadata = metadataFileChannel.position(position); writeIndexDetails(metadata, indexDetails); } synchronized (indicesMapsAccessLock) { @@ -293,6 +320,7 @@ public class FileIndexManager implements IndexManager { synchronized (indicesMapsAccessLock) { loadedIndices.put(index, details); dirtyLoadedIndices.add(index); + flushingAllowedIndices.remove(index); } } @@ -301,6 +329,7 @@ public class FileIndexManager implements IndexManager { long newIndex = firstAllocableIndex++; loadedIndices.put(newIndex, indexDetails); dirtyLoadedIndices.add(newIndex); + flushingAllowedIndices.remove(newIndex); removedIndices.remove(newIndex); return newIndex; } @@ -316,7 +345,7 @@ public class FileIndexManager implements IndexManager { // Try to load the details from file final long metadataPosition = index * IndexDetails.TOTAL_BYTES; - if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) { + if (metadataPosition + IndexDetails.TOTAL_BYTES > getMetadataFileChannelSize()) { // Avoid underflow exception return null; } @@ -374,7 +403,7 @@ public class FileIndexManager implements IndexManager { } // Update indices metadata - flushAllIndices(); + flushAllFlushableIndices(); // Remove removed indices removeRemovedIndices(); @@ -417,39 +446,56 @@ public class FileIndexManager implements IndexManager { @Override public long clean() { long cleaned = 0; + long tim1 = System.currentTimeMillis(); try { - cleaned += flushAllIndices(); + cleaned += flushAllFlushableIndices(); } catch (IOException ex) { ex.printStackTrace(); } + long tim2 = System.currentTimeMillis(); try { cleaned += removeRemovedIndices(); } catch (IOException ex) { ex.printStackTrace(); } + long tim3 = System.currentTimeMillis(); cleaned += cleanExtraIndices(); + long tim4 = System.currentTimeMillis(); + if (Cleaner.ENABLE_CLEANER_LOGGING) System.out.println("[CLEANER] FileIndexManager CLEAN_TIME: " + (tim2-tim1) + "," + (tim3-tim2) + "," + (tim4-tim3)); return cleaned; } - private long flushAllIndices() throws IOException { + private long flushAllFlushableIndices() throws IOException { long flushedIndices = 0; SeekableByteChannel metadata = metadataFileChannel; long lastIndex = -2; synchronized (indicesMapsAccessLock) { for (long index : dirtyLoadedIndices) { - IndexDetails indexDetails = loadedIndices.get(index); - if (index - lastIndex != 1) { - metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); + if (!flushingAllowedIndices.contains(index)) { + IndexDetails indexDetails = loadedIndices.get(index); + long position = index * IndexDetails.TOTAL_BYTES; + resizeMetadataFileChannel(position); + if (index - lastIndex != 1) { + metadata = metadata.position(position); + } + writeIndexDetails(metadata, indexDetails); + lastIndex = index; + flushedIndices++; } - writeIndexDetails(metadata, indexDetails); - lastIndex = index; - flushedIndices++; } dirtyLoadedIndices.clear(); + dirtyLoadedIndices.addAll(flushingAllowedIndices); + flushingAllowedIndices.clear(); } return flushedIndices; } + private void resizeMetadataFileChannel(long position) { + if (position + IndexDetails.TOTAL_BYTES > metadataFileChannelSize) { + metadataFileChannelSize = position + IndexDetails.TOTAL_BYTES; + } + } + private long removeRemovedIndices() throws IOException { SeekableByteChannel metadata = metadataFileChannel; synchronized (indicesMapsAccessLock) { diff --git a/src/main/java/org/warp/jcwdb/IndexManager.java b/src/main/java/org/warp/jcwdb/IndexManager.java index b663b90..a48fbe3 100644 --- a/src/main/java/org/warp/jcwdb/IndexManager.java +++ b/src/main/java/org/warp/jcwdb/IndexManager.java @@ -12,6 +12,7 @@ public interface IndexManager extends Cleanable { long add(DBDataOutput writer) throws IOException; FullIndexDetails addAndGetDetails(DBDataOutput writer) throws IOException; IndexDetails set(long index, DBDataOutput writer) throws IOException; + void setFlushingAllowed(long index, boolean isUnloadingAllowed); void delete(long index) throws IOException; boolean has(long index); void close() throws IOException; diff --git a/src/main/java/org/warp/jcwdb/JCWDatabase.java b/src/main/java/org/warp/jcwdb/JCWDatabase.java index a64dee8..c8c21d0 100644 --- a/src/main/java/org/warp/jcwdb/JCWDatabase.java +++ b/src/main/java/org/warp/jcwdb/JCWDatabase.java @@ -1,7 +1,5 @@ package org.warp.jcwdb; -import it.unimi.dsi.fastutil.longs.LongArrayList; - import java.io.IOException; import java.nio.file.Path; @@ -154,6 +152,10 @@ public class JCWDatabase implements AutoCloseable, Cleanable { public IndexDetails write(long index, DBDataOutput writer) throws IOException { return indices.set(index, writer); } + + public void setFlushingAllowed(long index, boolean isFlushingAllowed) { + indices.setFlushingAllowed(index, isFlushingAllowed); + } } @SuppressWarnings("unchecked") diff --git a/src/main/java/org/warp/jcwdb/LightArrayList.java b/src/main/java/org/warp/jcwdb/LightArrayList.java index eccbbf8..b65087f 100644 --- a/src/main/java/org/warp/jcwdb/LightArrayList.java +++ b/src/main/java/org/warp/jcwdb/LightArrayList.java @@ -3,6 +3,7 @@ package org.warp.jcwdb; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOError; import java.io.IOException; import java.util.*; import java.util.function.Consumer; @@ -112,7 +113,7 @@ public class LightArrayList implements LightList { try { action.accept(db.get(index)); } catch (IOException e) { - throw (RuntimeException) new RuntimeException().initCause(e); + throw new IOError(e); } } } diff --git a/src/main/java/org/warp/jcwdb/LightBigList.java b/src/main/java/org/warp/jcwdb/LightBigList.java index 6d67666..85d68bc 100644 --- a/src/main/java/org/warp/jcwdb/LightBigList.java +++ b/src/main/java/org/warp/jcwdb/LightBigList.java @@ -4,6 +4,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOError; import java.io.IOException; import java.util.*; import java.util.function.Consumer; @@ -16,8 +17,8 @@ public class LightBigList implements LightList, Saveable { public final LongArrayList chunks; public final IntArrayList chunkSizes; private final JCWDatabase db; - private LightList cachedChunk; - private EntryReference> cachedChunkRef; + private LightArrayList cachedChunk; + private EntryReference> cachedChunkRef; private long cachedChunkIndex = -1; private int cachedChunkNumber = -1; private final Object cachedChunkLock = new Object(); @@ -132,8 +133,8 @@ public class LightBigList implements LightList, Saveable { if (o != null) { for (long chunkIndex : chunks) { try { - EntryReference> chunkRef = db.get(chunkIndex); - LightList chunk = chunkRef.getValueReadOnly(); + EntryReference> chunkRef = db.get(chunkIndex); + LightArrayList chunk = chunkRef.getValueReadOnly(); if (chunk.contains(o)) { return true; } @@ -175,13 +176,13 @@ public class LightBigList implements LightList, Saveable { @Override public void forEachReference(Consumer> action) { Objects.requireNonNull(action); - for (long chunkIndex : this.chunks) { - try { - EntryReference> chunkRef = db.get(chunkIndex); - LightList chunk = chunkRef.getValueReadOnly(); - chunk.forEachReference(action); - } catch (IOException ex) { - throw (NullPointerException) new NullPointerException().initCause(ex); + // Iterate through all chunks + for (int i = 0; i < chunks.size(); i++) { + synchronized (cachedChunkLock) { + if (cachedChunkNumber != i) { + prepareAccessToChunk(i); + } + cachedChunk.forEachReference(action); } } } @@ -194,6 +195,9 @@ public class LightBigList implements LightList, Saveable { @Deprecated @Override public T[] toArray() { + if (true) { + throw new RuntimeException("toArray() isn't implemented!"); + } T[] result = (T[]) new Object[this.size()]; long currentOffset = 0; @@ -206,8 +210,8 @@ public class LightBigList implements LightList, Saveable { final long chunkIndex = chunks.getLong(i); try { - EntryReference> chunkRef = db.get(chunkIndex); - LightList chunk = chunkRef.getValueReadOnly(); + EntryReference> chunkRef = db.get(chunkIndex); + LightArrayList chunk = chunkRef.getValueReadOnly(); for (int i1 = 0; i1 < chunk.size(); i1++) { result[(int)(chunkStartOffset + i1)] = chunk.get(i); } @@ -276,7 +280,7 @@ public class LightBigList implements LightList, Saveable { } try { - EntryReference> chunkRef = db.get(chunkIndex); + EntryReference> chunkRef = db.get(chunkIndex); chunkRef.editValue((chunk) -> { result.var = chunk.remove(relativeOffset); }); @@ -414,7 +418,7 @@ public class LightBigList implements LightList, Saveable { } try { - EntryReference> chunkRef = db.get(chunkIndex); + EntryReference> chunkRef = db.get(chunkIndex); chunkRef.editValue((chunk) -> { chunk.set(relativeOffset, element); wrapper.var = element; @@ -458,7 +462,7 @@ public class LightBigList implements LightList, Saveable { // Get chunk index final long chunkIndex = chunks.getLong(i); - EntryReference> chunkRef = db.get(chunkIndex); + EntryReference> chunkRef = db.get(chunkIndex); final int foundIndex = chunkRef.getValueReadOnly().indexOfEntry(ref); if (foundIndex >= 0) { return currentOffset + foundIndex; @@ -487,7 +491,7 @@ public class LightBigList implements LightList, Saveable { // Get chunk index final long chunkIndex = chunks.getLong(i); - EntryReference> chunkRef = db.get(chunkIndex); + EntryReference> chunkRef = db.get(chunkIndex); final int foundIndex = chunkRef.getValueReadOnly().lastIndexOfEntry(ref); if (foundIndex >= 0) { return currentOffset + foundIndex; @@ -543,26 +547,20 @@ public class LightBigList implements LightList, Saveable { @Override public boolean removeIf(Predicate filter) { Objects.requireNonNull(filter); - final VariableWrapper result = new VariableWrapper(false); + boolean result = false; // Iterate through all chunks for (int i = 0; i < chunks.size(); i++) { - try { - final int chunkOffset = i; - // Get chunk index - final long chunkIndex = chunks.getLong(i); - EntryReference> chunkRef = db.get(chunkIndex); - chunkRef.editValue((chunk) -> { - boolean removed = chunk.removeIf(filter); - if (removed) { - result.var = true; - chunkSizes.set(chunkOffset, chunk.size()); - } - }); - } catch (IOException ex) { - throw (NullPointerException) new NullPointerException().initCause(ex); + synchronized (cachedChunkLock) { + if (cachedChunkNumber != i) { + prepareAccessToChunk(i); + } + if (cachedChunk.removeIf(filter)) { + result = true; + chunkSizes.set(cachedChunkNumber, cachedChunk.size()); + } } } - return result.var; + return result; } @Override diff --git a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java index bfbea65..4806a74 100644 --- a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java +++ b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java @@ -1,12 +1,7 @@ package org.warp.jcwdb; -import it.unimi.dsi.fastutil.longs.Long2LongLinkedOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2LongMap; - import java.io.IOException; import java.nio.file.Path; -import java.util.function.BiConsumer; -import java.util.function.Consumer; public class MixedIndexDatabase implements IndexManager { private final FileIndexManager fileIndices; @@ -62,6 +57,14 @@ public class MixedIndexDatabase implements IndexManager { return fileIndices.set(index, writer); } } + @Override + public void setFlushingAllowed(long index, boolean isFlushingAllowed) { + if (cacheIndices.has(index)) { + cacheIndices.setFlushingAllowed(index, isFlushingAllowed); + } else { + fileIndices.setFlushingAllowed(index, isFlushingAllowed); + } + } @Override public void delete(long index) throws IOException { diff --git a/src/main/java/org/warp/jcwdb/exampleimpl/App.java b/src/main/java/org/warp/jcwdb/exampleimpl/App.java index 96bbb65..b9cfe2f 100644 --- a/src/main/java/org/warp/jcwdb/exampleimpl/App.java +++ b/src/main/java/org/warp/jcwdb/exampleimpl/App.java @@ -7,92 +7,91 @@ import org.warp.jcwdb.LightList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectList; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.function.Predicate; public class App { static long time3; - public static void main(String[] args) { + public static void main(String[] args) throws IOException { + if (args.length > 2 && Boolean.parseBoolean(args[2])) { + Files.delete(Paths.get(args[0])); + Files.delete(Paths.get(args[1])); + } + System.out.println("Loading database..."); + long time0 = System.currentTimeMillis(); + JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1])); + db.registerClass(StrangeAnimal.class, 0); try { - if (args.length > 2 && Boolean.parseBoolean(args[2])) { - Files.delete(Paths.get(args[0])); - Files.delete(Paths.get(args[1])); - } - System.out.println("Loading database..."); - long time0 = System.currentTimeMillis(); - JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1])); - db.registerClass(StrangeAnimal.class, 0); - try { - long time01 = System.currentTimeMillis(); - System.out.println("Time elapsed: " + (time01 - time0)); - System.out.println("Loading root..."); - EntryReference> rootRef = db.getRoot(Animal.class); - rootRef.editValue((root, saver) -> { - long time1 = System.currentTimeMillis(); - System.out.println("Time elapsed: " + (time1 - time01)); - System.out.println("Root size: " + root.size()); - System.out.println("Root:"); - // for (int i = 0; i < root.size(); i++) { - // System.out.println(" - " + root.get(i)); - // } - long prectime = System.currentTimeMillis(); - for (int i = 0; i < 20000000/* 2000000 */; i++) { - Animal animal = new StrangeAnimal(i % 40); - root.add(animal); - if (i > 0 && i % 200000 == 0) { - long precprectime = prectime; - prectime = System.currentTimeMillis(); - System.out.println("Element " + i + " (" + (prectime - precprectime) + "ms)"); - } + long time01 = System.currentTimeMillis(); + System.out.println("Time elapsed: " + (time01 - time0)); + System.out.println("Loading root..."); + EntryReference> rootRef = db.getRoot(Animal.class); + rootRef.editValue((root, saver) -> { + long time1 = System.currentTimeMillis(); + System.out.println("Time elapsed: " + (time1 - time01)); + System.out.println("Root size: " + root.size()); + System.out.println("Root:"); +// for (int i = 0; i < root.size(); i++) { +// System.out.println(" - " + root.get(i)); +// } + long prectime = System.currentTimeMillis(); + for (int i = 0; i < 2000000/* 2000000 */; i++) { + Animal animal = new StrangeAnimal(i % 40); + root.addEntry(animal); + if (i > 0 && i % 200000 == 0) { + long precprectime = prectime; + prectime = System.currentTimeMillis(); + System.out.println("Element " + i + " (" + (prectime - precprectime) + "ms)" + " Total Time: " + (prectime - time1)); } - long time2 = System.currentTimeMillis(); - saver.save(); - System.out.println("Root size: " + root.size()); - System.out.println("Time elapsed: " + (time2 - time1)); - System.out.println("Used memory: " - + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); - long time2_0 = System.currentTimeMillis(); - System.out.println("Filtering strings..."); - //root.removeIf(Animal::hasFourLegs); - long time2_1 = System.currentTimeMillis(); - System.out.println("Time elapsed: " + (time2_1 - time2_0)); - ObjectList results = new ObjectArrayList<>(); - - root.forEachReference((valueReference) -> { - Animal value = valueReference.getValueReadOnly(); - if (Animal.hasFourLegs(value)) { - results.add(value); - } - //System.out.println("val:" + value); - }); - long time2_2 = System.currentTimeMillis(); - System.out.println("Matches: " + results.size()); - System.out.println("Time elapsed: " + (time2_2 - time2_1)); - System.out.println("Used memory: " - + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); - System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); - System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)..."); - long removedItems = 0;//db.clean(); - time3 = System.currentTimeMillis(); - System.out.println("Removed items: " + removedItems); - System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); - System.out.println("Time elapsed: " + (time3 - time2_2)); - System.out.println("Saving database..."); - System.out.println("Root size: " + root.size()); - }); - db.close(); - long time4 = System.currentTimeMillis(); - System.out.println("Time elapsed: " + (time4 - time3)); - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - if (db.isOpen()) { - db.close(); } - } + long time2 = System.currentTimeMillis(); + saver.save(); + System.out.println("Root size: " + root.size()); + System.out.println("Time elapsed: " + (time2 - time1)); + System.out.println("Used memory: " + + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + long time2_0 = System.currentTimeMillis(); + System.out.println("Filtering strings..."); + long oldSize = root.size(); + root.removeIf(Animal::hasFourLegs); + long time2_1 = System.currentTimeMillis(); + System.out.println("RemoveIf(x) removed items: " + (oldSize - root.size())); + System.out.println("Time elapsed: " + (time2_1 - time2_0)); + ObjectList results = new ObjectArrayList<>(); + + System.out.println("Retrieving items..."); + root.forEachReference((valueReference) -> { + Animal value = valueReference.getValueReadOnly(); + if (Animal.hasFourLegs(value)) { + results.add(value); + } + //System.out.println("val:" + value); + }); + long time2_2 = System.currentTimeMillis(); + System.out.println("Matches: " + results.size()); + System.out.println("Time elapsed: " + (time2_2 - time2_1)); + System.out.println("Used memory: " + + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + System.out.println("Cleaning database (to reduce the amount of used memory and detect memory leaks)..."); + db.clean(); + time3 = System.currentTimeMillis(); + System.out.println("Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024) + "MB"); + System.out.println("Time elapsed: " + (time3 - time2_2)); + System.out.println("Saving database..."); + System.out.println("Root size: " + root.size()); + }); + db.close(); + long time4 = System.currentTimeMillis(); + System.out.println("Time elapsed: " + (time4 - time3)); } catch (Exception ex) { ex.printStackTrace(); + } finally { + if (db.isOpen()) { + db.close(); + } } }