Fix invalid memory access in AIO writes

To perform writes in AioSocketChannel, we get a ByteBuffer view of the
outbound buffer and specify it as a parameter when we call
AsynchronousSocketChannel.write().

In most cases, the write() operation is finished immediately.  However,
sometimes, it is scheduled for later execution.  In such a case, there's
a chance for a user's handler to append more data to the outbound
buffer.

When more data is appended to the outbound buffer, the outbound buffer
can expand its capacity by itself.  Changing the capacity of a buffer is
basically made of the following steps:

1. Allocate a larger new internal memory region.
2. Copy the current content of the buffer to the new memory region.
3. Rewire the buffer so that it refers to the new region.
4. Deallocate the old memory region.

Because the old memory region is deallocated at the step 4, the write
operation scheduled later will access the deallocated region, leading
all sort of data corruption or even segfaults.

To prevent this situation, I added suspendIntermediaryDeallocations()
and resumeIntermediaryDeallocations() to UnsafeByteBuf.

AioSocketChannel.doFlushByteBuf() now calls suspendIntermediaryDealloc()
to defer the deallocation of the old memory regions until the completion
handler is notified.
This commit is contained in:
Trustin Lee 2012-12-02 21:50:33 +09:00
parent 72e0acbe84
commit 5f9090a7f0
10 changed files with 125 additions and 7 deletions

View File

@ -24,12 +24,14 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Queue;
/**
@ -48,6 +50,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
private Component lastAccessed;
private int lastAccessedId;
private boolean freed;
private Queue<UnsafeByteBuf> suspendedDeallocations;
public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents) {
super(Integer.MAX_VALUE);
@ -1259,7 +1262,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
return result + ", components=" + components.size() + ')';
}
private static final class Component {
private final class Component {
final UnsafeByteBuf buf;
final int length;
final boolean allocatedBySelf;
@ -1282,11 +1285,15 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (buf = this.buf; buf.unwrap() != null; buf = (UnsafeByteBuf) buf.unwrap()) {
continue;
}
buf.free(); // We should not get a NPE here. If so, it must be a bug.
if (suspendedDeallocations == null) {
buf.free(); // We should not get a NPE here. If so, it must be a bug.
} else {
suspendedDeallocations.add(buf);
}
}
}
@Override
public CompositeByteBuf readerIndex(int readerIndex) {
return (CompositeByteBuf) super.readerIndex(readerIndex);
@ -1536,6 +1543,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
discardReadComponents();
}
@Override
public boolean isFreed() {
return freed;
}
@ -1547,11 +1555,33 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
freed = true;
resumeIntermediaryDeallocations();
for (Component c: components) {
c.freeIfNecessary();
}
}
@Override
public void suspendIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<UnsafeByteBuf>(2);
}
}
@Override
public void resumeIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
return;
}
Queue<UnsafeByteBuf> suspendedDeallocations = this.suspendedDeallocations;
this.suspendedDeallocations = null;
for (UnsafeByteBuf buf: suspendedDeallocations) {
buf.free();
}
}
@Override
public ByteBuf unwrap() {
return null;

View File

@ -258,7 +258,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
}
@Override
public void free() {
public void free() { }
}
}
@Override
public void suspendIntermediaryDeallocations() { }
@Override
public void resumeIntermediaryDeallocations() { }
}

View File

@ -259,4 +259,10 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
@Override
public void free() { }
@Override
public void suspendIntermediaryDeallocations() { }
@Override
public void resumeIntermediaryDeallocations() { }
}

View File

@ -331,4 +331,10 @@ public class SlicedByteBuf extends AbstractByteBuf {
@Override
public void free() { }
@Override
public void suspendIntermediaryDeallocations() { }
@Override
public void resumeIntermediaryDeallocations() { }
}

View File

@ -816,4 +816,14 @@ public class SwappedByteBuf implements UnsafeByteBuf {
@Override
public void free() { }
@Override
public void suspendIntermediaryDeallocations() {
buf.suspendIntermediaryDeallocations();
}
@Override
public void resumeIntermediaryDeallocations() {
buf.resumeIntermediaryDeallocations();
}
}

View File

@ -26,6 +26,8 @@ import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)}
@ -72,6 +74,7 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf {
private int capacity;
private boolean freed;
private boolean doNotFree;
private Queue<ByteBuffer> suspendedDeallocations;
/**
* Creates a new direct buffer.
@ -137,7 +140,11 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
if (suspendedDeallocations == null) {
freeDirect(oldBuffer);
} else {
suspendedDeallocations.add(oldBuffer);
}
}
}
@ -514,9 +521,31 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf {
return;
}
resumeIntermediaryDeallocations();
freeDirect(buffer);
}
@Override
public void suspendIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<ByteBuffer>(2);
}
}
@Override
public void resumeIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
return;
}
Queue<ByteBuffer> suspendedDeallocations = this.suspendedDeallocations;
this.suspendedDeallocations = null;
for (ByteBuffer buf: suspendedDeallocations) {
freeDirect(buf);
}
}
@Override
public ByteBuf unwrap() {
return null;

View File

@ -385,6 +385,12 @@ public class UnpooledHeapByteBuf extends AbstractByteBuf {
freed = true;
}
@Override
public void suspendIntermediaryDeallocations() { }
@Override
public void resumeIntermediaryDeallocations() { }
@Override
public ByteBuf unwrap() {
return null;

View File

@ -53,6 +53,19 @@ public interface UnsafeByteBuf extends ByteBuf {
*/
void free();
/**
* Suspends the intermediary deallocation of the internal memory block of this buffer until asked via
* {@link #resumeIntermediaryDeallocations()}. An intermediary deallocation is usually made when the capacity of
* a buffer changes.
*/
void suspendIntermediaryDeallocations();
/**
* Resumes the intermediary deallocation of the internal memory block of this buffer, suspended by
* {@link #suspendIntermediaryDeallocations()}.
*/
void resumeIntermediaryDeallocations();
/**
* Return the underlying buffer instance if this buffer is a wrapper.
*

View File

@ -875,6 +875,16 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
throw new UnreplayableOperationException();
}
@Override
public void suspendIntermediaryDeallocations() {
throw new UnreplayableOperationException();
}
@Override
public void resumeIntermediaryDeallocations() {
throw new UnreplayableOperationException();
}
@Override
public ByteBuf unwrap() {
throw new UnreplayableOperationException();

View File

@ -271,6 +271,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (asyncWriteInProgress) {
// JDK decided to write data (or notify handler) later.
((UnsafeByteBuf) buf).suspendIntermediaryDeallocations();
break;
}
@ -346,7 +347,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(T result, AioSocketChannel channel) {
channel.asyncWriteInProgress = false;
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
((UnsafeByteBuf) buf).resumeIntermediaryDeallocations();
int writtenBytes = result.intValue();
if (writtenBytes > 0) {
// Update the readerIndex with the amount of read bytes