diff --git a/src/main/java/org/warp/jcwdb/DBLightListParser.java b/src/main/java/org/warp/jcwdb/DBLightListParser.java index 7828463..875cd37 100644 --- a/src/main/java/org/warp/jcwdb/DBLightListParser.java +++ b/src/main/java/org/warp/jcwdb/DBLightListParser.java @@ -1,14 +1,8 @@ package org.warp.jcwdb; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Output; - -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; public class DBLightListParser extends DBTypeParserImpl { - private static final Kryo kryo = new Kryo(); private final JCWDatabase db; public DBLightListParser(JCWDatabase db) { @@ -16,21 +10,24 @@ public class DBLightListParser extends DBTypeParserImpl { } public DBReader getReader() { - return (i) -> { - ArrayList internalList = (ArrayList) kryo.readClassAndObject(i); + return (i, size) -> { + ArrayList internalList = new ArrayList<>(); + long max = size / Long.BYTES; + for (int item = 0; item < max; item++){ + long itm = i.readLong(); + internalList.add(itm); + } return new LightList(db, internalList); }; } public DBDataOutput getWriter(final LightList value) { - // TODO: optimize by writing longs directly, to make length determinable without writing to memory the output. - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output out = new Output(baos); - kryo.writeClassAndObject(out, value.internalList); - out.close(); - byte[] b = baos.toByteArray(); + final int elementsCount = value.internalList.size(); return DBDataOutput.create((o) -> { - o.write(b); - }, DBStandardTypes.STRING, b.length); + ArrayList list = value.internalList; + for (Long item : list) { + o.writeLong(item); + } + }, DBStandardTypes.LIGHT_LIST, elementsCount * Long.BYTES); } } diff --git a/src/main/java/org/warp/jcwdb/DBReader.java b/src/main/java/org/warp/jcwdb/DBReader.java index 8b707ac..dda81b3 100644 --- a/src/main/java/org/warp/jcwdb/DBReader.java +++ b/src/main/java/org/warp/jcwdb/DBReader.java @@ -3,5 +3,5 @@ package org.warp.jcwdb; import com.esotericsoftware.kryo.io.Input; public interface DBReader { - T read(Input i); + T read(Input i, int size); } diff --git a/src/main/java/org/warp/jcwdb/DBStringParser.java b/src/main/java/org/warp/jcwdb/DBStringParser.java index ac359fa..b446a01 100644 --- a/src/main/java/org/warp/jcwdb/DBStringParser.java +++ b/src/main/java/org/warp/jcwdb/DBStringParser.java @@ -1,12 +1,11 @@ package org.warp.jcwdb; -import java.nio.charset.StandardCharsets; +import java.io.ByteArrayOutputStream; -import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; public class DBStringParser extends DBTypeParserImpl { - private static final DBReader defaultReader = (i) -> { + private static final DBReader defaultReader = (i, size) -> { return i.readString(); }; @@ -15,7 +14,12 @@ public class DBStringParser extends DBTypeParserImpl { } public DBDataOutput getWriter(final String value) { - final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output tmpO = new Output(baos); + tmpO.writeString(value); + tmpO.flush(); + final byte[] bytes = baos.toByteArray(); + tmpO.close(); return DBDataOutput.create((o) -> { o.write(bytes); }, DBStandardTypes.STRING, bytes.length); diff --git a/src/main/java/org/warp/jcwdb/EntryReference.java b/src/main/java/org/warp/jcwdb/EntryReference.java index 345f804..4bc3a94 100644 --- a/src/main/java/org/warp/jcwdb/EntryReference.java +++ b/src/main/java/org/warp/jcwdb/EntryReference.java @@ -21,6 +21,13 @@ public class EntryReference implements Castable, AutoCloseable { this.value = db.indices.get(entryId, parser.getReader()); } + public EntryReference(JCWDatabase db, long entryId, DBTypeParser parser, T value) { + this.db = db; + this.entryIndex = entryId; + this.parser = parser; + this.value = value; + } + public DBTypeParser getParser() { return parser; } diff --git a/src/main/java/org/warp/jcwdb/FileAllocator.java b/src/main/java/org/warp/jcwdb/FileAllocator.java index 031bc36..9fb9424 100644 --- a/src/main/java/org/warp/jcwdb/FileAllocator.java +++ b/src/main/java/org/warp/jcwdb/FileAllocator.java @@ -15,6 +15,7 @@ public class FileAllocator implements AutoCloseable { private volatile long allocableOffset; private volatile boolean closed; private final Object closeLock = new Object(); + private final Object allocateLock = new Object(); public FileAllocator(SeekableByteChannel dataFileChannel) throws IOException { this.dataFileChannel = dataFileChannel; @@ -28,9 +29,11 @@ public class FileAllocator implements AutoCloseable { */ public long allocate(int size) { checkClosed(); - long allocatedOffset = allocableOffset; - allocatedOffset += size; - return allocatedOffset; + synchronized (allocateLock) { + long allocatedOffset = allocableOffset; + allocableOffset += size; + return allocatedOffset; + } } diff --git a/src/main/java/org/warp/jcwdb/FileIndexManager.java b/src/main/java/org/warp/jcwdb/FileIndexManager.java index 990963e..8b98e00 100644 --- a/src/main/java/org/warp/jcwdb/FileIndexManager.java +++ b/src/main/java/org/warp/jcwdb/FileIndexManager.java @@ -10,6 +10,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -39,10 +40,19 @@ public class FileIndexManager implements IndexManager { loadedIndices = new HashMap<>(); dirtyLoadedIndices = new HashSet<>(); removedIndices = new HashSet<>(); - dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.CREATE); - metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.CREATE); + if (Files.notExists(dataFile)) { + Files.createFile(dataFile); + } + if (Files.notExists(metadataFile)) { + Files.createFile(metadataFile); + } + dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); + metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); fileAllocator = new FileAllocator(dataFileChannel); firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES; + if (firstAllocableIndex == 0) { + firstAllocableIndex = 1; + } } @Override @@ -50,8 +60,7 @@ public class FileIndexManager implements IndexManager { checkClosed(); IndexDetails details = getIndexMetadata(index); Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset()))); - T result = reader.read(i); - i.close(); + T result = reader.read(i, details.getSize()); return result; } @@ -87,14 +96,6 @@ public class FileIndexManager implements IndexManager { return index; } - private long createIndexMetadata(IndexDetails indexDetails) { - long newIndex = firstAllocableIndex++; - loadedIndices.put(newIndex, indexDetails); - dirtyLoadedIndices.add(newIndex); - removedIndices.remove(newIndex); - return newIndex; - } - /** * Write the data at index. * The input size must be equal to the index size! @@ -110,7 +111,7 @@ public class FileIndexManager implements IndexManager { final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset))); data.getWriter().write(o); - o.close(); + o.flush(); } private void allocateAndWrite(final long index, DBDataOutput w) throws IOException { @@ -150,18 +151,36 @@ public class FileIndexManager implements IndexManager { return indexDetails; } + private void editIndex(long index, IndexDetails details) { + loadedIndices.put(index, details); + dirtyLoadedIndices.add(index); + } + + private long createIndexMetadata(IndexDetails indexDetails) { + long newIndex = firstAllocableIndex++; + loadedIndices.put(newIndex, indexDetails); + dirtyLoadedIndices.add(newIndex); + removedIndices.remove(newIndex); + return newIndex; + } + private IndexDetails getIndexMetadataUnsafe(long index) throws IOException { // Return index details if loaded IndexDetails details = loadedIndices.getOrDefault(index, null); if (details != null) return details; // Try to load the details from file - SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); + final long metadataPosition = index * IndexDetails.TOTAL_BYTES; + if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) { + // Avoid underflow exception + return null; + } + SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition); ByteBuffer currentMetadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES); currentMetadataFileChannel.read(currentMetadataByteBuffer); currentMetadataByteBuffer.flip(); // If it's not deleted continue - if ((currentMetadataByteBuffer.get() & IndexDetails.MASK_DELETED) == 0) { + if ((currentMetadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) { final long offset = currentMetadataByteBuffer.getLong(); final int size = currentMetadataByteBuffer.getInt(); final int type = currentMetadataByteBuffer.getInt(); @@ -182,11 +201,6 @@ public class FileIndexManager implements IndexManager { return details; } - private void editIndex(long index, IndexDetails details) { - loadedIndices.put(index, details); - dirtyLoadedIndices.add(index); - } - @Override public void close() throws IOException { if (closed) { @@ -208,7 +222,8 @@ public class FileIndexManager implements IndexManager { if (index - lastIndex != 1) { metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); } - metadataEntryBuffer.put((byte) 0); + metadataEntryBuffer.clear(); + metadataEntryBuffer.putInt(0); metadataEntryBuffer.putLong(indexDetails.getOffset()); metadataEntryBuffer.putInt(indexDetails.getSize()); metadataEntryBuffer.putInt(indexDetails.getType()); @@ -221,7 +236,7 @@ public class FileIndexManager implements IndexManager { ByteBuffer updatedMaskBuffer = ByteBuffer.allocateDirect(1); for (Long index : removedIndices) { metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); - updatedMaskBuffer.put(IndexDetails.MASK_DELETED); + updatedMaskBuffer.putInt(IndexDetails.MASK_DELETED); updatedMaskBuffer.flip(); metadata.write(updatedMaskBuffer); } diff --git a/src/main/java/org/warp/jcwdb/IndexDetails.java b/src/main/java/org/warp/jcwdb/IndexDetails.java index 50521e4..67a7e31 100644 --- a/src/main/java/org/warp/jcwdb/IndexDetails.java +++ b/src/main/java/org/warp/jcwdb/IndexDetails.java @@ -6,12 +6,12 @@ public class IndexDetails { /** * The bitmask is used to determine if an index has been deleted */ - public static final int BITMASK = 1; // 1 byte - public static final int OFFSET_BYTES = Long.SIZE; - public static final int DATA_SIZE_BYTES = Integer.SIZE; - public static final int TYPE_BYTES = Integer.SIZE; - public static final int TOTAL_BYTES = BITMASK + OFFSET_BYTES + DATA_SIZE_BYTES + TYPE_BYTES; - public static final byte MASK_DELETED = 0b00000001; + public static final int BITMASK_SIZE = Integer.BYTES; + public static final int OFFSET_BYTES = Long.BYTES; + public static final int DATA_SIZE_BYTES = Integer.BYTES; + public static final int TYPE_BYTES = Integer.BYTES; + public static final int TOTAL_BYTES = BITMASK_SIZE + OFFSET_BYTES + DATA_SIZE_BYTES + TYPE_BYTES; + public static final int MASK_DELETED = 0b00000001; private final long offset; private final int size; private final int type; diff --git a/src/main/java/org/warp/jcwdb/JCWDatabase.java b/src/main/java/org/warp/jcwdb/JCWDatabase.java index 810e1d6..74f0f34 100644 --- a/src/main/java/org/warp/jcwdb/JCWDatabase.java +++ b/src/main/java/org/warp/jcwdb/JCWDatabase.java @@ -2,6 +2,8 @@ package org.warp.jcwdb; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Iterator; import java.util.Map; import java.util.WeakHashMap; @@ -10,56 +12,134 @@ public class JCWDatabase implements AutoCloseable { protected final MixedIndexDatabase indices; private final WeakHashMap> references; private final WeakHashMap> referencesByObject; + private volatile boolean closed; + private final Object closeLock = new Object(); + private final Object indicesAccessLock = new Object(); + private final Object referencesAccessLock = new Object(); public JCWDatabase(Path dataFile, Path metadataFile) throws IOException { this.typesManager = new TypesManager(this); this.indices = new MixedIndexDatabase(typesManager, dataFile, metadataFile); this.references = new WeakHashMap<>(); this.referencesByObject = new WeakHashMap<>(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + JCWDatabase.this.close(); + } catch (Exception e) { + e.printStackTrace(); + } + })); } - public EntryReference getRoot() throws IOException { - try { + public EntryReference> getRoot() throws IOException { + checkClosed(); + if (exists(0)) { return get(0); - } catch (IOException e) { - throw new IOException("Can't load root!", e); + } else { + LightList newRoot = new LightList(this, new ArrayList<>()); + return set(0, newRoot); } } public EntryReference get(long index) throws IOException { - EntryReference ref = (EntryReference) this.references.getOrDefault(index, null); - if (ref == null) { - int type = this.indices.getType(index); - DBTypeParser typeParser = this.typesManager.get(type); - ref = new EntryReference<>(this, index, typeParser); - this.references.put(index, ref); - this.referencesByObject.put(ref.value, ref); - } - return ref; - } - - public EntryReference add(T o) throws IOException { - EntryReference ref = (EntryReference) referencesByObject.getOrDefault(o, null); - if (ref != null) { + checkClosed(); + synchronized (referencesAccessLock) { + EntryReference ref = (EntryReference) this.references.getOrDefault(index, null); + if (ref == null) { + int type; + synchronized (indicesAccessLock) { + type = this.indices.getType(index); + } + DBTypeParser typeParser = this.typesManager.get(type); + ref = new EntryReference<>(this, index, typeParser); + this.references.put(index, ref); + this.referencesByObject.put(ref.value, ref); + } return ref; } - DBTypeParser typeParser = this.typesManager.get((Class) o.getClass()); - long index = indices.add(typeParser.getWriter(o)); - ref = new EntryReference<>(this, index, typeParser); - this.references.put(index, ref); - this.referencesByObject.put(o, ref); - return ref; + } + + protected EntryReference add(T value) throws IOException { + checkClosed(); + synchronized (referencesAccessLock) { + EntryReference ref = (EntryReference) referencesByObject.getOrDefault(value, null); + if (ref != null) { + return ref; + } + DBTypeParser typeParser = this.typesManager.get((Class) value.getClass()); + long index; + synchronized (indicesAccessLock) { + index = indices.add(typeParser.getWriter(value)); + } + ref = new EntryReference<>(this, index, typeParser, value); + this.references.put(index, ref); + this.referencesByObject.put(value, ref); + return ref; + } + } + + protected boolean exists(long index) { + checkClosed(); + synchronized (referencesAccessLock) { + synchronized (indicesAccessLock) { + return this.references.containsKey(index) || this.indices.has(index); + } + } + } + + protected EntryReference set(long index, T value) throws IOException { + checkClosed(); + synchronized (referencesAccessLock) { + EntryReference ref; + if (exists(index)) { + ref = get(index); + ref.value = value; + return ref; + } else { + DBTypeParser typeParser = this.typesManager.get((Class) value.getClass()); + synchronized (indicesAccessLock) { + indices.set(index, typeParser.getWriter(value)); + } + ref = new EntryReference<>(this, index, typeParser); + this.references.put(index, ref); + this.referencesByObject.put(value, ref); + return ref; + } + } } protected void removeEntryReference(long index) { - this.references.remove(index); + synchronized (referencesAccessLock) { + this.references.remove(index); + } } @Override public void close() throws Exception { - for (Map.Entry> reference : references.entrySet()) { - reference.getValue().close(); + if (closed) { + return; + } + synchronized (closeLock) { + if (closed) { + return; + } + closed = true; + } + + synchronized (referencesAccessLock) { + for (Map.Entry> reference : references.entrySet()) { + reference.getValue().save(); + } + } + synchronized (indicesAccessLock) { + this.indices.close(); + } + System.out.println("Database closed."); + } + + private void checkClosed() { + if (closed) { + throw new RuntimeException("Index Manager is closed."); } - this.indices.close(); } } diff --git a/src/main/java/org/warp/jcwdb/LightList.java b/src/main/java/org/warp/jcwdb/LightList.java index 3753684..dcd22c5 100644 --- a/src/main/java/org/warp/jcwdb/LightList.java +++ b/src/main/java/org/warp/jcwdb/LightList.java @@ -139,8 +139,12 @@ public class LightList implements List { @Override public T get(int index) { - // TODO: implement - return null; + try { + return (T) db.get(internalList.get(index)).value; + } catch (IOException e) { + e.printStackTrace(); + return null; + } } @Override diff --git a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java index b597120..c2e1d25 100644 --- a/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java +++ b/src/main/java/org/warp/jcwdb/MixedIndexDatabase.java @@ -7,13 +7,11 @@ import java.io.IOException; import java.nio.file.Path; public class MixedIndexDatabase implements IndexManager { - private final TypesManager typesManager; private final Long2LongMap mostAccessedIndices; private final FileIndexManager fileIndices; private final CacheIndexManager cacheIndices; public MixedIndexDatabase(TypesManager typesManager, Path dataFile, Path metadataFile) throws IOException { - this.typesManager = typesManager; this.mostAccessedIndices = new Long2LongLinkedOpenHashMap(); this.fileIndices = new FileIndexManager(typesManager, dataFile, metadataFile); this.cacheIndices = new CacheIndexManager(typesManager); @@ -21,6 +19,7 @@ public class MixedIndexDatabase implements IndexManager { @Override public T get(long index, DBReader reader) throws IOException { + incrementUsage(index); if (cacheIndices.has(index)) { return cacheIndices.get(index, reader); } else { @@ -62,6 +61,10 @@ public class MixedIndexDatabase implements IndexManager { return cacheIndices.has(index) || fileIndices.has(index); } + private void incrementUsage(long index) { + mostAccessedIndices.put(index, mostAccessedIndices.getOrDefault(index, 0) + 1); + } + @Override public void close() throws IOException { // TODO: move all cached indices to filesIndices before closing. diff --git a/src/main/java/org/warp/jcwdb/exampleimpl/App.java b/src/main/java/org/warp/jcwdb/exampleimpl/App.java new file mode 100644 index 0000000..f37c53b --- /dev/null +++ b/src/main/java/org/warp/jcwdb/exampleimpl/App.java @@ -0,0 +1,26 @@ +package org.warp.jcwdb.exampleimpl; + +import org.warp.jcwdb.EntryReference; +import org.warp.jcwdb.JCWDatabase; +import org.warp.jcwdb.LightList; + +import java.nio.file.Files; +import java.nio.file.Paths; + +public class App { + public static void main(String[] args) throws Exception { + if (args.length > 2 && Boolean.parseBoolean(args[2])) { + Files.delete(Paths.get(args[0])); + Files.delete(Paths.get(args[1])); + } + JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1])); + LightList root = ((EntryReference>) db.getRoot().cast()).value; + System.out.println("Root:"); + for (int i = 0; i < root.size(); i++) { + System.out.println(" - " + root.get(i)); + } + for (int i = 0; i < 100; i++) { + root.add("Test " + System.currentTimeMillis()); + } + } +}