Better implementation of LightList
This commit is contained in:
parent
d9da3274c2
commit
fa692b391a
@ -1,14 +1,8 @@
|
||||
package org.warp.jcwdb;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.io.Output;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class DBLightListParser extends DBTypeParserImpl<LightList> {
|
||||
private static final Kryo kryo = new Kryo();
|
||||
private final JCWDatabase db;
|
||||
|
||||
public DBLightListParser(JCWDatabase db) {
|
||||
@ -16,21 +10,24 @@ public class DBLightListParser extends DBTypeParserImpl<LightList> {
|
||||
}
|
||||
|
||||
public DBReader<LightList> getReader() {
|
||||
return (i) -> {
|
||||
ArrayList<Long> internalList = (ArrayList<Long>) kryo.readClassAndObject(i);
|
||||
return (i, size) -> {
|
||||
ArrayList<Long> internalList = new ArrayList<>();
|
||||
long max = size / Long.BYTES;
|
||||
for (int item = 0; item < max; item++){
|
||||
long itm = i.readLong();
|
||||
internalList.add(itm);
|
||||
}
|
||||
return new LightList(db, internalList);
|
||||
};
|
||||
}
|
||||
|
||||
public DBDataOutput<LightList> getWriter(final LightList value) {
|
||||
// TODO: optimize by writing longs directly, to make length determinable without writing to memory the output.
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
Output out = new Output(baos);
|
||||
kryo.writeClassAndObject(out, value.internalList);
|
||||
out.close();
|
||||
byte[] b = baos.toByteArray();
|
||||
final int elementsCount = value.internalList.size();
|
||||
return DBDataOutput.create((o) -> {
|
||||
o.write(b);
|
||||
}, DBStandardTypes.STRING, b.length);
|
||||
ArrayList<Long> list = value.internalList;
|
||||
for (Long item : list) {
|
||||
o.writeLong(item);
|
||||
}
|
||||
}, DBStandardTypes.LIGHT_LIST, elementsCount * Long.BYTES);
|
||||
}
|
||||
}
|
||||
|
@ -3,5 +3,5 @@ package org.warp.jcwdb;
|
||||
import com.esotericsoftware.kryo.io.Input;
|
||||
|
||||
public interface DBReader<T> {
|
||||
T read(Input i);
|
||||
T read(Input i, int size);
|
||||
}
|
||||
|
@ -1,12 +1,11 @@
|
||||
package org.warp.jcwdb;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
||||
import com.esotericsoftware.kryo.io.Input;
|
||||
import com.esotericsoftware.kryo.io.Output;
|
||||
|
||||
public class DBStringParser extends DBTypeParserImpl<String> {
|
||||
private static final DBReader<String> defaultReader = (i) -> {
|
||||
private static final DBReader<String> defaultReader = (i, size) -> {
|
||||
return i.readString();
|
||||
};
|
||||
|
||||
@ -15,7 +14,12 @@ public class DBStringParser extends DBTypeParserImpl<String> {
|
||||
}
|
||||
|
||||
public DBDataOutput<String> getWriter(final String value) {
|
||||
final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
Output tmpO = new Output(baos);
|
||||
tmpO.writeString(value);
|
||||
tmpO.flush();
|
||||
final byte[] bytes = baos.toByteArray();
|
||||
tmpO.close();
|
||||
return DBDataOutput.create((o) -> {
|
||||
o.write(bytes);
|
||||
}, DBStandardTypes.STRING, bytes.length);
|
||||
|
@ -21,6 +21,13 @@ public class EntryReference<T> implements Castable, AutoCloseable {
|
||||
this.value = db.indices.get(entryId, parser.getReader());
|
||||
}
|
||||
|
||||
public EntryReference(JCWDatabase db, long entryId, DBTypeParser<T> parser, T value) {
|
||||
this.db = db;
|
||||
this.entryIndex = entryId;
|
||||
this.parser = parser;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public DBTypeParser<T> getParser() {
|
||||
return parser;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ public class FileAllocator implements AutoCloseable {
|
||||
private volatile long allocableOffset;
|
||||
private volatile boolean closed;
|
||||
private final Object closeLock = new Object();
|
||||
private final Object allocateLock = new Object();
|
||||
|
||||
public FileAllocator(SeekableByteChannel dataFileChannel) throws IOException {
|
||||
this.dataFileChannel = dataFileChannel;
|
||||
@ -28,9 +29,11 @@ public class FileAllocator implements AutoCloseable {
|
||||
*/
|
||||
public long allocate(int size) {
|
||||
checkClosed();
|
||||
long allocatedOffset = allocableOffset;
|
||||
allocatedOffset += size;
|
||||
return allocatedOffset;
|
||||
synchronized (allocateLock) {
|
||||
long allocatedOffset = allocableOffset;
|
||||
allocableOffset += size;
|
||||
return allocatedOffset;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,6 +10,7 @@ import java.nio.channels.SeekableByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.nio.file.attribute.FileAttribute;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
@ -39,10 +40,19 @@ public class FileIndexManager implements IndexManager {
|
||||
loadedIndices = new HashMap<>();
|
||||
dirtyLoadedIndices = new HashSet<>();
|
||||
removedIndices = new HashSet<>();
|
||||
dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.CREATE);
|
||||
metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.CREATE);
|
||||
if (Files.notExists(dataFile)) {
|
||||
Files.createFile(dataFile);
|
||||
}
|
||||
if (Files.notExists(metadataFile)) {
|
||||
Files.createFile(metadataFile);
|
||||
}
|
||||
dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
fileAllocator = new FileAllocator(dataFileChannel);
|
||||
firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES;
|
||||
if (firstAllocableIndex == 0) {
|
||||
firstAllocableIndex = 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -50,8 +60,7 @@ public class FileIndexManager implements IndexManager {
|
||||
checkClosed();
|
||||
IndexDetails details = getIndexMetadata(index);
|
||||
Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
|
||||
T result = reader.read(i);
|
||||
i.close();
|
||||
T result = reader.read(i, details.getSize());
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -87,14 +96,6 @@ public class FileIndexManager implements IndexManager {
|
||||
return index;
|
||||
}
|
||||
|
||||
private long createIndexMetadata(IndexDetails indexDetails) {
|
||||
long newIndex = firstAllocableIndex++;
|
||||
loadedIndices.put(newIndex, indexDetails);
|
||||
dirtyLoadedIndices.add(newIndex);
|
||||
removedIndices.remove(newIndex);
|
||||
return newIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the data at index.
|
||||
* The input size must be equal to the index size!
|
||||
@ -110,7 +111,7 @@ public class FileIndexManager implements IndexManager {
|
||||
|
||||
final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)));
|
||||
data.getWriter().write(o);
|
||||
o.close();
|
||||
o.flush();
|
||||
}
|
||||
|
||||
private void allocateAndWrite(final long index, DBDataOutput<?> w) throws IOException {
|
||||
@ -150,18 +151,36 @@ public class FileIndexManager implements IndexManager {
|
||||
return indexDetails;
|
||||
}
|
||||
|
||||
private void editIndex(long index, IndexDetails details) {
|
||||
loadedIndices.put(index, details);
|
||||
dirtyLoadedIndices.add(index);
|
||||
}
|
||||
|
||||
private long createIndexMetadata(IndexDetails indexDetails) {
|
||||
long newIndex = firstAllocableIndex++;
|
||||
loadedIndices.put(newIndex, indexDetails);
|
||||
dirtyLoadedIndices.add(newIndex);
|
||||
removedIndices.remove(newIndex);
|
||||
return newIndex;
|
||||
}
|
||||
|
||||
private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
|
||||
// Return index details if loaded
|
||||
IndexDetails details = loadedIndices.getOrDefault(index, null);
|
||||
if (details != null) return details;
|
||||
|
||||
// Try to load the details from file
|
||||
SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES);
|
||||
final long metadataPosition = index * IndexDetails.TOTAL_BYTES;
|
||||
if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) {
|
||||
// Avoid underflow exception
|
||||
return null;
|
||||
}
|
||||
SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition);
|
||||
ByteBuffer currentMetadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
|
||||
currentMetadataFileChannel.read(currentMetadataByteBuffer);
|
||||
currentMetadataByteBuffer.flip();
|
||||
// If it's not deleted continue
|
||||
if ((currentMetadataByteBuffer.get() & IndexDetails.MASK_DELETED) == 0) {
|
||||
if ((currentMetadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) {
|
||||
final long offset = currentMetadataByteBuffer.getLong();
|
||||
final int size = currentMetadataByteBuffer.getInt();
|
||||
final int type = currentMetadataByteBuffer.getInt();
|
||||
@ -182,11 +201,6 @@ public class FileIndexManager implements IndexManager {
|
||||
return details;
|
||||
}
|
||||
|
||||
private void editIndex(long index, IndexDetails details) {
|
||||
loadedIndices.put(index, details);
|
||||
dirtyLoadedIndices.add(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (closed) {
|
||||
@ -208,7 +222,8 @@ public class FileIndexManager implements IndexManager {
|
||||
if (index - lastIndex != 1) {
|
||||
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
|
||||
}
|
||||
metadataEntryBuffer.put((byte) 0);
|
||||
metadataEntryBuffer.clear();
|
||||
metadataEntryBuffer.putInt(0);
|
||||
metadataEntryBuffer.putLong(indexDetails.getOffset());
|
||||
metadataEntryBuffer.putInt(indexDetails.getSize());
|
||||
metadataEntryBuffer.putInt(indexDetails.getType());
|
||||
@ -221,7 +236,7 @@ public class FileIndexManager implements IndexManager {
|
||||
ByteBuffer updatedMaskBuffer = ByteBuffer.allocateDirect(1);
|
||||
for (Long index : removedIndices) {
|
||||
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
|
||||
updatedMaskBuffer.put(IndexDetails.MASK_DELETED);
|
||||
updatedMaskBuffer.putInt(IndexDetails.MASK_DELETED);
|
||||
updatedMaskBuffer.flip();
|
||||
metadata.write(updatedMaskBuffer);
|
||||
}
|
||||
|
@ -6,12 +6,12 @@ public class IndexDetails {
|
||||
/**
|
||||
* The bitmask is used to determine if an index has been deleted
|
||||
*/
|
||||
public static final int BITMASK = 1; // 1 byte
|
||||
public static final int OFFSET_BYTES = Long.SIZE;
|
||||
public static final int DATA_SIZE_BYTES = Integer.SIZE;
|
||||
public static final int TYPE_BYTES = Integer.SIZE;
|
||||
public static final int TOTAL_BYTES = BITMASK + OFFSET_BYTES + DATA_SIZE_BYTES + TYPE_BYTES;
|
||||
public static final byte MASK_DELETED = 0b00000001;
|
||||
public static final int BITMASK_SIZE = Integer.BYTES;
|
||||
public static final int OFFSET_BYTES = Long.BYTES;
|
||||
public static final int DATA_SIZE_BYTES = Integer.BYTES;
|
||||
public static final int TYPE_BYTES = Integer.BYTES;
|
||||
public static final int TOTAL_BYTES = BITMASK_SIZE + OFFSET_BYTES + DATA_SIZE_BYTES + TYPE_BYTES;
|
||||
public static final int MASK_DELETED = 0b00000001;
|
||||
private final long offset;
|
||||
private final int size;
|
||||
private final int type;
|
||||
|
@ -2,6 +2,8 @@ package org.warp.jcwdb;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.WeakHashMap;
|
||||
|
||||
@ -10,56 +12,134 @@ public class JCWDatabase implements AutoCloseable {
|
||||
protected final MixedIndexDatabase indices;
|
||||
private final WeakHashMap<Long, EntryReference<?>> references;
|
||||
private final WeakHashMap<Object, EntryReference<?>> referencesByObject;
|
||||
private volatile boolean closed;
|
||||
private final Object closeLock = new Object();
|
||||
private final Object indicesAccessLock = new Object();
|
||||
private final Object referencesAccessLock = new Object();
|
||||
|
||||
public JCWDatabase(Path dataFile, Path metadataFile) throws IOException {
|
||||
this.typesManager = new TypesManager(this);
|
||||
this.indices = new MixedIndexDatabase(typesManager, dataFile, metadataFile);
|
||||
this.references = new WeakHashMap<>();
|
||||
this.referencesByObject = new WeakHashMap<>();
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
try {
|
||||
JCWDatabase.this.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public <T> EntryReference<T> getRoot() throws IOException {
|
||||
try {
|
||||
public <T> EntryReference<LightList<T>> getRoot() throws IOException {
|
||||
checkClosed();
|
||||
if (exists(0)) {
|
||||
return get(0);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Can't load root!", e);
|
||||
} else {
|
||||
LightList<T> newRoot = new LightList<T>(this, new ArrayList<>());
|
||||
return set(0, newRoot);
|
||||
}
|
||||
}
|
||||
|
||||
public <T> EntryReference<T> get(long index) throws IOException {
|
||||
EntryReference<T> ref = (EntryReference<T>) this.references.getOrDefault(index, null);
|
||||
if (ref == null) {
|
||||
int type = this.indices.getType(index);
|
||||
DBTypeParser<T> typeParser = this.typesManager.get(type);
|
||||
ref = new EntryReference<>(this, index, typeParser);
|
||||
this.references.put(index, ref);
|
||||
this.referencesByObject.put(ref.value, ref);
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
public <T> EntryReference<T> add(T o) throws IOException {
|
||||
EntryReference<T> ref = (EntryReference<T>) referencesByObject.getOrDefault(o, null);
|
||||
if (ref != null) {
|
||||
checkClosed();
|
||||
synchronized (referencesAccessLock) {
|
||||
EntryReference<T> ref = (EntryReference<T>) this.references.getOrDefault(index, null);
|
||||
if (ref == null) {
|
||||
int type;
|
||||
synchronized (indicesAccessLock) {
|
||||
type = this.indices.getType(index);
|
||||
}
|
||||
DBTypeParser<T> typeParser = this.typesManager.get(type);
|
||||
ref = new EntryReference<>(this, index, typeParser);
|
||||
this.references.put(index, ref);
|
||||
this.referencesByObject.put(ref.value, ref);
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
DBTypeParser<T> typeParser = this.typesManager.get((Class<T>) o.getClass());
|
||||
long index = indices.add(typeParser.getWriter(o));
|
||||
ref = new EntryReference<>(this, index, typeParser);
|
||||
this.references.put(index, ref);
|
||||
this.referencesByObject.put(o, ref);
|
||||
return ref;
|
||||
}
|
||||
|
||||
protected <T> EntryReference<T> add(T value) throws IOException {
|
||||
checkClosed();
|
||||
synchronized (referencesAccessLock) {
|
||||
EntryReference<T> ref = (EntryReference<T>) referencesByObject.getOrDefault(value, null);
|
||||
if (ref != null) {
|
||||
return ref;
|
||||
}
|
||||
DBTypeParser<T> typeParser = this.typesManager.get((Class<T>) value.getClass());
|
||||
long index;
|
||||
synchronized (indicesAccessLock) {
|
||||
index = indices.add(typeParser.getWriter(value));
|
||||
}
|
||||
ref = new EntryReference<>(this, index, typeParser, value);
|
||||
this.references.put(index, ref);
|
||||
this.referencesByObject.put(value, ref);
|
||||
return ref;
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean exists(long index) {
|
||||
checkClosed();
|
||||
synchronized (referencesAccessLock) {
|
||||
synchronized (indicesAccessLock) {
|
||||
return this.references.containsKey(index) || this.indices.has(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> EntryReference<T> set(long index, T value) throws IOException {
|
||||
checkClosed();
|
||||
synchronized (referencesAccessLock) {
|
||||
EntryReference<T> ref;
|
||||
if (exists(index)) {
|
||||
ref = get(index);
|
||||
ref.value = value;
|
||||
return ref;
|
||||
} else {
|
||||
DBTypeParser<T> typeParser = this.typesManager.get((Class<T>) value.getClass());
|
||||
synchronized (indicesAccessLock) {
|
||||
indices.set(index, typeParser.getWriter(value));
|
||||
}
|
||||
ref = new EntryReference<>(this, index, typeParser);
|
||||
this.references.put(index, ref);
|
||||
this.referencesByObject.put(value, ref);
|
||||
return ref;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void removeEntryReference(long index) {
|
||||
this.references.remove(index);
|
||||
synchronized (referencesAccessLock) {
|
||||
this.references.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
for (Map.Entry<Long, EntryReference<?>> reference : references.entrySet()) {
|
||||
reference.getValue().close();
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
synchronized (closeLock) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
|
||||
synchronized (referencesAccessLock) {
|
||||
for (Map.Entry<Long, EntryReference<?>> reference : references.entrySet()) {
|
||||
reference.getValue().save();
|
||||
}
|
||||
}
|
||||
synchronized (indicesAccessLock) {
|
||||
this.indices.close();
|
||||
}
|
||||
System.out.println("Database closed.");
|
||||
}
|
||||
|
||||
private void checkClosed() {
|
||||
if (closed) {
|
||||
throw new RuntimeException("Index Manager is closed.");
|
||||
}
|
||||
this.indices.close();
|
||||
}
|
||||
}
|
||||
|
@ -139,8 +139,12 @@ public class LightList<T> implements List<T> {
|
||||
|
||||
@Override
|
||||
public T get(int index) {
|
||||
// TODO: implement
|
||||
return null;
|
||||
try {
|
||||
return (T) db.get(internalList.get(index)).value;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,13 +7,11 @@ import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class MixedIndexDatabase implements IndexManager {
|
||||
private final TypesManager typesManager;
|
||||
private final Long2LongMap mostAccessedIndices;
|
||||
private final FileIndexManager fileIndices;
|
||||
private final CacheIndexManager cacheIndices;
|
||||
|
||||
public MixedIndexDatabase(TypesManager typesManager, Path dataFile, Path metadataFile) throws IOException {
|
||||
this.typesManager = typesManager;
|
||||
this.mostAccessedIndices = new Long2LongLinkedOpenHashMap();
|
||||
this.fileIndices = new FileIndexManager(typesManager, dataFile, metadataFile);
|
||||
this.cacheIndices = new CacheIndexManager(typesManager);
|
||||
@ -21,6 +19,7 @@ public class MixedIndexDatabase implements IndexManager {
|
||||
|
||||
@Override
|
||||
public <T> T get(long index, DBReader<T> reader) throws IOException {
|
||||
incrementUsage(index);
|
||||
if (cacheIndices.has(index)) {
|
||||
return cacheIndices.get(index, reader);
|
||||
} else {
|
||||
@ -62,6 +61,10 @@ public class MixedIndexDatabase implements IndexManager {
|
||||
return cacheIndices.has(index) || fileIndices.has(index);
|
||||
}
|
||||
|
||||
private void incrementUsage(long index) {
|
||||
mostAccessedIndices.put(index, mostAccessedIndices.getOrDefault(index, 0) + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// TODO: move all cached indices to filesIndices before closing.
|
||||
|
26
src/main/java/org/warp/jcwdb/exampleimpl/App.java
Normal file
26
src/main/java/org/warp/jcwdb/exampleimpl/App.java
Normal file
@ -0,0 +1,26 @@
|
||||
package org.warp.jcwdb.exampleimpl;
|
||||
|
||||
import org.warp.jcwdb.EntryReference;
|
||||
import org.warp.jcwdb.JCWDatabase;
|
||||
import org.warp.jcwdb.LightList;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
public class App {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length > 2 && Boolean.parseBoolean(args[2])) {
|
||||
Files.delete(Paths.get(args[0]));
|
||||
Files.delete(Paths.get(args[1]));
|
||||
}
|
||||
JCWDatabase db = new JCWDatabase(Paths.get(args[0]), Paths.get(args[1]));
|
||||
LightList<String> root = ((EntryReference<LightList<String>>) db.getRoot().cast()).value;
|
||||
System.out.println("Root:");
|
||||
for (int i = 0; i < root.size(); i++) {
|
||||
System.out.println(" - " + root.get(i));
|
||||
}
|
||||
for (int i = 0; i < 100; i++) {
|
||||
root.add("Test " + System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user