strangedb/src/main/java/org/warp/jcwdb/FileIndexManager.java

260 lines
8.2 KiB
Java

package org.warp.jcwdb;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class FileIndexManager implements IndexManager {
private final SeekableByteChannel dataFileChannel, metadataFileChannel;
private final FileAllocator fileAllocator;
private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Integer.BYTES);
private volatile boolean closed;
private final Object closeLock = new Object();
private final Object metadataByteBufferLock = new Object();
private final Object maskByteBufferLock = new Object();
/**
* Edit this using editIndex()
* Get using getIndexMetadata()
* This hashmap must contain all indices.
*/
private final Long2ObjectMap<IndexDetails> loadedIndices;
/**
* Edit this using editIndex()
*/
private final LongSet dirtyLoadedIndices, removedIndices;
private long firstAllocableIndex;
public FileIndexManager(Path dataFile, Path metadataFile) throws IOException {
loadedIndices = new Long2ObjectAVLTreeMap<>();
dirtyLoadedIndices = new LongAVLTreeSet();
removedIndices = new LongAVLTreeSet();
if (Files.notExists(dataFile)) {
Files.createFile(dataFile);
}
if (Files.notExists(metadataFile)) {
Files.createFile(metadataFile);
}
dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
fileAllocator = new FileAllocator(dataFileChannel);
firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES;
if (firstAllocableIndex == 0) {
firstAllocableIndex = 1;
}
}
@Override
public <T> T get(long index, DBReader<T> reader) throws IOException {
checkClosed();
IndexDetails details = getIndexMetadata(index);
Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
T result = reader.read(i, details.getSize());
return result;
}
@Override
public int getType(long index) throws IOException {
return getIndexMetadata(index).getType();
}
@Override
public <T> void set(long index, DBDataOutput<T> data) throws IOException {
checkClosed();
final IndexDetails indexDetails = getIndexMetadataUnsafe(index);
if (indexDetails == null || indexDetails.getSize() < data.getSize()) {
allocateAndWrite(index, data);
} else {
if (indexDetails.getSize() > data.getSize()) {
editIndex(index, indexDetails.getOffset(), data.getSize(), indexDetails.getType());
fileAllocator.markFree(indexDetails.getOffset()+data.getSize(), data.getSize());
}
writeExact(indexDetails, data);
}
}
@Override
public <T> long add(DBDataOutput<T> data) throws IOException {
checkClosed();
final int size = data.getSize();
final long offset = fileAllocator.allocate(size);
final int type = data.getType();
final IndexDetails indexDetails = new IndexDetails(offset, size, type);
final long index = createIndexMetadata(indexDetails);
writeExact(indexDetails, data);
return index;
}
/**
* Write the data at index.
* The input size must be equal to the index size!
* @param indexDetails
* @param data
* @throws IOException
*/
private void writeExact(final IndexDetails indexDetails, DBDataOutput<?> data) throws IOException {
final int dataSize = data.getSize();
if (indexDetails.getSize() != dataSize) {
throw new IOException("Unable to write " + dataSize + " in a space of " + indexDetails.getSize());
}
final long offset = indexDetails.getOffset();
final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)), dataSize);
data.getWriter().write(o);
o.flush();
}
private void allocateAndWrite(final long index, DBDataOutput<?> w) throws IOException {
final int size = w.getSize();
final int type = w.getType();
final long offset = fileAllocator.allocate(size);
IndexDetails details = editIndex(index, offset, size, type);
writeExact(details, w);
}
@Override
public void delete(long index) throws IOException {
checkClosed();
IndexDetails indexDetails = getIndexMetadataUnsafe(index);
if (indexDetails != null) {
fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
}
dirtyLoadedIndices.remove(index);
loadedIndices.remove(index);
removedIndices.add(index);
}
@Override
public boolean has(long index) {
checkClosed();
try {
return getIndexMetadataUnsafe(index) != null;
} catch (IOException ex) {
ex.printStackTrace();
return false;
}
}
private IndexDetails editIndex(long index, long offset, int size, int type) {
IndexDetails indexDetails = new IndexDetails(offset, size, type);
editIndex(index, indexDetails);
return indexDetails;
}
private void editIndex(long index, IndexDetails details) {
loadedIndices.put(index, details);
dirtyLoadedIndices.add(index);
}
private long createIndexMetadata(IndexDetails indexDetails) {
long newIndex = firstAllocableIndex++;
loadedIndices.put(newIndex, indexDetails);
dirtyLoadedIndices.add(newIndex);
removedIndices.remove(newIndex);
return newIndex;
}
private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
// Return index details if loaded
IndexDetails details = loadedIndices.getOrDefault(index, null);
if (details != null) return details;
// Try to load the details from file
final long metadataPosition = index * IndexDetails.TOTAL_BYTES;
if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) {
// Avoid underflow exception
return null;
}
SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition);
synchronized (metadataByteBufferLock) {
metadataByteBuffer.rewind();
currentMetadataFileChannel.read(metadataByteBuffer);
metadataByteBuffer.rewind();
// If it's not deleted continue
if ((metadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) {
final long offset = metadataByteBuffer.getLong();
final int size = metadataByteBuffer.getInt();
final int type = metadataByteBuffer.getInt();
final IndexDetails indexDetails = new IndexDetails(offset, size, type);
editIndex(index, indexDetails);
return indexDetails;
}
}
// No results found. Returning null
return null;
}
private IndexDetails getIndexMetadata(long index) throws IOException {
IndexDetails details = getIndexMetadataUnsafe(index);
if (details == null)
throw new IOException("Index " + index + " not found");
else
return details;
}
@Override
public void close() throws IOException {
if (closed) {
return;
}
synchronized (closeLock) {
if (closed) {
return;
}
closed = true;
}
// Update indices metadata
SeekableByteChannel metadata = metadataFileChannel;
long lastIndex = -2;
for (long index : dirtyLoadedIndices) {
IndexDetails indexDetails = loadedIndices.get(index);
if (index - lastIndex != 1) {
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
}
synchronized (metadataByteBufferLock) {
metadataByteBuffer.rewind();
metadataByteBuffer.putInt(0);
metadataByteBuffer.putLong(indexDetails.getOffset());
metadataByteBuffer.putInt(indexDetails.getSize());
metadataByteBuffer.putInt(indexDetails.getType());
metadataByteBuffer.rewind();
metadata.write(metadataByteBuffer);
}
lastIndex = index;
}
// Remove removed indices
for (long index : removedIndices) {
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
synchronized (maskByteBufferLock) {
maskByteBuffer.rewind();
maskByteBuffer.putInt(IndexDetails.MASK_DELETED);
maskByteBuffer.rewind();
metadata.write(maskByteBuffer);
}
}
fileAllocator.close();
}
private void checkClosed() {
if (closed) {
throw new RuntimeException("Index Manager is closed.");
}
}
}