Use byte buffers in streams
This commit is contained in:
parent
53519fbc4e
commit
eec46d6c50
@ -28,6 +28,7 @@ import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.ColumnFamilyOptions;
|
||||
import org.rocksdb.CompressionType;
|
||||
import org.rocksdb.DBOptions;
|
||||
import org.rocksdb.DirectSlice;
|
||||
import org.rocksdb.InfoLogLevel;
|
||||
import org.rocksdb.Options;
|
||||
import org.rocksdb.ReadOptions;
|
||||
@ -35,13 +36,9 @@ import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.RocksIterator;
|
||||
import org.rocksdb.WALRecoveryMode;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.rocksdb.WriteOptions;
|
||||
import org.rocksdb.util.SizeUnit;
|
||||
|
||||
/**
|
||||
* Created by wens on 16-3-10.
|
||||
*/
|
||||
public class RocksdbFileStore {
|
||||
|
||||
private static final byte[] NEXT_ID_KEY = new byte[]{0x0};
|
||||
@ -54,11 +51,11 @@ public class RocksdbFileStore {
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
private final Striped<ReadWriteLock> metaLock;
|
||||
|
||||
private static final ReadOptions DEFAULT_READ_OPTS = new ReadOptions();
|
||||
private static final ReadOptions DEFAULT_IT_READ_OPTS = new ReadOptions()
|
||||
.setReadaheadSize(SizeUnit.MB)
|
||||
.setVerifyChecksums(false);
|
||||
private static final WriteOptions DEFAULT_WRITE_OPTS = new WriteOptions();
|
||||
private static final ReadOptions DEFAULT_READ_OPTS = new ReadOptions()
|
||||
.setVerifyChecksums(false)
|
||||
.setIgnoreRangeDeletions(true);
|
||||
private final ReadOptions itReadOpts;
|
||||
private static final WriteOptions DEFAULT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
|
||||
private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0);
|
||||
|
||||
private final RocksDB db;
|
||||
@ -96,6 +93,10 @@ public class RocksdbFileStore {
|
||||
db.put(headers, NEXT_ID_KEY, Longs.toByteArray(100));
|
||||
incFlush();
|
||||
}
|
||||
this.itReadOpts = new ReadOptions()
|
||||
.setReadaheadSize(blockSize * 4L)
|
||||
.setVerifyChecksums(false)
|
||||
.setIgnoreRangeDeletions(true);
|
||||
} catch (RocksDBException e) {
|
||||
throw new IOException("Failed to open RocksDB meta file store", e);
|
||||
}
|
||||
@ -429,6 +430,12 @@ public class RocksdbFileStore {
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
 * Encodes a file id as a key prefix: the id is written as a little-endian
 * {@code long} into a newly allocated buffer of {@code Long.BYTES} bytes.
 *
 * <p>The buffer is obtained from {@code ByteBufAllocator.DEFAULT} and is not
 * released here; the caller is responsible for releasing it.
 *
 * @param id the file id to encode
 * @return a new buffer holding the 8 little-endian bytes of {@code id}
 */
private ByteBuf getDataKeyPrefix(long id) {
|
||||
var buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES);
|
||||
buf.writeLongLE(id);
|
||||
return buf;
|
||||
}
|
||||
|
||||
private byte[] getDataKeyByteArray(long id, int i) {
|
||||
ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]);
|
||||
bb.order(ByteOrder.LITTLE_ENDIAN);
|
||||
@ -438,7 +445,7 @@ public class RocksdbFileStore {
|
||||
}
|
||||
|
||||
|
||||
public int load(String name, long position, byte[] buf, int offset, int len) throws IOException {
|
||||
public int load(String name, long position, ByteBuf buf, int offset, int len) throws IOException {
|
||||
var l = metaLock.get(name).readLock();
|
||||
l.lock();
|
||||
try {
|
||||
@ -453,7 +460,7 @@ public class RocksdbFileStore {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (buf.length < offset + len) {
|
||||
if (buf.capacity() < offset + len) {
|
||||
throw new IllegalArgumentException("len is too long");
|
||||
}
|
||||
|
||||
@ -463,50 +470,64 @@ public class RocksdbFileStore {
|
||||
|
||||
ByteBuf valBuf = getDataValueBuf();
|
||||
ByteBuffer valBuffer = valBuf.nioBuffer(0, blockSize);
|
||||
boolean shouldSeekTo = true;
|
||||
try (RocksIterator it = db.newIterator(data, DEFAULT_IT_READ_OPTS)) {
|
||||
int m;
|
||||
int r;
|
||||
int i;
|
||||
do {
|
||||
m = (int) (p % (long) blockSize);
|
||||
r = Math.min(blockSize - m, n);
|
||||
i = (int) (p / (long) blockSize);
|
||||
try {
|
||||
boolean shouldSeekTo = true;
|
||||
try (var ro = new ReadOptions(itReadOpts)) {
|
||||
ro.setIgnoreRangeDeletions(true);
|
||||
ByteBuf fileIdPrefix = getDataKeyPrefix(fileId);
|
||||
try {
|
||||
try (var lb = new DirectSlice(fileIdPrefix.internalNioBuffer(0, Long.BYTES), Long.BYTES)) {
|
||||
ro.setIterateLowerBound(lb);
|
||||
ro.setPrefixSameAsStart(true);
|
||||
try (RocksIterator it = db.newIterator(data, itReadOpts)) {
|
||||
int m;
|
||||
int r;
|
||||
int i;
|
||||
do {
|
||||
m = (int) (p % (long) blockSize);
|
||||
r = Math.min(blockSize - m, n);
|
||||
i = (int) (p / (long) blockSize);
|
||||
|
||||
//System.out.println("Reading block " + name + "(" + fileId + "):" + i);
|
||||
//System.out.println("Reading block " + name + "(" + fileId + "):" + i);
|
||||
|
||||
if (shouldSeekTo) {
|
||||
shouldSeekTo = false;
|
||||
ByteBuf dataKey = getDataKey(null, fileId, i);
|
||||
try {
|
||||
it.seek(dataKey.nioBuffer());
|
||||
} finally {
|
||||
dataKey.release();
|
||||
if (shouldSeekTo) {
|
||||
shouldSeekTo = false;
|
||||
ByteBuf dataKey = getDataKey(null, fileId, i);
|
||||
try {
|
||||
it.seek(dataKey.nioBuffer());
|
||||
} finally {
|
||||
dataKey.release();
|
||||
}
|
||||
if (!it.isValid()) {
|
||||
throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found");
|
||||
}
|
||||
} else {
|
||||
it.next();
|
||||
if (!it.isValid()) {
|
||||
throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found");
|
||||
}
|
||||
}
|
||||
assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key());
|
||||
int dataRead = it.value(valBuffer);
|
||||
valBuf.writerIndex(dataRead);
|
||||
|
||||
valBuf.getBytes(m, buf, f, r);
|
||||
|
||||
valBuf.writerIndex(0);
|
||||
valBuf.readerIndex(0);
|
||||
|
||||
p += r;
|
||||
f += r;
|
||||
n -= r;
|
||||
} while (n != 0 && p < size);
|
||||
|
||||
return (int) (p - position);
|
||||
}
|
||||
}
|
||||
if (!it.isValid()) {
|
||||
throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found");
|
||||
}
|
||||
} else {
|
||||
it.next();
|
||||
if (!it.isValid()) {
|
||||
throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found");
|
||||
}
|
||||
assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key());
|
||||
} finally {
|
||||
fileIdPrefix.release();
|
||||
}
|
||||
int dataRead = it.value(valBuffer);
|
||||
valBuf.writerIndex(dataRead);
|
||||
|
||||
valBuf.getBytes(m, buf, f, r);
|
||||
|
||||
valBuf.writerIndex(0);
|
||||
valBuf.readerIndex(0);
|
||||
|
||||
p += r;
|
||||
f += r;
|
||||
n -= r;
|
||||
} while (n != 0 && p < size);
|
||||
|
||||
return (int) (p - position);
|
||||
}
|
||||
} finally {
|
||||
valBuf.release();
|
||||
}
|
||||
@ -666,7 +687,7 @@ public class RocksdbFileStore {
|
||||
return keys;
|
||||
}
|
||||
|
||||
public void append(String name, byte[] buf, int offset, int len) throws IOException {
|
||||
public void append(String name, ByteBuf buf, int offset, int len) throws IOException {
|
||||
var l = metaLock.get(name).writeLock();
|
||||
l.lock();
|
||||
try {
|
||||
@ -687,7 +708,6 @@ public class RocksdbFileStore {
|
||||
ByteBuf dataKey = null;
|
||||
ByteBuf bb = getDataValueBuf();
|
||||
try {
|
||||
try (var wb = new WriteBatch(len)) {
|
||||
do {
|
||||
int m = (int) (size % (long) blockSize);
|
||||
int r = Math.min(blockSize - m, n);
|
||||
@ -709,7 +729,7 @@ public class RocksdbFileStore {
|
||||
bb.ensureWritable(r);
|
||||
bb.setBytes(m, buf, f, r);
|
||||
|
||||
wb.put(data, dataKey.nioBuffer(), bb.internalNioBuffer(0, m + r));
|
||||
db.put(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, m + r));
|
||||
incFlush();
|
||||
size += r;
|
||||
f += r;
|
||||
@ -720,9 +740,6 @@ public class RocksdbFileStore {
|
||||
bb.readerIndex(0);
|
||||
bb.writerIndex(0);
|
||||
} while (n != 0);
|
||||
db.write(DEFAULT_WRITE_OPTS, wb);
|
||||
wb.clear();
|
||||
}
|
||||
} finally {
|
||||
if (dataKey != null) {
|
||||
dataKey.release();
|
||||
|
@ -1,13 +1,12 @@
|
||||
package it.cavallium.dbengine.lucene.directory;
|
||||
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
|
||||
import io.net5.buffer.ByteBuf;
|
||||
import io.net5.buffer.ByteBufAllocator;
|
||||
import io.net5.buffer.Unpooled;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
|
||||
/**
|
||||
* Created by wens on 16-3-10.
|
||||
*/
|
||||
public class RocksdbInputStream extends IndexInput {
|
||||
|
||||
private final int bufferSize;
|
||||
@ -16,9 +15,7 @@ public class RocksdbInputStream extends IndexInput {
|
||||
|
||||
private final long length;
|
||||
|
||||
private byte[] currentBuffer;
|
||||
|
||||
private int currentBufferIndex;
|
||||
private ByteBuf currentBuffer;
|
||||
|
||||
private final RocksdbFileStore store;
|
||||
|
||||
@ -29,21 +26,28 @@ public class RocksdbInputStream extends IndexInput {
|
||||
}
|
||||
|
||||
/**
 * Creates an input for the file {@code name} backed by {@code store}.
 *
 * <p>Delegates to the private constructor, passing a read buffer allocated with
 * {@code ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize)} (initial and
 * maximum capacity both {@code bufferSize}) whose writer index is moved to
 * {@code bufferSize}, so the whole capacity counts as written.
 *
 * @param name       file name, passed to the store on each load
 * @param store      file store the data is read from
 * @param bufferSize capacity of the internal read buffer, in bytes
 * @param length     value stored in the {@code length} field of this input
 */
public RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length) {
|
||||
this(name,
|
||||
store,
|
||||
bufferSize,
|
||||
length,
|
||||
ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize).writerIndex(bufferSize)
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Initializes the input with a caller-supplied read buffer.
 *
 * <p>The buffer's reader index is set to {@code bufferSize}. That is the
 * "buffer exhausted" condition tested by {@code loadBufferIfNeed()}, so the
 * first read triggers a load from the store. The position starts at 0.
 *
 * <p>NOTE(review): this span looks like a rendered diff in which lines of both
 * revisions are shown together (the {@code byte[]} allocation with
 * {@code currentBufferIndex}, and the {@code ByteBuf} assignment with
 * {@code readerIndex}) - confirm which lines belong to the current revision.
 *
 * @param name          file name, also used in the resource description
 * @param store         file store the data is read from
 * @param bufferSize    capacity of the read buffer, in bytes
 * @param length        value stored in the {@code length} field
 * @param currentBuffer read buffer to use; presumably owned by this instance
 *                      from now on (it is released in {@code close()}) - TODO confirm
 */
private RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length, ByteBuf currentBuffer) {
|
||||
super("RocksdbInputStream(name=" + name + ")");
|
||||
this.name = name;
|
||||
this.store = store;
|
||||
this.bufferSize = bufferSize;
|
||||
this.currentBuffer = new byte[this.bufferSize];
|
||||
this.currentBufferIndex = bufferSize;
|
||||
this.currentBuffer = currentBuffer;
|
||||
currentBuffer.readerIndex(bufferSize);
|
||||
this.position = 0;
|
||||
this.length = length;
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
 * Releases the internal read buffer. The backing store is left open (the
 * {@code store.close()} call is commented out).
 *
 * <p>NOTE(review): there is no guard against repeated calls, so a second
 * {@code close()} would call {@code release()} again on the same buffer -
 * confirm callers close each instance at most once.
 */
@Override
|
||||
public void close() throws IOException {
|
||||
//store.close();
|
||||
currentBuffer.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -57,7 +61,7 @@ public class RocksdbInputStream extends IndexInput {
|
||||
throw new IllegalArgumentException("pos must be between 0 and " + length);
|
||||
}
|
||||
position = pos;
|
||||
currentBufferIndex = this.bufferSize;
|
||||
currentBuffer.readerIndex(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -72,7 +76,12 @@ public class RocksdbInputStream extends IndexInput {
|
||||
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
|
||||
}
|
||||
|
||||
return new RocksdbInputStream(name, store, bufferSize, offset + length) {
|
||||
return new RocksdbInputStream(name,
|
||||
store,
|
||||
bufferSize,
|
||||
offset + length,
|
||||
Unpooled.buffer(bufferSize, bufferSize).writerIndex(bufferSize)
|
||||
) {
|
||||
{
|
||||
seek(0L);
|
||||
}
|
||||
@ -112,18 +121,18 @@ public class RocksdbInputStream extends IndexInput {
|
||||
throw new EOFException("Read end");
|
||||
}
|
||||
loadBufferIfNeed();
|
||||
byte b = currentBuffer[currentBufferIndex++];
|
||||
byte b = currentBuffer.readByte();
|
||||
position++;
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
 * Refills the read buffer from the store once it has been fully consumed.
 *
 * <p>When the read index equals {@code bufferSize}, this asks the store to load
 * up to {@code bufferSize} bytes of file {@code name}, starting at the current
 * {@code position}, into the buffer at offset 0, and then resets the read index
 * to 0. Otherwise it does nothing.
 *
 * <p>NOTE(review): this span looks like a rendered diff showing both the
 * {@code currentBufferIndex}-based and the {@code readerIndex()}-based variants
 * of the check and the reset - confirm which lines are current.
 *
 * @throws EOFException if the store reports {@code -1} for the load
 * @throws IOException  declared by this method; {@code store.load} is the call that may raise it
 */
protected void loadBufferIfNeed() throws IOException {
|
||||
if (this.currentBufferIndex == this.bufferSize) {
|
||||
if (currentBuffer.readerIndex() == this.bufferSize) {
|
||||
int n = store.load(name, position, currentBuffer, 0, bufferSize);
|
||||
if (n == -1) {
|
||||
throw new EOFException("Read end");
|
||||
}
|
||||
this.currentBufferIndex = 0;
|
||||
currentBuffer.readerIndex(0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,13 +148,12 @@ public class RocksdbInputStream extends IndexInput {
|
||||
do {
|
||||
loadBufferIfNeed();
|
||||
|
||||
int r = Math.min(bufferSize - currentBufferIndex, n);
|
||||
int r = Math.min(bufferSize - currentBuffer.readerIndex(), n);
|
||||
|
||||
System.arraycopy(currentBuffer, currentBufferIndex, b, f, r);
|
||||
currentBuffer.readBytes(b, f, r);
|
||||
|
||||
f += r;
|
||||
position += r;
|
||||
currentBufferIndex += r;
|
||||
n -= r;
|
||||
|
||||
} while (n != 0);
|
||||
@ -154,8 +162,7 @@ public class RocksdbInputStream extends IndexInput {
|
||||
/**
 * Returns a clone of this input that gets its own {@code currentBuffer} reference.
 *
 * <p>NOTE(review): this span looks like a rendered diff showing two variants.
 * One allocates a fresh {@code byte[]} and copies the buffered bytes with
 * {@code System.arraycopy}. The other replaces the buffer with
 * {@code duplicate()} of itself. Per the Netty {@code ByteBuf} contract, a
 * duplicate has independent indices but shares the underlying content (and
 * presumably the reference count) with its source. If that holds for
 * {@code io.net5.buffer.ByteBuf}, a load performed by the clone would overwrite
 * the bytes buffered by the original, and closing both would release the same
 * memory twice - confirm before relying on independent clones.
 *
 * @return the cloned input
 */
@Override
|
||||
public IndexInput clone() {
|
||||
RocksdbInputStream in = (RocksdbInputStream) super.clone();
|
||||
in.currentBuffer = new byte[bufferSize];
|
||||
System.arraycopy(this.currentBuffer, 0, in.currentBuffer, 0, bufferSize);
|
||||
in.currentBuffer = in.currentBuffer.duplicate();
|
||||
return in;
|
||||
}
|
||||
}
|
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.lucene.directory;
|
||||
|
||||
import io.net5.buffer.ByteBuf;
|
||||
import io.net5.buffer.ByteBufAllocator;
|
||||
import org.apache.lucene.store.BufferedChecksum;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.util.Accountable;
|
||||
@ -9,18 +11,13 @@ import java.util.Collection;
|
||||
import java.util.zip.CRC32;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
/**
|
||||
* Created by wens on 16-3-10.
|
||||
*/
|
||||
public class RocksdbOutputStream extends IndexOutput implements Accountable {
|
||||
|
||||
private final int bufferSize;
|
||||
|
||||
private long position;
|
||||
|
||||
private final byte[] currentBuffer;
|
||||
|
||||
private int currentBufferIndex;
|
||||
private ByteBuf currentBuffer;
|
||||
|
||||
private boolean dirty;
|
||||
|
||||
@ -35,7 +32,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
|
||||
this.name = name;
|
||||
this.store = store;
|
||||
this.bufferSize = bufferSize;
|
||||
this.currentBuffer = new byte[this.bufferSize];
|
||||
this.currentBuffer = ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize);
|
||||
this.position = 0;
|
||||
this.dirty = false;
|
||||
if (checksum) {
|
||||
@ -47,17 +44,19 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
|
||||
|
||||
/**
 * Flushes any pending bytes and releases the write buffer.
 *
 * <p>In the null-guarded variant, the buffer is flushed if {@code dirty},
 * released, and the field set to {@code null}, so a later call skips the
 * release. The backing store is left open (the {@code store.close()} call is
 * commented out).
 *
 * <p>NOTE(review): this span looks like a rendered diff; the leading
 * unguarded {@code if (dirty) { flush();} lines appear to belong to the
 * other revision - confirm which lines are current.
 *
 * @throws IOException declared by this method; {@code flush()} is the call that may raise it
 */
@Override
|
||||
public void close() throws IOException {
|
||||
if (dirty) {
|
||||
flush();
|
||||
if (currentBuffer != null) {
|
||||
if (dirty) {
|
||||
flush();
|
||||
}
|
||||
currentBuffer.release();
|
||||
currentBuffer = null;
|
||||
}
|
||||
//store.close();
|
||||
}
|
||||
|
||||
|
||||
/**
 * Appends the buffered bytes to the file {@code name} in the store, then
 * resets the write index to 0 and clears the {@code dirty} flag.
 *
 * <p>The number of bytes appended is the current write index, counted from
 * offset 0 of the buffer.
 *
 * <p>NOTE(review): this span looks like a rendered diff showing both the
 * {@code currentBufferIndex}-based and the {@code writerIndex()}-based
 * variants of the append and the reset - confirm which lines are current.
 *
 * @throws IOException declared by this method; {@code store.append} is the call that may raise it
 */
private void flush() throws IOException {
|
||||
|
||||
store.append(name, currentBuffer, 0, currentBufferIndex);
|
||||
currentBufferIndex = 0;
|
||||
store.append(name, currentBuffer, 0, currentBuffer.writerIndex());
|
||||
currentBuffer.writerIndex(0);
|
||||
dirty = false;
|
||||
}
|
||||
|
||||
@ -82,10 +81,10 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
|
||||
if (crc != null) {
|
||||
crc.update(b);
|
||||
}
|
||||
if (currentBufferIndex == bufferSize) {
|
||||
if (currentBuffer.writerIndex() == bufferSize) {
|
||||
flush();
|
||||
}
|
||||
currentBuffer[currentBufferIndex++] = b;
|
||||
currentBuffer.writeByte(b);
|
||||
position++;
|
||||
dirty = true;
|
||||
}
|
||||
@ -99,13 +98,12 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
|
||||
int f = offset;
|
||||
int n = length;
|
||||
do {
|
||||
if (currentBufferIndex == bufferSize) {
|
||||
if (currentBuffer.writerIndex() == bufferSize) {
|
||||
flush();
|
||||
}
|
||||
int r = Math.min(bufferSize - currentBufferIndex, n);
|
||||
System.arraycopy(b, f, currentBuffer, currentBufferIndex, r);
|
||||
int r = Math.min(bufferSize - currentBuffer.writerIndex(), n);
|
||||
currentBuffer.writeBytes(b, f, r);
|
||||
f += r;
|
||||
currentBufferIndex += r;
|
||||
position += r;
|
||||
n -= r;
|
||||
dirty = true;
|
||||
|
Loading…
Reference in New Issue
Block a user