package org.warp.jcwdb;

// NOTE(review): Input/Output are assumed to be Kryo's I/O classes (their constructors match the
// usages below) -- confirm against the project's dependencies.
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.warp.jcwdb.ann.DatabaseManager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * {@link IndexManager} backed by two files: a data file holding the payload of every index and a
 * metadata file holding one fixed-size record ({@link IndexDetails#TOTAL_BYTES} bytes: a
 * {@code long} offset followed by an {@code int} size) per index.
 *
 * <p>A metadata record whose offset is negative marks a deleted index.
 *
 * <p>Metadata records are cached in memory; modified records are written back by {@link #clean()},
 * {@link #flushAndUnload(long)} and {@link #close()}.
 */
public class FileIndexManager implements IndexManager {

    /** When {@code true}, every write allocates fresh space instead of reusing freed space. */
    public static final boolean ALWAYS_ALLOCATE_NEW = false;

    private final SeekableByteChannel dataFileChannel, metadataFileChannel;
    /** Logical size of the metadata file, including records that are cached but not yet written. */
    private volatile long metadataFileChannelSize;
    private final FileAllocator fileAllocator;
    /** Scratch buffer for one metadata record; guarded by {@link #metadataByteBufferLock}. */
    private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
    /** Scratch buffer for the "deleted" marker; guarded by {@link #maskByteBufferLock}. */
    private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Long.BYTES);
    private volatile boolean closed;
    private final Object closeLock = new Object();
    private final Object metadataByteBufferLock = new Object();
    private final Object maskByteBufferLock = new Object();
    private final Object indicesMapsAccessLock = new Object();

    /**
     * In-memory cache of metadata records, keyed by index.
     * Edit this using editIndex().
     * Get using getIndexMetadata().
     */
    private final Long2ObjectMap<IndexDetails> loadedIndices;
    /**
     * Indices whose cached record differs from the metadata file, and indices deleted in memory
     * whose record has not been erased on disk yet.
     * Edit this using editIndex().
     */
    private final LongSet dirtyLoadedIndices, removedIndices;
    /** Next index handed out by {@link #createIndexMetadata(IndexDetails)}. */
    private long firstAllocableIndex;

    /**
     * Opens (creating them if missing) the data and metadata files and rebuilds the allocator
     * state from the metadata file.
     *
     * @param dataFile     file holding the payloads
     * @param metadataFile file holding the per-index records
     * @throws IOException if a file cannot be created, opened or read
     */
    public FileIndexManager(Path dataFile, Path metadataFile) throws IOException {
        if (Cleaner.DISABLE_CLEANER) {
            loadedIndices = new Long2ObjectOpenHashMap<>();
            dirtyLoadedIndices = new LongOpenHashSet();
            removedIndices = new LongOpenHashSet();
        } else {
            // Linked variants keep insertion order, which is the order cleanExtraIndices() walks.
            loadedIndices = new Long2ObjectLinkedOpenHashMap<>();
            dirtyLoadedIndices = new LongLinkedOpenHashSet();
            removedIndices = new LongLinkedOpenHashSet();
        }
        if (Files.notExists(dataFile)) {
            Files.createFile(dataFile);
        }
        if (Files.notExists(metadataFile)) {
            Files.createFile(metadataFile);
        }
        dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        // Must be set before createFileAllocator(), which reads it through the getter.
        metadataFileChannelSize = metadataFileChannel.size();
        fileAllocator = createFileAllocator(dataFileChannel, metadataFileChannel.position(0));
        firstAllocableIndex = getMetadataFileChannelSize() / (long) IndexDetails.TOTAL_BYTES;
    }

    private long getMetadataFileChannelSize() {
        return metadataFileChannelSize;
    }

    /**
     * Builds the allocator for the data file. Unless {@link #ALWAYS_ALLOCATE_NEW} is set, the
     * whole metadata file is scanned to find which byte ranges of the data file are in use; the
     * gaps between them are handed to the allocator as free space.
     */
    private FileAllocator createFileAllocator(final SeekableByteChannel dataFileChannel,
            final SeekableByteChannel metadataFileChannel) throws IOException {
        if (ALWAYS_ALLOCATE_NEW) {
            return new FileAllocator(dataFileChannel);
        }

        // offset -> size of the largest record starting at that offset, sorted by offset
        final Long2IntMap usedBytes = new Long2IntRBTreeMap();
        metadataFileChannel.position(0);
        while (metadataFileChannel.position() + IndexDetails.TOTAL_BYTES <= getMetadataFileChannelSize()) {
            final IndexDetails indexDetails = readIndexDetailsAt(metadataFileChannel);
            if (indexDetails == null) {
                continue; // deleted record
            }
            final long offset = indexDetails.getOffset();
            if (!usedBytes.containsKey(offset) || indexDetails.getSize() > usedBytes.get(offset)) {
                usedBytes.put(offset, indexDetails.getSize());
            }
        }

        // Walk the used ranges in offset order and record every gap between consecutive ranges.
        final Long2IntMap freeBytes = new Long2IntRBTreeMap();
        long previousEntryEnd = 0;
        final ObjectIterator<Long2IntMap.Entry> it = usedBytes.long2IntEntrySet().iterator();
        while (it.hasNext()) {
            final Long2IntMap.Entry entry = it.next();
            final long entryOffset = entry.getLongKey();
            final long entrySize = entry.getIntValue();
            if (previousEntryEnd < entryOffset) {
                freeBytes.put(previousEntryEnd, (int) (entryOffset - previousEntryEnd));
            }
            previousEntryEnd = entryOffset + entrySize;
        }
        // The end of the last used range is treated as the data file size.
        return new FileAllocator(dataFileChannel, previousEntryEnd, freeBytes);
    }

    /**
     * Reads the payload of {@code index} through {@code reader}.
     *
     * @throws IOException if the index does not exist or the read fails
     */
    @Override
    public <T> T get(long index, DBReader<T> reader) throws IOException {
        checkClosed();
        final IndexDetails details = getIndexMetadata(index);
        // Deliberately not closed: closing the stream would close the shared data channel.
        final Input in = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
        // NOTE(review): DBReader#read(Input, int) is assumed from the call shape -- confirm.
        return reader.read(in, details.getSize());
    }

    /**
     * Writes {@code size} bytes produced by {@code data} as the new payload of {@code index}.
     * The existing space is reused when it is large enough; otherwise new space is allocated and
     * the old space is released.
     *
     * @return the metadata record now describing the index
     * @throws IOException if the metadata lookup or the write fails
     */
    @Override
    public IndexDetails set(long index, int size, DBWriter data) throws IOException {
        checkClosed();
        final IndexDetails current = getIndexMetadataUnsafe(index);
        if (ALWAYS_ALLOCATE_NEW || current == null || current.getSize() < size) {
            // Allocate new space, then release the old one (if any).
            final IndexDetails newDetails = allocateAndWrite(index, size, data);
            if (current != null) {
                fileAllocator.markFree(current.getOffset(), current.getSize());
            }
            return newDetails;
        }

        // Reuse the existing space. When the payload shrank, release only the unused tail,
        // i.e. the (oldSize - size) bytes that follow the new payload.
        final int unusedBytes = current.getSize() - size;
        if (unusedBytes > 0) {
            fileAllocator.markFree(current.getOffset() + size, unusedBytes);
        }
        final IndexDetails updated = editIndex(index, current, current.getOffset(), size);
        writeExact(updated, size, data);
        return updated;
    }

    /**
     * Reserves {@code size} bytes and registers a new index for them without writing any payload.
     *
     * @return the new index
     */
    @Override
    public long add(int size) {
        checkClosed();
        final long offset = fileAllocator.allocate(size);
        return createIndexMetadata(new IndexDetails(offset, size));
    }

    /**
     * Reserves {@code size} bytes, registers a new index for them and writes the payload.
     *
     * @return the new index
     * @throws IOException if the write fails
     */
    @Override
    public long add(int size, DBWriter data) throws IOException {
        checkClosed();
        final long offset = fileAllocator.allocate(size);
        final IndexDetails indexDetails = new IndexDetails(offset, size);
        final long index = createIndexMetadata(indexDetails);
        writeExact(indexDetails, size, data);
        return index;
    }

    /**
     * Same as {@link #add(int, DBWriter)} but also returns the metadata record of the new index.
     *
     * @throws IOException if the write fails
     */
    @Override
    public FullIndexDetails addAndGetDetails(int size, DBWriter data) throws IOException {
        checkClosed();
        final long offset = fileAllocator.allocate(size);
        final IndexDetails indexDetails = new IndexDetails(offset, size);
        final long index = createIndexMetadata(indexDetails);
        writeExact(indexDetails, size, data);
        return new FullIndexDetails(index, indexDetails);
    }

    /**
     * Write the data at index.
     * The input size must be equal to the index size!
     *
     * @throws IOException if the sizes differ or the write fails
     */
    private void writeExact(final IndexDetails indexDetails, int size, DBWriter data) throws IOException {
        if (indexDetails.getSize() != size) {
            throw new IOException("Unable to write " + size + " in a space of " + indexDetails.getSize());
        }
        final long offset = indexDetails.getOffset();
        // Flushed but deliberately not closed: closing would close the shared data channel.
        final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)), size);
        data.write(o);
        o.flush();
    }

    /** Allocates {@code size} bytes, points {@code index} at them and writes the payload. */
    private IndexDetails allocateAndWrite(final long index, int size, DBWriter w) throws IOException {
        final long offset = fileAllocator.allocate(size);
        final IndexDetails details = editIndex(index, offset, size);
        writeExact(details, size, w);
        return details;
    }

    /**
     * Releases the space of {@code index} and schedules its metadata record for erasure.
     *
     * @throws IOException if the metadata lookup fails
     */
    @Override
    public void delete(long index) throws IOException {
        checkClosed();
        final IndexDetails indexDetails = getIndexMetadataUnsafe(index);
        if (indexDetails != null) {
            fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
        }
        synchronized (indicesMapsAccessLock) {
            dirtyLoadedIndices.remove(index);
            loadedIndices.remove(index);
            removedIndices.add(index);
        }
    }

    /**
     * Persists the pending state of a single index (erasure or modified record) to the metadata
     * file and drops it from the in-memory cache.
     *
     * @throws IOException if writing the metadata file fails
     */
    public void flushAndUnload(long index) throws IOException {
        if (removedIndices.contains(index)) {
            synchronized (indicesMapsAccessLock) {
                removedIndices.remove(index);
                dirtyLoadedIndices.remove(index);
                loadedIndices.remove(index);
            }
            // Update indices metadata
            eraseIndexDetails(metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES));
        }

        // Grab the record only if it has unsaved changes; remove() reports whether it was dirty.
        IndexDetails dirtyDetails = null;
        synchronized (indicesMapsAccessLock) {
            if (dirtyLoadedIndices.remove(index)) {
                dirtyDetails = loadedIndices.get(index);
            }
        }
        if (dirtyDetails != null) {
            // Update indices metadata
            final long position = index * IndexDetails.TOTAL_BYTES;
            resizeMetadataFileChannel(position);
            writeIndexDetails(metadataFileChannel.position(position), dirtyDetails);
        }
        synchronized (indicesMapsAccessLock) {
            loadedIndices.remove(index);
        }
    }

    /**
     * Tells whether {@code index} exists. An I/O failure while looking it up is logged and
     * reported as "not present".
     */
    @Override
    public boolean has(long index) {
        checkClosed();
        try {
            return getIndexMetadataUnsafe(index) != null;
        } catch (IOException ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Edit index data if a change is detected.
     *
     * @param index   index to edit
     * @param oldData old index data to compare against
     * @param offset  new offset
     * @param size    new size
     * @return {@code oldData} when nothing changed, otherwise the newly stored record
     */
    private IndexDetails editIndex(long index, IndexDetails oldData, long offset, int size) {
        if (oldData.getOffset() == offset && oldData.getSize() == size) {
            return oldData;
        }
        return editIndex(index, offset, size);
    }

    /**
     * Edit index data.
     *
     * @param index  index to edit
     * @param offset new offset
     * @param size   new size
     * @return the newly stored record
     */
    private IndexDetails editIndex(long index, long offset, int size) {
        final IndexDetails indexDetails = new IndexDetails(offset, size);
        editIndex(index, indexDetails);
        return indexDetails;
    }

    /**
     * Edit index data: caches {@code details} for {@code index} and marks it as needing a flush.
     *
     * @param index   index to edit
     * @param details record to store
     */
    private void editIndex(long index, IndexDetails details) {
        synchronized (indicesMapsAccessLock) {
            loadedIndices.put(index, details);
            dirtyLoadedIndices.add(index);
            // The index is live again: a pending erasure would otherwise wipe it at the next flush.
            removedIndices.remove(index);
        }
    }

    /** Registers {@code indexDetails} under the next free index and returns that index. */
    private long createIndexMetadata(IndexDetails indexDetails) {
        synchronized (indicesMapsAccessLock) {
            final long newIndex = firstAllocableIndex++;
            loadedIndices.put(newIndex, indexDetails);
            dirtyLoadedIndices.add(newIndex);
            removedIndices.remove(newIndex);
            return newIndex;
        }
    }

    /**
     * Looks up the metadata record of {@code index}, first in the cache and then in the metadata
     * file.
     *
     * @return the record, or {@code null} when the index does not exist or has been deleted
     * @throws IOException if reading the metadata file fails
     */
    private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
        synchronized (indicesMapsAccessLock) {
            // Deleted in memory but not yet erased on disk: the on-disk record is stale and
            // must not be loaded back.
            if (removedIndices.contains(index)) {
                return null;
            }
            // Return index details if loaded
            final IndexDetails cached = loadedIndices.get(index);
            if (cached != null) {
                return cached;
            }
        }

        // Try to load the details from file
        final long metadataPosition = index * IndexDetails.TOTAL_BYTES;
        if (metadataPosition + IndexDetails.TOTAL_BYTES > getMetadataFileChannelSize()) {
            // Avoid underflow exception
            return null;
        }
        final IndexDetails indexDetails = readIndexDetailsAt(metadataFileChannel.position(metadataPosition));
        if (indexDetails != null) {
            editIndex(index, indexDetails);
        }
        // null here means the on-disk record is marked as deleted
        return indexDetails;
    }

    /**
     * Reads one metadata record at the channel's current position.
     *
     * @return the record, or {@code null} when its offset is negative (deleted index)
     */
    private IndexDetails readIndexDetailsAt(SeekableByteChannel currentMetadataFileChannel) throws IOException {
        synchronized (metadataByteBufferLock) {
            metadataByteBuffer.rewind();
            currentMetadataFileChannel.read(metadataByteBuffer);
            metadataByteBuffer.rewind();
            final long offset = metadataByteBuffer.getLong();
            if (offset < 0) {
                // If it's < 0 it means that the index has been deleted
                return null;
            }
            final int size = metadataByteBuffer.getInt();
            return new IndexDetails(offset, size);
        }
    }

    /**
     * Same as {@link #getIndexMetadataUnsafe(long)} but never returns {@code null}.
     *
     * @throws IOException if the index does not exist or the lookup fails
     */
    private IndexDetails getIndexMetadata(long index) throws IOException {
        final IndexDetails details = getIndexMetadataUnsafe(index);
        if (details == null) {
            throw new IOException("Index " + index + " not found");
        }
        return details;
    }

    /**
     * Flushes all pending metadata changes, closes the allocator and closes both file channels.
     * Calling it more than once has no further effect.
     *
     * @throws IOException if flushing or closing fails
     */
    @Override
    public void close() throws IOException {
        synchronized (closeLock) {
            if (closed) {
                return;
            }
            closed = true;
        }
        try {
            // Update indices metadata
            flushAllFlushableIndices();
            // Remove removed indices
            removeRemovedIndices();
            fileAllocator.close();
        } finally {
            // Release the file handles even when flushing failed.
            try {
                metadataFileChannel.close();
            } finally {
                dataFileChannel.close();
            }
        }
    }

    /** Writes one metadata record (offset, then size) at the channel's current position. */
    private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException {
        synchronized (metadataByteBufferLock) {
            metadataByteBuffer.rewind();
            metadataByteBuffer.putLong(indexDetails.getOffset());
            metadataByteBuffer.putInt(indexDetails.getSize());
            metadataByteBuffer.rewind();
            position.write(metadataByteBuffer);
        }
    }

    /** Overwrites the offset field at the channel's current position with the deleted marker. */
    private void eraseIndexDetails(SeekableByteChannel position) throws IOException {
        synchronized (maskByteBufferLock) {
            maskByteBuffer.rewind();
            maskByteBuffer.putLong(-1); // -1 = deleted
            maskByteBuffer.rewind();
            position.write(maskByteBuffer);
        }
    }

    private void checkClosed() {
        if (closed) {
            throw new RuntimeException("Index Manager is closed.");
        }
    }

    /**
     * Flushes modified records, erases deleted ones and unloads excess cached records.
     * I/O failures are logged and do not abort the remaining steps.
     *
     * @return the number of records flushed, erased or scheduled for unloading
     */
    @Override
    public long clean() {
        long cleaned = 0;
        final long tim1 = System.currentTimeMillis();
        try {
            cleaned += flushAllFlushableIndices();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        final long tim2 = System.currentTimeMillis();
        try {
            cleaned += removeRemovedIndices();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        final long tim3 = System.currentTimeMillis();
        cleaned += cleanExtraIndices();
        final long tim4 = System.currentTimeMillis();
        if (Cleaner.ENABLE_CLEANER_LOGGING) {
            System.out.println("[CLEANER] FileIndexManager CLEAN_TIME: " + (tim2-tim1) + "," + (tim3-tim2) + "," + (tim4-tim3));
        }
        return cleaned;
    }

    /**
     * Writes every modified record to the metadata file.
     *
     * @return the number of records written
     */
    private long flushAllFlushableIndices() throws IOException {
        long flushedIndices = 0;
        SeekableByteChannel metadata = metadataFileChannel;
        long lastIndex = -2;
        synchronized (indicesMapsAccessLock) {
            final LongIterator it = dirtyLoadedIndices.iterator();
            while (it.hasNext()) {
                final long index = it.nextLong();
                final IndexDetails indexDetails = loadedIndices.get(index);
                final long position = index * IndexDetails.TOTAL_BYTES;
                resizeMetadataFileChannel(position);
                // After writing record N the channel already sits at record N + 1,
                // so only seek when the indices are not consecutive.
                if (index - lastIndex != 1) {
                    metadata = metadata.position(position);
                }
                writeIndexDetails(metadata, indexDetails);
                lastIndex = index;
                flushedIndices++;
            }
            dirtyLoadedIndices.clear();
        }
        return flushedIndices;
    }

    /** Grows the tracked metadata size so that the record starting at {@code position} fits. */
    private void resizeMetadataFileChannel(long position) {
        final long requiredSize = position + IndexDetails.TOTAL_BYTES;
        if (requiredSize > metadataFileChannelSize) {
            metadataFileChannelSize = requiredSize;
        }
    }

    /**
     * Marks every deleted index as deleted in the metadata file.
     *
     * @return the number of records erased
     */
    private long removeRemovedIndices() throws IOException {
        SeekableByteChannel metadata = metadataFileChannel;
        synchronized (indicesMapsAccessLock) {
            final long removed = this.removedIndices.size();
            final LongIterator it = this.removedIndices.iterator();
            while (it.hasNext()) {
                metadata = metadata.position(it.nextLong() * IndexDetails.TOTAL_BYTES);
                eraseIndexDetails(metadata);
            }
            this.removedIndices.clear();
            return removed;
        }
    }

    /**
     * Unloads cached records when the cache has grown too large.
     *
     * @return the number of records scheduled for unloading
     */
    private long cleanExtraIndices() {
        final LongArrayList toUnload = new LongArrayList();
        synchronized (indicesMapsAccessLock) {
            long count = loadedIndices.size();
            if (count > DatabaseManager.MAX_LOADED_INDICES) {
                // NOTE(review): records are only dropped while the cache holds at least
                // 1.5 * MAX_LOADED_INDICES entries, so nothing is unloaded between 1x and 1.5x.
                // Thresholds kept as they were -- confirm this is intended.
                final long threshold = DatabaseManager.MAX_LOADED_INDICES * 3L / 2L;
                final LongIterator it = loadedIndices.keySet().iterator();
                while (it.hasNext() && count >= threshold) {
                    toUnload.add(it.nextLong());
                    count--;
                }
            }
        }
        // Walk the list by size: its backing array may be longer than the number of elements.
        final int unloadCount = toUnload.size();
        for (int i = 0; i < unloadCount; i++) {
            try {
                flushAndUnload(toUnload.getLong(i));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return unloadCount;
    }
}