Implement netty resources

This commit is contained in:
Andrea Cavalli 2022-05-17 23:11:31 +02:00
parent d8e835549b
commit 4f5ab8ac6a
19 changed files with 155 additions and 150 deletions

View File

@ -144,6 +144,11 @@
</build> </build>
<dependencies> <dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty5-buffer</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -63,14 +63,16 @@ public abstract class AbstractCompactionFilterFactory<T extends AbstractCompacti
public abstract String name(); public abstract String name();
/** /**
* We override {@link RocksCallbackObject#disposeInternal()} * We override {@link RocksCallbackObject#disposeInternal(boolean)}
* as disposing of a rocksdb::AbstractCompactionFilterFactory requires * as disposing of a rocksdb::AbstractCompactionFilterFactory requires
* a slightly different approach as it is a std::shared_ptr * a slightly different approach as it is a std::shared_ptr
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
private native long createNewCompactionFilterFactory0(); private native long createNewCompactionFilterFactory0();
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);

View File

@ -325,9 +325,11 @@ public abstract class AbstractEventListener extends RocksCallbackObject implemen
* Deletes underlying C++ native callback object pointer * Deletes underlying C++ native callback object pointer
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
private native long createNewEventListener(final long enabledEventCallbackValues); private native long createNewEventListener(final long enabledEventCallbackValues);
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);

View File

@ -5,6 +5,7 @@
package org.rocksdb; package org.rocksdb;
import io.netty5.buffer.api.Drop;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -14,21 +15,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
//@ThreadSafe //@ThreadSafe
public abstract class AbstractImmutableNativeReference public abstract class AbstractImmutableNativeReference
extends AbstractNativeReference { extends AbstractNativeReference<AbstractImmutableNativeReference> {
private static final Drop<AbstractImmutableNativeReference> DROP = new Drop<AbstractImmutableNativeReference>() {
@Override public void drop(AbstractImmutableNativeReference obj) {
obj.disposeInternal(obj.owningHandle_);
}
@Override public Drop<AbstractImmutableNativeReference> fork() {
return this;
}
@Override public void attach(AbstractImmutableNativeReference obj) {
}
};
/** /**
* A flag indicating whether the current {@code AbstractNativeReference} is * A flag indicating whether the current {@code AbstractNativeReference} is
* responsible to free the underlying C++ object * responsible to free the underlying C++ object
*/ */
protected final AtomicBoolean owningHandle_; protected volatile boolean owningHandle_;
protected AbstractImmutableNativeReference(final boolean owningHandle) { protected AbstractImmutableNativeReference(final boolean owningHandle) {
this.owningHandle_ = new AtomicBoolean(owningHandle); super(DROP);
this.owningHandle_ = owningHandle;
} }
@Override @Override
public boolean isOwningHandle() { public boolean isOwningHandle() {
return owningHandle_.get(); return owningHandle_ && isAccessible();
} }
/** /**
@ -46,14 +62,7 @@ public abstract class AbstractImmutableNativeReference
* </p> * </p>
*/ */
protected final void disOwnNativeHandle() { protected final void disOwnNativeHandle() {
owningHandle_.set(false); owningHandle_ = false;
}
@Override
public void close() {
if (owningHandle_.compareAndSet(true, false)) {
disposeInternal();
}
} }
/** /**
@ -61,5 +70,5 @@ public abstract class AbstractImmutableNativeReference
* which all subclasses of {@code AbstractImmutableNativeReference} must * which all subclasses of {@code AbstractImmutableNativeReference} must
* implement to release their underlying native C++ objects. * implement to release their underlying native C++ objects.
*/ */
protected abstract void disposeInternal(); protected abstract void disposeInternal(boolean owningHandle);
} }

View File

