strangedb/src/main/java/org/warp/jcwdb/FileIndexManager.java
2018-11-20 18:39:48 +01:00

167 lines
4.9 KiB
Java

package org.warp.jcwdb;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class FileIndexManager implements IndexManager {
private final TypesManager typesManager;
private final SeekableByteChannel dataFileChannel, metadataFileChannel;
private final FileAllocator fileAllocator;
private volatile boolean closed;
private final Object closeLock = new Object();
/**
* Edit this using editIndex()
* Get using getIndexMetadata()
* This hashmap must contain all indices.
*/
private final Map<Long, IndexDetails> loadedIndices;
/**
* Edit this using editIndex()
*/
private final Set<Long> dirtyLoadedIndices;
public FileIndexManager(TypesManager typesManager, Path dataFile, Path metadataFile) throws IOException {
this.typesManager = typesManager;
loadedIndices = new HashMap<>();
dirtyLoadedIndices = new HashSet<>();
dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.CREATE);
metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.CREATE);
fileAllocator = new FileAllocator(dataFileChannel);
}
@Override
public <T> T get(long index, DBReader<T> reader) throws IOException {
checkClosed();
IndexDetails details = getIndexMetadata(index);
Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
T result = reader.read(i);
i.close();
return result;
}
@Override
public int getType(long index) throws IOException {
return getIndexMetadata(index).getType();
}
@Override
public <T> void set(long index, DBDataOutput<T> data) throws IOException {
checkClosed();
IndexDetails indexDetails = getIndexMetadataUnsafe(index);
if (indexDetails == null || indexDetails.getSize() < data.getSize()) {
allocateAndWrite(index, data);
} else {
if (indexDetails.getSize() > data.getSize()) {
editIndex(index, indexDetails.getOffset(), data.getSize(), indexDetails.getType());
fileAllocator.markFree(indexDetails.getOffset()+data.getSize(), data.getSize());
}
write(index, indexDetails, data);
}
}
private void write(final long index, final IndexDetails indexDetails, DBDataOutput<?> data) throws IOException {
final long offset = indexDetails.getOffset();
final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)));
data.getWriter().write(o);
o.close();
}
private void allocateAndWrite(final long index, DBDataOutput<?> w) throws IOException {
final int size = w.getSize();
final int type = w.getType();
final long offset = fileAllocator.allocate(size);
IndexDetails details = editIndex(index, offset, size, type);
write(index, details, w);
}
@Override
public void delete(long index) {
checkClosed();
}
@Override
public boolean has(long index) {
checkClosed();
return false;
}
private IndexDetails editIndex(long index, long offset, int size, int type) {
IndexDetails indexDetails = new IndexDetails(offset, size, type);
editIndex(index, indexDetails);
return indexDetails;
}
private IndexDetails getIndexMetadataUnsafe(long index) {
IndexDetails details = loadedIndices.getOrDefault(index, null);
if (details != null) return details;
// TODO: implement index loading from file
return null;
}
private IndexDetails getIndexMetadata(long index) throws IOException {
IndexDetails details = getIndexMetadataUnsafe(index);
if (details == null)
throw new IOException("Index " + index + " not found");
else
return details;
}
private void editIndex(long index, IndexDetails details) {
loadedIndices.put(index, details);
dirtyLoadedIndices.add(index);
}
@Override
public void close() throws IOException {
if (closed) {
return;
}
synchronized (closeLock) {
if (closed) {
return;
}
closed = true;
}
SeekableByteChannel metadata = metadataFileChannel;
ByteBuffer metadataEntryBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
long lastIndex = -2;
for (Long index : dirtyLoadedIndices) {
IndexDetails indexDetails = loadedIndices.get(index);
if (index - lastIndex != 1) {
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
}
metadataEntryBuffer.putLong(indexDetails.getOffset());
metadataEntryBuffer.putInt(indexDetails.getSize());
metadataEntryBuffer.putInt(indexDetails.getType());
metadata.write(metadataEntryBuffer);
lastIndex = index;
}
fileAllocator.close();
}
private void checkClosed() {
if (closed) {
throw new RuntimeException("Index Manager is closed.");
}
}
}