2022-04-09 02:45:42 +02:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
import static it.cavallium.dbengine.database.LLUtils.isDirect;
|
2022-03-16 19:19:26 +01:00
|
|
|
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
|
2021-08-29 23:18:03 +02:00
|
|
|
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.Buffer;
|
|
|
|
import io.netty5.buffer.api.BufferAllocator;
|
2022-03-16 19:19:26 +01:00
|
|
|
import io.netty5.buffer.api.ReadableComponent;
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.Send;
|
|
|
|
import io.netty5.util.internal.PlatformDependent;
|
2021-04-30 19:15:04 +02:00
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2021-10-20 01:51:34 +02:00
|
|
|
import it.cavallium.dbengine.database.disk.RocksDBColumn;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.nio.ByteBuffer;
|
2021-04-30 19:15:04 +02:00
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.List;
|
2022-04-09 02:45:42 +02:00
|
|
|
import org.rocksdb.ColumnFamilyHandle;
|
|
|
|
import org.rocksdb.RocksDBException;
|
|
|
|
import org.rocksdb.WriteBatch;
|
|
|
|
import org.rocksdb.WriteOptions;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
public class CappedWriteBatch extends WriteBatch {
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-05-02 19:18:15 +02:00
|
|
|
/**
|
|
|
|
* Default: true, Use false to debug problems with direct buffers
|
|
|
|
*/
|
2021-05-03 02:45:29 +02:00
|
|
|
private static final boolean USE_FAST_DIRECT_BUFFERS = true;
|
2021-10-20 01:51:34 +02:00
|
|
|
private final RocksDBColumn db;
|
2021-09-01 00:01:56 +02:00
|
|
|
private final BufferAllocator alloc;
|
2020-12-07 22:15:18 +01:00
|
|
|
private final int cap;
|
2022-05-20 10:20:00 +02:00
|
|
|
private final WriteOptions writeOptions;
|
2021-09-01 00:01:56 +02:00
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
private final List<Buffer> buffersToRelease;
|
2021-09-01 00:01:56 +02:00
|
|
|
private final List<ByteBuffer> byteBuffersToRelease;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
|
|
|
/**
|
2021-10-20 01:51:34 +02:00
|
|
|
* @param db
|
2020-12-07 22:15:18 +01:00
|
|
|
* @param cap The limit of operations
|
|
|
|
*/
|
2021-10-20 01:51:34 +02:00
|
|
|
public CappedWriteBatch(RocksDBColumn db,
|
2021-09-01 00:01:56 +02:00
|
|
|
BufferAllocator alloc,
|
2021-02-01 11:00:27 +01:00
|
|
|
int cap,
|
|
|
|
int reservedWriteBatchSize,
|
|
|
|
long maxWriteBatchSize,
|
2022-05-20 10:20:00 +02:00
|
|
|
WriteOptions writeOptions) {
|
2021-04-30 19:15:04 +02:00
|
|
|
super(reservedWriteBatchSize);
|
2020-12-07 22:15:18 +01:00
|
|
|
this.db = db;
|
2021-09-01 00:01:56 +02:00
|
|
|
this.alloc = alloc;
|
2020-12-07 22:15:18 +01:00
|
|
|
this.cap = cap;
|
|
|
|
this.writeOptions = writeOptions;
|
2021-04-30 19:15:04 +02:00
|
|
|
this.setMaxBytes(maxWriteBatchSize);
|
|
|
|
this.buffersToRelease = new ArrayList<>();
|
2021-09-01 00:01:56 +02:00
|
|
|
this.byteBuffersToRelease = new ArrayList<>();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-03-13 19:01:36 +01:00
|
|
|
private synchronized void flushIfNeeded(boolean force) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
if (this.count() >= (force ? 1 : cap)) {
|
2021-10-17 19:52:43 +02:00
|
|
|
try {
|
|
|
|
db.write(writeOptions, this.getWriteBatch());
|
|
|
|
this.clear();
|
|
|
|
} finally {
|
|
|
|
releaseAllBuffers();
|
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized void releaseAllBuffers() {
|
2021-05-02 19:18:15 +02:00
|
|
|
if (!buffersToRelease.isEmpty()) {
|
2021-08-29 23:18:03 +02:00
|
|
|
for (Buffer byteBuffer : buffersToRelease) {
|
|
|
|
byteBuffer.close();
|
2021-05-02 19:18:15 +02:00
|
|
|
}
|
|
|
|
buffersToRelease.clear();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-09-01 00:01:56 +02:00
|
|
|
if (!byteBuffersToRelease.isEmpty()) {
|
|
|
|
for (var byteBuffer : byteBuffersToRelease) {
|
|
|
|
PlatformDependent.freeDirectBuffer(byteBuffer);
|
|
|
|
}
|
|
|
|
byteBuffersToRelease.clear();
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized int count() {
|
2021-04-30 19:15:04 +02:00
|
|
|
return super.count();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void put(byte[] key, byte[] value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.put(key, value);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.put(columnFamilyHandle, key, value);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.put(key, value);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.put(columnFamilyHandle, key, value);
|
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public synchronized void put(ColumnFamilyHandle columnFamilyHandle,
|
|
|
|
Send<Buffer> keyToReceive,
|
|
|
|
Send<Buffer> valueToReceive) throws RocksDBException {
|
|
|
|
var key = keyToReceive.receive();
|
|
|
|
var value = valueToReceive.receive();
|
2021-12-12 02:17:36 +01:00
|
|
|
if (USE_FAST_DIRECT_BUFFERS
|
2022-03-16 19:19:26 +01:00
|
|
|
&& (isReadOnlyDirect(key))
|
|
|
|
&& (isReadOnlyDirect(value))) {
|
|
|
|
ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer();
|
|
|
|
ByteBuffer valueNioBuffer = ((ReadableComponent) value).readableBuffer();
|
2021-09-01 00:01:56 +02:00
|
|
|
buffersToRelease.add(value);
|
2021-05-02 19:18:15 +02:00
|
|
|
buffersToRelease.add(key);
|
2021-09-01 00:01:56 +02:00
|
|
|
|
2021-12-12 02:17:36 +01:00
|
|
|
super.put(columnFamilyHandle, keyNioBuffer, valueNioBuffer);
|
2021-05-02 19:18:15 +02:00
|
|
|
} else {
|
2021-04-30 19:15:04 +02:00
|
|
|
try {
|
2021-05-02 19:18:15 +02:00
|
|
|
byte[] keyArray = LLUtils.toArray(key);
|
|
|
|
byte[] valueArray = LLUtils.toArray(value);
|
|
|
|
super.put(columnFamilyHandle, keyArray, valueArray);
|
2021-04-30 19:15:04 +02:00
|
|
|
} finally {
|
2021-08-29 23:18:03 +02:00
|
|
|
key.close();
|
|
|
|
value.close();
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void merge(byte[] key, byte[] value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.merge(key, value);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.merge(columnFamilyHandle, key, value);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void delete(byte[] key) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.delete(key);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.delete(columnFamilyHandle, key);
|
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException {
|
|
|
|
var key = keyToReceive.receive();
|
2022-03-16 19:19:26 +01:00
|
|
|
if (USE_FAST_DIRECT_BUFFERS && isReadOnlyDirect(key)) {
|
|
|
|
ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer();
|
2021-05-02 19:18:15 +02:00
|
|
|
buffersToRelease.add(key);
|
2022-03-27 01:22:20 +01:00
|
|
|
delete(columnFamilyHandle, keyNioBuffer);
|
2021-05-02 19:18:15 +02:00
|
|
|
} else {
|
|
|
|
try {
|
|
|
|
super.delete(columnFamilyHandle, LLUtils.toArray(key));
|
|
|
|
} finally {
|
2021-08-29 23:18:03 +02:00
|
|
|
key.close();
|
2021-05-02 19:18:15 +02:00
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void singleDelete(byte[] key) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.singleDelete(key);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void singleDelete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.singleDelete(columnFamilyHandle, key);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-03-27 01:22:20 +01:00
|
|
|
public synchronized void delete(ByteBuffer key) throws RocksDBException {
|
|
|
|
super.delete(key);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-03-27 01:22:20 +01:00
|
|
|
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
|
|
|
|
super.delete(columnFamilyHandle, key);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.deleteRange(beginKey, endKey);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
|
2020-12-07 22:15:18 +01:00
|
|
|
throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.deleteRange(columnFamilyHandle, beginKey, endKey);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void putLogData(byte[] blob) throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.putLogData(blob);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushIfNeeded(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void clear() {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.clear();
|
|
|
|
releaseAllBuffers();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void setSavePoint() {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.setSavePoint();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void rollbackToSavePoint() throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.rollbackToSavePoint();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void popSavePoint() throws RocksDBException {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.popSavePoint();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void setMaxBytes(long maxBytes) {
|
2021-04-30 19:15:04 +02:00
|
|
|
super.setMaxBytes(maxBytes);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized WriteBatch getWriteBatch() {
|
2021-05-02 19:18:15 +02:00
|
|
|
return super.getWriteBatch();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-03-13 19:01:36 +01:00
|
|
|
public synchronized void writeToDbAndClose() throws RocksDBException {
|
2021-05-02 19:18:15 +02:00
|
|
|
try {
|
|
|
|
flushIfNeeded(true);
|
|
|
|
super.close();
|
|
|
|
} finally {
|
|
|
|
releaseAllBuffers();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void flush() throws RocksDBException {
|
|
|
|
try {
|
|
|
|
flushIfNeeded(true);
|
|
|
|
} finally {
|
|
|
|
releaseAllBuffers();
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-18 00:52:01 +02:00
|
|
|
protected void disposeInternal(boolean owningHandle) {
|
|
|
|
super.disposeInternal(owningHandle);
|
2021-04-30 19:15:04 +02:00
|
|
|
releaseAllBuffers();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
}
|