@ -5,6 +5,10 @@
package org.rocksdb; package org.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
/** /**
* AbstractNativeReference is the base-class of all RocksDB classes that have * AbstractNativeReference is the base-class of all RocksDB classes that have
* a pointer to a native C++ {@code rocksdb} object. * a pointer to a native C++ {@code rocksdb} object.
@ -26,7 +30,13 @@ package org.rocksdb;
* and cannot know what other resources depend on it. * and cannot know what other resources depend on it.
* </p> * </p>
*/ */
public abstract class AbstractNativeReference implements AutoCloseable { public abstract class AbstractNativeReference<T extends AbstractNativeReference<T>>
extends ResourceSupport<AbstractNativeReference<T>, T> {
protected AbstractNativeReference(Drop<T> drop) {
super(drop);
}
/** /**
* Returns true if we are responsible for freeing the underlying C++ object * Returns true if we are responsible for freeing the underlying C++ object
* *
@ -34,15 +44,12 @@ public abstract class AbstractNativeReference implements AutoCloseable {
*/ */
protected abstract boolean isOwningHandle(); protected abstract boolean isOwningHandle();
/** @Override protected RuntimeException createResourceClosedException() {
* Frees the underlying C++ object return new IllegalStateException("Resource is closed");
* <p> }
* It is strong recommended that the developer calls this after they
* have finished using the object.</p> @Override protected final Owned<T> prepareSend() {
* <p> throw new UnsupportedOperationException("Sends are not supported");
* Note, that once an instance of {@link AbstractNativeReference} has been }
* closed, calling any of its functions will lead to undefined
* behavior.</p>
*/
@Override public abstract void close();
} }

View File

@ -123,11 +123,13 @@ public abstract class AbstractRocksIterator<P extends RocksObject>
* before freeing the native handle.</p> * before freeing the native handle.</p>
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
if (parent_.isOwningHandle()) { if (parent_.isOwningHandle()) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
} }
}
abstract boolean isValid0(long handle); abstract boolean isValid0(long handle);
abstract void seekToFirst0(long handle); abstract void seekToFirst0(long handle);

View File

@ -47,8 +47,10 @@ public abstract class AbstractTransactionNotifier
* Otherwise an undefined behavior will occur. * Otherwise an undefined behavior will occur.
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
protected final native void disposeInternal(final long handle); protected final native void disposeInternal(final long handle);
} }

View File

@ -136,11 +136,13 @@ public class ColumnFamilyHandle extends RocksObject {
* initialized before freeing the native handle.</p> * initialized before freeing the native handle.</p>
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if(rocksDB_.isOwningHandle()) { if (owningHandle) {
if (rocksDB_.isOwningHandle()) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
} }
}
private native byte[] getName(final long handle) throws RocksDBException; private native byte[] getName(final long handle) throws RocksDBException;
private native int getID(final long handle); private native int getID(final long handle);

View File

@ -27,9 +27,11 @@ public abstract class Filter extends RocksObject {
* Otherwise an undefined behavior will occur. * Otherwise an undefined behavior will occur.
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
@Override @Override
protected final native void disposeInternal(final long handle); protected final native void disposeInternal(final long handle);

View File

@ -109,14 +109,16 @@ public abstract class Logger extends RocksCallbackObject {
protected native byte infoLogLevel(long handle); protected native byte infoLogLevel(long handle);
/** /**
* We override {@link RocksCallbackObject#disposeInternal()} * We override {@link RocksCallbackObject#disposeInternal(boolean)}
* as disposing of a rocksdb::LoggerJniCallback requires * as disposing of a rocksdb::LoggerJniCallback requires
* a slightly different approach as it is a std::shared_ptr * a slightly different approach as it is a std::shared_ptr
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);
} }

View File

@ -46,14 +46,16 @@ public abstract class NativeComparatorWrapper
} }
/** /**
* We override {@link RocksCallbackObject#disposeInternal()} * We override {@link RocksCallbackObject#disposeInternal(boolean)}
* as disposing of a native rocksdb::Comparator extension requires * as disposing of a native rocksdb::Comparator extension requires
* a slightly different approach as it is not really a RocksCallbackObject * a slightly different approach as it is not really a RocksCallbackObject
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);
} }

View File

@ -109,35 +109,23 @@ public class OptimisticTransactionDB extends RocksDB
* @throws RocksDBException if an error occurs whilst closing. * @throws RocksDBException if an error occurs whilst closing.
*/ */
public void closeE() throws RocksDBException { public void closeE() throws RocksDBException {
if (owningHandle_.compareAndSet(true, false)) { this.close();
try { RocksDBException closeEx = this.closeEx;
closeDatabase(nativeHandle_); if (closeEx != null) {
} finally { this.closeEx = null;
disposeInternal(); throw closeEx;
}
} }
} }
/**
* This is similar to {@link #closeE()} except that it
* silently ignores any errors.
*
* This will not fsync the WAL files.
* If syncing is required, the caller must first call {@link #syncWal()}
* or {@link #write(WriteOptions, WriteBatch)} using an empty write batch
* with {@link WriteOptions#setSync(boolean)} set to true.
*
* See also {@link #close()}.
*/
@Override @Override
public void close() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle_.compareAndSet(true, false)) { if (owningHandle) {
try { try {
closeDatabase(nativeHandle_); closeDatabase(nativeHandle_);
} catch (final RocksDBException e) { } catch (RocksDBException e) {
// silently ignore the error report closeEx = e;
} finally { } finally {
disposeInternal(); super.disposeInternal(true);
} }
} }
} }

