Fix AssertionError from AsyncSocketChannel.beginRead()
An AssertionError is triggered by a ByteBuf when beginRead() attempts to access the buffer which has been freed already. This commit ensures the buffer is not freed before performing an I/O operation. To determine if the buffer has been freed, UnsafeByteBuf.isFreed() has been added.
This commit is contained in:
parent
95e8ec1db9
commit
bfe2a96505
@ -1536,6 +1536,10 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
|
|||||||
discardReadComponents();
|
discardReadComponents();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isFreed() {
|
||||||
|
return freed;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() {
|
public void free() {
|
||||||
if (freed) {
|
if (freed) {
|
||||||
|
@ -252,6 +252,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return buffer.isFreed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() {
|
public void free() {
|
||||||
|
|
||||||
|
@ -252,6 +252,11 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return buffer.isFreed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() { }
|
public void free() { }
|
||||||
}
|
}
|
||||||
|
@ -324,6 +324,11 @@ public class SlicedByteBuf extends AbstractByteBuf {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return buffer.isFreed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() { }
|
public void free() { }
|
||||||
}
|
}
|
||||||
|
@ -809,6 +809,11 @@ public class SwappedByteBuf implements UnsafeByteBuf {
|
|||||||
buf.discardSomeReadBytes();
|
buf.discardSomeReadBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return buf.isFreed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() { }
|
public void free() { }
|
||||||
}
|
}
|
||||||
|
@ -498,6 +498,11 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return freed;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() {
|
public void free() {
|
||||||
if (freed) {
|
if (freed) {
|
||||||
|
@ -375,6 +375,11 @@ public class UnpooledHeapByteBuf extends AbstractByteBuf {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return freed;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() {
|
public void free() {
|
||||||
freed = true;
|
freed = true;
|
||||||
|
@ -42,6 +42,11 @@ public interface UnsafeByteBuf extends ByteBuf {
|
|||||||
*/
|
*/
|
||||||
void discardSomeReadBytes();
|
void discardSomeReadBytes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}.
|
||||||
|
*/
|
||||||
|
boolean isFreed();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deallocates the internal memory block of this buffer or returns it to the {@link ByteBufAllocator} it came
|
* Deallocates the internal memory block of this buffer or returns it to the {@link ByteBufAllocator} it came
|
||||||
* from. The result of accessing a released buffer is unspecified and can even cause JVM crash.
|
* from. The result of accessing a released buffer is unspecified and can even cause JVM crash.
|
||||||
|
@ -865,6 +865,11 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
|
|||||||
throw new UnreplayableOperationException();
|
throw new UnreplayableOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFreed() {
|
||||||
|
return ((UnsafeByteBuf) buffer).isFreed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void free() {
|
public void free() {
|
||||||
throw new UnreplayableOperationException();
|
throw new UnreplayableOperationException();
|
||||||
|
@ -17,6 +17,7 @@ package io.netty.channel.socket.aio;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.ChannelBufType;
|
import io.netty.buffer.ChannelBufType;
|
||||||
|
import io.netty.buffer.UnsafeByteBuf;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFlushFutureNotifier;
|
import io.netty.channel.ChannelFlushFutureNotifier;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
@ -244,6 +245,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
try {
|
try {
|
||||||
if (buf.readable()) {
|
if (buf.readable()) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
if (((UnsafeByteBuf) buf).isFreed()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
// Ensure the readerIndex of the buffer is 0 before beginning an async write.
|
// Ensure the readerIndex of the buffer is 0 before beginning an async write.
|
||||||
// Otherwise, JDK can write into a wrong region of the buffer when a handler calls
|
// Otherwise, JDK can write into a wrong region of the buffer when a handler calls
|
||||||
// discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly.
|
// discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly.
|
||||||
@ -288,7 +292,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void beginRead() {
|
private void beginRead() {
|
||||||
if (inBeginRead || asyncReadInProgress || inputShutdown || readSuspended.get()) {
|
if (inBeginRead || asyncReadInProgress || readSuspended.get()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -297,6 +301,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
try {
|
try {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
||||||
|
if (((UnsafeByteBuf) byteBuf).isFreed()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (!byteBuf.readable()) {
|
if (!byteBuf.readable()) {
|
||||||
byteBuf.discardReadBytes();
|
byteBuf.discardReadBytes();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user