433 lines
13 KiB
Java
433 lines
13 KiB
Java
package org.warp.jcwdb;
|
|
|
|
import com.esotericsoftware.kryo.io.Input;
|
|
import com.esotericsoftware.kryo.io.Output;
|
|
import it.unimi.dsi.fastutil.longs.*;
|
|
|
|
import java.io.IOException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.channels.Channels;
|
|
import java.nio.channels.SeekableByteChannel;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Path;
|
|
import java.nio.file.StandardOpenOption;
|
|
import java.util.Iterator;
|
|
import java.util.function.Consumer;
|
|
|
|
public class FileIndexManager implements IndexManager {
|
|
private final SeekableByteChannel dataFileChannel, metadataFileChannel;
|
|
private final FileAllocator fileAllocator;
|
|
private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
|
|
private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Integer.BYTES);
|
|
private volatile boolean closed;
|
|
private final Object closeLock = new Object();
|
|
private final Object metadataByteBufferLock = new Object();
|
|
private final Object maskByteBufferLock = new Object();
|
|
private final Object indicesMapsAccessLock = new Object();
|
|
|
|
/**
|
|
* Edit this using editIndex()
|
|
* Get using getIndexMetadata()
|
|
* This hashmap must contain all indices.
|
|
*/
|
|
private final Long2ObjectMap<IndexDetails> loadedIndices;
|
|
/**
|
|
* Edit this using editIndex()
|
|
*/
|
|
private final LongSet dirtyLoadedIndices, removedIndices;
|
|
private long firstAllocableIndex;
|
|
|
|
public FileIndexManager(Path dataFile, Path metadataFile) throws IOException {
|
|
loadedIndices = new Long2ObjectOpenHashMap<>();
|
|
dirtyLoadedIndices = new LongOpenHashSet();
|
|
removedIndices = new LongOpenHashSet();
|
|
if (Files.notExists(dataFile)) {
|
|
Files.createFile(dataFile);
|
|
}
|
|
if (Files.notExists(metadataFile)) {
|
|
Files.createFile(metadataFile);
|
|
}
|
|
dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
|
metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
|
fileAllocator = new FileAllocator(dataFileChannel);
|
|
firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES;
|
|
if (firstAllocableIndex == 0) {
|
|
firstAllocableIndex = 1;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public <T> T get(long index, DBReader<T> reader) throws IOException {
|
|
checkClosed();
|
|
IndexDetails details = getIndexMetadata(index);
|
|
Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
|
|
T result = reader.read(i, details.getSize());
|
|
return result;
|
|
}
|
|
|
|
@Override
|
|
public int getType(long index) throws IOException {
|
|
return getIndexMetadata(index).getType();
|
|
}
|
|
|
|
@Override
|
|
public long getHash(long index) throws IOException {
|
|
return getIndexMetadata(index).getHash();
|
|
}
|
|
|
|
@Override
|
|
public <T> IndexDetails set(long index, DBDataOutput<T> data) throws IOException {
|
|
checkClosed();
|
|
final int dataSize = data.getSize();
|
|
final IndexDetails indexDetails = getIndexMetadataUnsafe(index);
|
|
if (indexDetails == null || indexDetails.getSize() < dataSize) {
|
|
// Allocate new space
|
|
return allocateAndWrite(index, data);
|
|
} else {
|
|
// Check if size changed
|
|
if (dataSize < indexDetails.getSize()) {
|
|
// Mark free the unused bytes
|
|
fileAllocator.markFree(indexDetails.getOffset() + dataSize, dataSize);
|
|
}
|
|
// Update index details
|
|
editIndex(index, indexDetails, indexDetails.getOffset(), dataSize, indexDetails.getType(), data.calculateHash());
|
|
// Write data
|
|
writeExact(indexDetails, data);
|
|
// Before returning, return IndexDetails
|
|
return indexDetails;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public <T> long add(DBDataOutput<T> data) throws IOException {
|
|
checkClosed();
|
|
final int size = data.getSize();
|
|
final long offset = fileAllocator.allocate(size);
|
|
final int type = data.getType();
|
|
final long hash = data.calculateHash();
|
|
final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash);
|
|
final long index = createIndexMetadata(indexDetails);
|
|
writeExact(indexDetails, data);
|
|
return index;
|
|
}
|
|
|
|
/**
|
|
* Write the data at index.
|
|
* The input size must be equal to the index size!
|
|
*
|
|
* @param indexDetails
|
|
* @param data
|
|
* @throws IOException
|
|
*/
|
|
private void writeExact(final IndexDetails indexDetails, DBDataOutput<?> data) throws IOException {
|
|
final int dataSize = data.getSize();
|
|
if (indexDetails.getSize() != dataSize) {
|
|
throw new IOException("Unable to write " + dataSize + " in a space of " + indexDetails.getSize());
|
|
}
|
|
final long offset = indexDetails.getOffset();
|
|
|
|
final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)), dataSize);
|
|
data.getWriter().write(o);
|
|
o.flush();
|
|
}
|
|
|
|
private IndexDetails allocateAndWrite(final long index, DBDataOutput<?> w) throws IOException {
|
|
final int size = w.getSize();
|
|
final int type = w.getType();
|
|
final long hash = w.calculateHash();
|
|
final long offset = fileAllocator.allocate(size);
|
|
IndexDetails details = editIndex(index, offset, size, type, hash);
|
|
writeExact(details, w);
|
|
return details;
|
|
}
|
|
|
|
@Override
|
|
public void delete(long index) throws IOException {
|
|
checkClosed();
|
|
IndexDetails indexDetails = getIndexMetadataUnsafe(index);
|
|
if (indexDetails != null) {
|
|
fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
|
|
}
|
|
synchronized (indicesMapsAccessLock) {
|
|
dirtyLoadedIndices.remove(index);
|
|
loadedIndices.remove(index);
|
|
removedIndices.add(index);
|
|
}
|
|
}
|
|
|
|
public void flushAndUnload(long index) throws IOException {
|
|
if (removedIndices.contains(index)) {
|
|
synchronized (indicesMapsAccessLock) {
|
|
removedIndices.remove(index);
|
|
dirtyLoadedIndices.remove(index);
|
|
loadedIndices.remove(index);
|
|
}
|
|
// Update indices metadata
|
|
SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES);
|
|
eraseIndexDetails(metadata);
|
|
}
|
|
boolean isDirty = false;
|
|
IndexDetails indexDetails = null;
|
|
synchronized (indicesMapsAccessLock) {
|
|
if (dirtyLoadedIndices.contains(index)) {
|
|
indexDetails = loadedIndices.get(index);
|
|
dirtyLoadedIndices.remove(index);
|
|
}
|
|
}
|
|
if (isDirty) {
|
|
// Update indices metadata
|
|
SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES);
|
|
writeIndexDetails(metadata, indexDetails);
|
|
}
|
|
synchronized (indicesMapsAccessLock) {
|
|
loadedIndices.remove(index);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public boolean has(long index) {
|
|
checkClosed();
|
|
try {
|
|
return getIndexMetadataUnsafe(index) != null;
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Edit index data if a change is detected
|
|
* @param index
|
|
* @param oldData Old index data to check
|
|
* @param offset offset
|
|
* @param size size
|
|
* @param type type
|
|
* @param hash hash
|
|
* @return
|
|
*/
|
|
private IndexDetails editIndex(long index, IndexDetails oldData, long offset, int size, int type, long hash) {
|
|
if (oldData.getOffset() != offset || oldData.getSize() != size || oldData.getType() != type || oldData.getHash() != hash) {
|
|
return editIndex(index, offset, size, type, hash);
|
|
} else {
|
|
return oldData;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Edit index data
|
|
* @param index
|
|
* @param offset
|
|
* @param size
|
|
* @param type
|
|
* @param hash
|
|
* @return
|
|
*/
|
|
private IndexDetails editIndex(long index, long offset, int size, int type, long hash) {
|
|
IndexDetails indexDetails = new IndexDetails(offset, size, type, hash);
|
|
editIndex(index, indexDetails);
|
|
return indexDetails;
|
|
}
|
|
|
|
/**
|
|
* Edit index data
|
|
* @param index
|
|
* @param details
|
|
*/
|
|
private void editIndex(long index, IndexDetails details) {
|
|
synchronized (indicesMapsAccessLock) {// FIXXXX main3
|
|
loadedIndices.put(index, details);
|
|
dirtyLoadedIndices.add(index);
|
|
}
|
|
}
|
|
|
|
private long createIndexMetadata(IndexDetails indexDetails) {
|
|
synchronized (indicesMapsAccessLock) {
|
|
long newIndex = firstAllocableIndex++;
|
|
loadedIndices.put(newIndex, indexDetails);
|
|
dirtyLoadedIndices.add(newIndex);
|
|
removedIndices.remove(newIndex);
|
|
return newIndex;
|
|
}
|
|
}
|
|
|
|
private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
|
|
// Return index details if loaded
|
|
IndexDetails details;
|
|
synchronized (indicesMapsAccessLock) {
|
|
details = loadedIndices.getOrDefault(index, null);
|
|
}
|
|
if (details != null) return details;
|
|
|
|
// Try to load the details from file
|
|
final long metadataPosition = index * IndexDetails.TOTAL_BYTES;
|
|
if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) {
|
|
// Avoid underflow exception
|
|
return null;
|
|
}
|
|
SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition);
|
|
IndexDetails indexDetails = null;
|
|
synchronized (metadataByteBufferLock) {// FIXXXX main2
|
|
metadataByteBuffer.rewind();
|
|
currentMetadataFileChannel.read(metadataByteBuffer);
|
|
metadataByteBuffer.rewind();
|
|
// If it's not deleted continue
|
|
if ((metadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) {
|
|
final long offset = metadataByteBuffer.getLong();
|
|
// final long sizeAndType = metadataByteBuffer.getLong();
|
|
// final int size = (int)(sizeAndType >> 32);
|
|
// final int type = (int)sizeAndType;
|
|
final int size = metadataByteBuffer.getInt();
|
|
final int type = metadataByteBuffer.getInt();
|
|
final long hash = metadataByteBuffer.getLong();
|
|
indexDetails = new IndexDetails(offset, size, type, hash);
|
|
}
|
|
}
|
|
|
|
if (indexDetails != null) {
|
|
editIndex(index, indexDetails);
|
|
return indexDetails;
|
|
}
|
|
|
|
// No results found. Returning null
|
|
return null;
|
|
}
|
|
|
|
private IndexDetails getIndexMetadata(long index) throws IOException {
|
|
IndexDetails details = getIndexMetadataUnsafe(index);
|
|
if (details == null)
|
|
throw new IOException("Index " + index + " not found");
|
|
else
|
|
return details;
|
|
}
|
|
|
|
@Override
|
|
public void close() throws IOException {
|
|
if (closed) {
|
|
return;
|
|
}
|
|
synchronized (closeLock) {
|
|
if (closed) {
|
|
return;
|
|
}
|
|
closed = true;
|
|
}
|
|
|
|
// Update indices metadata
|
|
flushAllIndices();
|
|
|
|
// Remove removed indices
|
|
removeRemovedIndices();
|
|
fileAllocator.close();
|
|
}
|
|
|
|
private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException {
|
|
synchronized (metadataByteBufferLock) {// FIXXXX cleaner3
|
|
final int size = indexDetails.getSize();
|
|
final int type = indexDetails.getType();
|
|
final long offset = indexDetails.getOffset();
|
|
final long hash = indexDetails.getHash();
|
|
metadataByteBuffer.rewind();
|
|
metadataByteBuffer.putInt(0);
|
|
metadataByteBuffer.putLong(offset);
|
|
metadataByteBuffer.putInt(size);
|
|
metadataByteBuffer.putInt(type);
|
|
//metadataByteBuffer.putLong((long)size << 32 | type & 0xFFFFFFFFL);
|
|
metadataByteBuffer.putLong(hash);
|
|
metadataByteBuffer.rewind();
|
|
position.write(metadataByteBuffer);
|
|
}
|
|
}
|
|
|
|
private void eraseIndexDetails(SeekableByteChannel position) throws IOException {
|
|
synchronized (maskByteBufferLock) {
|
|
maskByteBuffer.rewind();
|
|
maskByteBuffer.putInt(IndexDetails.MASK_DELETED);
|
|
maskByteBuffer.rewind();
|
|
position.write(maskByteBuffer);
|
|
}
|
|
}
|
|
|
|
private void checkClosed() {
|
|
if (closed) {
|
|
throw new RuntimeException("Index Manager is closed.");
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public long clean() {
|
|
long cleaned = 0;
|
|
try {
|
|
cleaned += flushAllIndices();
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
}
|
|
try {
|
|
cleaned += removeRemovedIndices();
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
}
|
|
cleaned += cleanExtraIndices();
|
|
return cleaned;
|
|
}
|
|
|
|
private long flushAllIndices() throws IOException {
|
|
long flushedIndices = 0;
|
|
SeekableByteChannel metadata = metadataFileChannel;
|
|
long lastIndex = -2;
|
|
synchronized (indicesMapsAccessLock) {
|
|
for (long index : dirtyLoadedIndices) {
|
|
IndexDetails indexDetails = loadedIndices.get(index);
|
|
if (index - lastIndex != 1) {
|
|
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
|
|
}
|
|
writeIndexDetails(metadata, indexDetails);
|
|
lastIndex = index;
|
|
flushedIndices++;
|
|
}
|
|
dirtyLoadedIndices.clear();
|
|
}
|
|
return flushedIndices;
|
|
}
|
|
|
|
private long removeRemovedIndices() throws IOException {
|
|
SeekableByteChannel metadata = metadataFileChannel;
|
|
synchronized (indicesMapsAccessLock) {
|
|
long removed = this.removedIndices.size();
|
|
for (long index : this.removedIndices) {
|
|
metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
|
|
eraseIndexDetails(metadata);
|
|
}
|
|
this.removedIndices.clear();
|
|
return removed;
|
|
}
|
|
}
|
|
|
|
private long cleanExtraIndices() {
|
|
long removedIndices = 0;
|
|
LongArrayList toUnload = new LongArrayList();
|
|
synchronized (indicesMapsAccessLock) {
|
|
if (loadedIndices.size() > JCWDatabase.MAX_LOADED_INDICES) {
|
|
long count = loadedIndices.size();
|
|
LongIterator it = loadedIndices.keySet().iterator();
|
|
while (it.hasNext()) {
|
|
long loadedIndex = it.nextLong();
|
|
if (count < JCWDatabase.MAX_LOADED_INDICES * 3l / 2l) {
|
|
break;
|
|
}
|
|
toUnload.add(loadedIndex);
|
|
removedIndices++;
|
|
count--;
|
|
}
|
|
}
|
|
}
|
|
for (long index : toUnload.elements()) {
|
|
try {
|
|
flushAndUnload(index);
|
|
} catch (IOException e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
return removedIndices;
|
|
}
|
|
}
|