View File

@ -65,9 +65,11 @@ public abstract class RocksCallbackObject extends
* Deletes underlying C++ native callback object pointer * Deletes underlying C++ native callback object pointer
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
private native void disposeInternal(final long handle); private native void disposeInternal(final long handle);
} }

View File

@ -39,6 +39,8 @@ public class RocksDB extends RocksObject {
private final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>(); private final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();
protected RocksDBException closeEx = null;
/** /**
* Loads the necessary library files. * Loads the necessary library files.
* Calling this method twice will have no effect. * Calling this method twice will have no effect.
@ -607,45 +609,28 @@ public class RocksDB extends RocksObject {
* @throws RocksDBException if an error occurs whilst closing. * @throws RocksDBException if an error occurs whilst closing.
*/ */
public void closeE() throws RocksDBException { public void closeE() throws RocksDBException {
for (final ColumnFamilyHandle columnFamilyHandle : ownedColumnFamilyHandles) { this.close();
columnFamilyHandle.close(); RocksDBException closeEx = this.closeEx;
} if (closeEx != null) {
ownedColumnFamilyHandles.clear(); this.closeEx = null;
throw closeEx;
if (owningHandle_.compareAndSet(true, false)) {
try {
closeDatabase(nativeHandle_);
} finally {
disposeInternal();
}
} }
} }
/**
* This is similar to {@link #closeE()} except that it
* silently ignores any errors.
*
* This will not fsync the WAL files.
* If syncing is required, the caller must first call {@link #syncWal()}
* or {@link #write(WriteOptions, WriteBatch)} using an empty write batch
* with {@link WriteOptions#setSync(boolean)} set to true.
*
* See also {@link #close()}.
*/
@Override @Override
public void close() { protected void disposeInternal(boolean owningHandle) {
for (final ColumnFamilyHandle columnFamilyHandle : ownedColumnFamilyHandles) { for (final ColumnFamilyHandle columnFamilyHandle : ownedColumnFamilyHandles) {
columnFamilyHandle.close(); columnFamilyHandle.close();
} }
ownedColumnFamilyHandles.clear(); ownedColumnFamilyHandles.clear();
if (owningHandle_.compareAndSet(true, false)) { if (owningHandle) {
try { try {
closeDatabase(nativeHandle_); closeDatabase(nativeHandle_);
} catch (final RocksDBException e) { } catch (RocksDBException e) {
// silently ignore the error report closeEx = e;
} finally { } finally {
disposeInternal(); super.disposeInternal(true);
} }
} }
} }

View File

@ -5,6 +5,9 @@
package org.rocksdb; package org.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
/** /**
* RocksMutableObject is an implementation of {@link AbstractNativeReference} * RocksMutableObject is an implementation of {@link AbstractNativeReference}
* whose reference to the underlying native C++ object can change. * whose reference to the underlying native C++ object can change.
@ -13,7 +16,21 @@ package org.rocksdb;
* has synchronization overheads and introduces complexity. Instead it is * has synchronization overheads and introduces complexity. Instead it is
* recommended to use {@link RocksObject} where possible.</p> * recommended to use {@link RocksObject} where possible.</p>
*/ */
public abstract class RocksMutableObject extends AbstractNativeReference { public abstract class RocksMutableObject extends AbstractNativeReference<RocksMutableObject> {
private static final Drop<RocksMutableObject> DROP = new Drop<RocksMutableObject>() {
@Override public void drop(RocksMutableObject obj) {
obj.doDrop();
}
@Override public Drop<RocksMutableObject> fork() {
return this;
}
@Override public void attach(RocksMutableObject obj) {
}
};
/** /**
* An mutable reference to the value of the C++ pointer pointing to some * An mutable reference to the value of the C++ pointer pointing to some
@ -23,9 +40,11 @@ public abstract class RocksMutableObject extends AbstractNativeReference {
private boolean owningHandle_; private boolean owningHandle_;
protected RocksMutableObject() { protected RocksMutableObject() {
super(DROP);
} }
protected RocksMutableObject(final long nativeHandle) { protected RocksMutableObject(final long nativeHandle) {
super(DROP);
this.nativeHandle_ = nativeHandle; this.nativeHandle_ = nativeHandle;
this.owningHandle_ = true; this.owningHandle_ = true;
} }
@ -56,7 +75,7 @@ public abstract class RocksMutableObject extends AbstractNativeReference {
@Override @Override
protected synchronized boolean isOwningHandle() { protected synchronized boolean isOwningHandle() {
return this.owningHandle_; return this.owningHandle_ && isAccessible();
} }
/** /**
@ -70,12 +89,10 @@ public abstract class RocksMutableObject extends AbstractNativeReference {
return this.nativeHandle_; return this.nativeHandle_;
} }
@Override private synchronized void doDrop() {
public synchronized final void close() {
if (isOwningHandle()) { if (isOwningHandle()) {
disposeInternal(); disposeInternal();
this.owningHandle_ = false; nativeHandle_ = 0;
this.nativeHandle_ = 0;
} }
} }

View File

@ -33,9 +33,11 @@ public abstract class RocksObject extends AbstractImmutableNativeReference {
* Deletes underlying C++ object pointer. * Deletes underlying C++ object pointer.
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle) {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }
}
protected abstract void disposeInternal(final long handle); protected abstract void disposeInternal(final long handle);

View File

@ -117,35 +117,23 @@ public class TransactionDB extends RocksDB
* @throws RocksDBException if an error occurs whilst closing. * @throws RocksDBException if an error occurs whilst closing.
*/ */
public void closeE() throws RocksDBException { public void closeE() throws RocksDBException {
if (owningHandle_.compareAndSet(true, false)) { this.close();
try { RocksDBException closeEx = this.closeEx;
closeDatabase(nativeHandle_); if (closeEx != null) {
} finally { this.closeEx = null;
disposeInternal(); throw closeEx;
}
} }
} }
/**
* This is similar to {@link #closeE()} except that it
* silently ignores any errors.
*
* This will not fsync the WAL files.
* If syncing is required, the caller must first call {@link #syncWal()}
* or {@link #write(WriteOptions, WriteBatch)} using an empty write batch
* with {@link WriteOptions#setSync(boolean)} set to true.
*
* See also {@link #close()}.
*/
@Override @Override
public void close() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle_.compareAndSet(true, false)) { if (owningHandle) {
try { try {
closeDatabase(nativeHandle_); closeDatabase(nativeHandle_);
} catch (final RocksDBException e) { } catch (RocksDBException e) {
// silently ignore the error report closeEx = e;
} finally { } finally {
disposeInternal(); super.disposeInternal(true);
} }
} }
} }

