package org.warp.jcwdb; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import it.unimi.dsi.fastutil.longs.*; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.function.Consumer; public class FileIndexManager implements IndexManager { private final SeekableByteChannel dataFileChannel, metadataFileChannel; private final FileAllocator fileAllocator; private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES); private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Integer.BYTES); private volatile boolean closed; private final Object closeLock = new Object(); private final Object metadataByteBufferLock = new Object(); private final Object maskByteBufferLock = new Object(); private final Object indicesMapsAccessLock = new Object(); /** * Edit this using editIndex() * Get using getIndexMetadata() * This hashmap must contain all indices. */ private final Long2ObjectMap loadedIndices; /** * Edit this using editIndex() */ private final LongSet dirtyLoadedIndices, removedIndices; private long firstAllocableIndex; public FileIndexManager(Path dataFile, Path metadataFile) throws IOException { loadedIndices = new Long2ObjectOpenHashMap<>(); dirtyLoadedIndices = new LongOpenHashSet(); removedIndices = new LongOpenHashSet(); if (Files.notExists(dataFile)) { Files.createFile(dataFile); } if (Files.notExists(metadataFile)) { Files.createFile(metadataFile); } dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); fileAllocator = new FileAllocator(dataFileChannel); firstAllocableIndex = metadataFileChannel.size() / (long) IndexDetails.TOTAL_BYTES; if (firstAllocableIndex == 0) { firstAllocableIndex = 1; } } @Override public T get(long index, DBReader reader) throws IOException { checkClosed(); IndexDetails details = getIndexMetadata(index); Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset()))); T result = reader.read(i, details.getSize()); return result; } @Override public int getType(long index) throws IOException { return getIndexMetadata(index).getType(); } @Override public long getHash(long index) throws IOException { return getIndexMetadata(index).getHash(); } @Override public IndexDetails set(long index, DBDataOutput data) throws IOException { checkClosed(); final int dataSize = data.getSize(); final IndexDetails indexDetails = getIndexMetadataUnsafe(index); if (indexDetails == null || indexDetails.getSize() < dataSize) { // Allocate new space return allocateAndWrite(index, data); } else { // Check if size changed if (dataSize < indexDetails.getSize()) { // Mark free the unused bytes fileAllocator.markFree(indexDetails.getOffset() + dataSize, dataSize); } // Update index details editIndex(index, indexDetails, indexDetails.getOffset(), dataSize, indexDetails.getType(), data.calculateHash()); // Write data writeExact(indexDetails, data); // Before returning, return IndexDetails return indexDetails; } } @Override public long add(DBDataOutput data) throws IOException { checkClosed(); final int size = data.getSize(); final long offset = fileAllocator.allocate(size); final int type = data.getType(); final long hash = data.calculateHash(); final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash); final long index = createIndexMetadata(indexDetails); writeExact(indexDetails, data); return index; } /** * Write the data at index. * The input size must be equal to the index size! * * @param indexDetails * @param data * @throws IOException */ private void writeExact(final IndexDetails indexDetails, DBDataOutput data) throws IOException { final int dataSize = data.getSize(); if (indexDetails.getSize() != dataSize) { throw new IOException("Unable to write " + dataSize + " in a space of " + indexDetails.getSize()); } final long offset = indexDetails.getOffset(); final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)), dataSize); data.getWriter().write(o); o.flush(); } private IndexDetails allocateAndWrite(final long index, DBDataOutput w) throws IOException { final int size = w.getSize(); final int type = w.getType(); final long hash = w.calculateHash(); final long offset = fileAllocator.allocate(size); IndexDetails details = editIndex(index, offset, size, type, hash); writeExact(details, w); return details; } @Override public void delete(long index) throws IOException { checkClosed(); IndexDetails indexDetails = getIndexMetadataUnsafe(index); if (indexDetails != null) { fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize()); } synchronized (indicesMapsAccessLock) { dirtyLoadedIndices.remove(index); loadedIndices.remove(index); removedIndices.add(index); } } public void flushAndUnload(long index) throws IOException { if (removedIndices.contains(index)) { synchronized (indicesMapsAccessLock) { removedIndices.remove(index); dirtyLoadedIndices.remove(index); loadedIndices.remove(index); } // Update indices metadata SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); eraseIndexDetails(metadata); } boolean isDirty = false; IndexDetails indexDetails = null; synchronized (indicesMapsAccessLock) { if (dirtyLoadedIndices.contains(index)) { indexDetails = loadedIndices.get(index); dirtyLoadedIndices.remove(index); } } if (isDirty) { // Update indices metadata SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES); writeIndexDetails(metadata, indexDetails); } synchronized (indicesMapsAccessLock) { loadedIndices.remove(index); } } @Override public boolean has(long index) { checkClosed(); try { return getIndexMetadataUnsafe(index) != null; } catch (IOException ex) { ex.printStackTrace(); return false; } } /** * Edit index data if a change is detected * @param index * @param oldData Old index data to check * @param offset offset * @param size size * @param type type * @param hash hash * @return */ private IndexDetails editIndex(long index, IndexDetails oldData, long offset, int size, int type, long hash) { if (oldData.getOffset() != offset || oldData.getSize() != size || oldData.getType() != type || oldData.getHash() != hash) { return editIndex(index, offset, size, type, hash); } else { return oldData; } } /** * Edit index data * @param index * @param offset * @param size * @param type * @param hash * @return */ private IndexDetails editIndex(long index, long offset, int size, int type, long hash) { IndexDetails indexDetails = new IndexDetails(offset, size, type, hash); editIndex(index, indexDetails); return indexDetails; } /** * Edit index data * @param index * @param details */ private void editIndex(long index, IndexDetails details) { synchronized (indicesMapsAccessLock) { loadedIndices.put(index, details); dirtyLoadedIndices.add(index); } } private long createIndexMetadata(IndexDetails indexDetails) { synchronized (indicesMapsAccessLock) { long newIndex = firstAllocableIndex++; loadedIndices.put(newIndex, indexDetails); dirtyLoadedIndices.add(newIndex); removedIndices.remove(newIndex); return newIndex; } } private IndexDetails getIndexMetadataUnsafe(long index) throws IOException { // Return index details if loaded IndexDetails details; synchronized (indicesMapsAccessLock) { details = loadedIndices.getOrDefault(index, null); } if (details != null) return details; // Try to load the details from file final long metadataPosition = index * IndexDetails.TOTAL_BYTES; if (metadataPosition + IndexDetails.TOTAL_BYTES > metadataFileChannel.size()) { // Avoid underflow exception return null; } SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition); synchronized (metadataByteBufferLock) { metadataByteBuffer.rewind(); currentMetadataFileChannel.read(metadataByteBuffer); metadataByteBuffer.rewind(); // If it's not deleted continue if ((metadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) { final long offset = metadataByteBuffer.getLong(); // final long sizeAndType = metadataByteBuffer.getLong(); // final int size = (int)(sizeAndType >> 32); // final int type = (int)sizeAndType; final int size = metadataByteBuffer.getInt(); final int type = metadataByteBuffer.getInt(); final long hash = metadataByteBuffer.getLong(); final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash); editIndex(index, indexDetails); return indexDetails; } } // No results found. Returning null return null; } private IndexDetails getIndexMetadata(long index) throws IOException { IndexDetails details = getIndexMetadataUnsafe(index); if (details == null) throw new IOException("Index " + index + " not found"); else return details; } @Override public void close() throws IOException { if (closed) { return; } synchronized (closeLock) { if (closed) { return; } closed = true; } // Update indices metadata SeekableByteChannel metadata = metadataFileChannel; long lastIndex = -2; synchronized (indicesMapsAccessLock) { for (long index : dirtyLoadedIndices) { IndexDetails indexDetails = loadedIndices.get(index); if (index - lastIndex != 1) { metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); } writeIndexDetails(metadata, indexDetails); lastIndex = index; } } // Remove removed indices synchronized (indicesMapsAccessLock) { for (long index : removedIndices) { metadata = metadata.position(index * IndexDetails.TOTAL_BYTES); eraseIndexDetails(metadata); } } fileAllocator.close(); } private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException { synchronized (metadataByteBufferLock) { final int size = indexDetails.getSize(); final int type = indexDetails.getType(); final long offset = indexDetails.getOffset(); final long hash = indexDetails.getHash(); metadataByteBuffer.rewind(); metadataByteBuffer.putInt(0); metadataByteBuffer.putLong(offset); metadataByteBuffer.putInt(size); metadataByteBuffer.putInt(type); //metadataByteBuffer.putLong((long)size << 32 | type & 0xFFFFFFFFL); metadataByteBuffer.putLong(hash); metadataByteBuffer.rewind(); position.write(metadataByteBuffer); } } private void eraseIndexDetails(SeekableByteChannel position) throws IOException { synchronized (maskByteBufferLock) { maskByteBuffer.rewind(); maskByteBuffer.putInt(IndexDetails.MASK_DELETED); maskByteBuffer.rewind(); position.write(maskByteBuffer); } } private void checkClosed() { if (closed) { throw new RuntimeException("Index Manager is closed."); } } @Override public long clean() { return cleanExtraIndices(); } private long cleanExtraIndices() { long removedIndices = 0; LongArrayList toUnload = new LongArrayList(); synchronized (indicesMapsAccessLock) { if (loadedIndices.size() > JCWDatabase.MAX_LOADED_REFERENCES) { long count = loadedIndices.size(); LongIterator it = loadedIndices.keySet().iterator(); while (it.hasNext()) { long loadedIndex = it.nextLong(); if (count < JCWDatabase.MAX_LOADED_REFERENCES * 3l / 2l) { break; } toUnload.add(loadedIndex); removedIndices++; count--; } } } for (long index : toUnload.elements()) { try { flushAndUnload(index); } catch (IOException e) { e.printStackTrace(); } } return removedIndices; } }