View File

@ -155,35 +155,23 @@ public class TtlDB extends RocksDB {
* @throws RocksDBException if an error occurs whilst closing. * @throws RocksDBException if an error occurs whilst closing.
*/ */
public void closeE() throws RocksDBException { public void closeE() throws RocksDBException {
if (owningHandle_.compareAndSet(true, false)) { this.close();
try { RocksDBException closeEx = this.closeEx;
closeDatabase(nativeHandle_); if (closeEx != null) {
} finally { this.closeEx = null;
disposeInternal(); throw closeEx;
}
} }
} }
/**
* <p>Close the TtlDB instance and release resource.</p>
*
*
* This will not fsync the WAL files.
* If syncing is required, the caller must first call {@link #syncWal()}
* or {@link #write(WriteOptions, WriteBatch)} using an empty write batch
* with {@link WriteOptions#setSync(boolean)} set to true.
*
* See also {@link #close()}.
*/
@Override @Override
public void close() { protected void disposeInternal(boolean owningHandle) {
if (owningHandle_.compareAndSet(true, false)) { if (owningHandle) {
try { try {
closeDatabase(nativeHandle_); closeDatabase(nativeHandle_);
} catch (final RocksDBException e) { } catch (RocksDBException e) {
// silently ignore the error report closeEx = e;
} finally { } finally {
disposeInternal(); super.disposeInternal(true);
} }
} }
} }

View File

@ -93,10 +93,8 @@ public class WBWIRocksIterator
} }
} }
@Override @Override protected void disposeInternal(boolean owningHandle) {
public void close() {
entry.close(); entry.close();
super.close();
} }
/** /**