Replace UnsafeByteBuf with ByteBuf.unsafe() again

* UnsafeByteBuf is gone. I added ByteBuf.unsafe() back.
* To avoid extra instantiation, all ByteBuf implementations implement the ByteBuf.Unsafe interface.
* To hide this implementation detail, all ByteBuf implementations are package-private.
* AbstractByteBuf and SwappedByteBuf are public and they do not implement ByteBuf.Unsafe because they don't need to.
* unwrap() is not an unsafe operation anymore.
* ChannelBuf also has unsafe() and Unsafe. ByteBuf.Unsafe extends ChannelBuf.unsafe(). ChannelBuf.unsafe() provides free() operation so that a user does not need to down-cast the buffer in freeInbound/OutboundBuffer().
This commit is contained in:
Trustin Lee 2012-12-05 19:28:56 +09:00
parent a9af028077
commit 51e6519b67
37 changed files with 269 additions and 283 deletions

View File

@ -28,7 +28,7 @@ import java.nio.charset.Charset;
/**
* A skeletal implementation of a buffer.
*/
public abstract class AbstractByteBuf implements UnsafeByteBuf {
public abstract class AbstractByteBuf implements ByteBuf {
private int readerIndex;
private int writerIndex;

View File

@ -272,6 +272,13 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/
ByteBuf order(ByteOrder endianness);
/**
* Return the underlying buffer instance if this buffer is a wrapper of another buffer.
*
* @return {@code null} if this buffer is not a wrapper
*/
ByteBuf unwrap();
/**
* Returns {@code true} if and only if this buffer is backed by an
* NIO direct buffer.
@ -1836,4 +1843,50 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/
@Override
String toString();
/**
* Provides access to potentially unsafe operations of this buffer.
*/
@Override
Unsafe unsafe();
/**
* Provides the potentially unsafe operations of {@link ByteBuf}.
*/
interface Unsafe extends ChannelBuf.Unsafe {
/**
* Returns the internal NIO buffer that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer
*/
ByteBuffer internalNioBuffer();
/**
* Returns the internal NIO buffer array that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer array
*/
ByteBuffer[] internalNioBuffers();
/**
* Similar to {@link ByteBuf#discardReadBytes()} except that this method might discard
* some, all, or none of read bytes depending on its internal implementation to reduce
* overall memory bandwidth consumption at the cost of potentially additional memory
* consumption.
*/
void discardSomeReadBytes();
/**
* Suspends the intermediary deallocation of the internal memory block of this buffer until asked via
* {@link #resumeIntermediaryDeallocations()}. An intermediary deallocation is usually made when the capacity of
* a buffer changes.
*/
void suspendIntermediaryDeallocations();
/**
* Resumes the intermediary deallocation of the internal memory block of this buffer, suspended by
* {@link #suspendIntermediaryDeallocations()}.
*/
void resumeIntermediaryDeallocations();
}
}

View File

@ -20,4 +20,25 @@ public interface ChannelBuf {
* The ChannelBufType which will be handled by the ChannelBuf implementation
*/
ChannelBufType type();
/**
* Provides access to potentially unsafe operations of this buffer.
*/
Unsafe unsafe();
/**
* Provides the potentially unsafe operations of {@link ByteBuf}.
*/
interface Unsafe {
/**
* Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}.
*/
boolean isFreed();
/**
* Deallocates the internal memory block of this buffer or returns it to the {@link ByteBufAllocator} it came
* from. The result of accessing a released buffer is unspecified and can even cause JVM crash.
*/
void free();
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
@ -39,7 +40,7 @@ import java.util.Queue;
* is recommended to use {@link Unpooled#wrappedBuffer(ByteBuf...)}
* instead of calling the constructor explicitly.
*/
public class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf {
final class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf, Unsafe {
private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0];
@ -50,7 +51,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
private Component lastAccessed;
private int lastAccessedId;
private boolean freed;
private Queue<UnsafeByteBuf> suspendedDeallocations;
private Queue<ByteBuf> suspendedDeallocations;
public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents) {
super(Integer.MAX_VALUE);
@ -265,7 +266,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
// noinspection ForLoopReplaceableByForEach
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
UnsafeByteBuf b = c.buf;
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
@ -1136,10 +1137,10 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
UnsafeByteBuf b = c.buf;
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
}
components.clear();
components.add(new Component(consolidated, true));
@ -1260,14 +1261,14 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
private final class Component {
final UnsafeByteBuf buf;
final ByteBuf buf;
final int length;
final boolean allocatedBySelf;
int offset;
int endOffset;
Component(ByteBuf buf, boolean allocatedBySelf) {
this.buf = (UnsafeByteBuf) buf;
this.buf = buf;
length = buf.readableBytes();
this.allocatedBySelf = allocatedBySelf;
}
@ -1278,13 +1279,13 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
// Unwrap so that we can free slices, too.
UnsafeByteBuf buf;
for (buf = this.buf; buf.unwrap() != null; buf = (UnsafeByteBuf) buf.unwrap()) {
ByteBuf buf;
for (buf = this.buf; buf.unwrap() != null; buf = buf.unwrap()) {
continue;
}
if (suspendedDeallocations == null) {
buf.free(); // We should not get a NPE here. If so, it must be a bug.
buf.unsafe().free(); // We should not get a NPE here. If so, it must be a bug.
} else {
suspendedDeallocations.add(buf);
}
@ -1519,7 +1520,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public ByteBuffer internalNioBuffer() {
if (components.size() == 1) {
return components.get(0).buf.internalNioBuffer();
return components.get(0).buf.unsafe().internalNioBuffer();
}
throw new UnsupportedOperationException();
}
@ -1529,7 +1530,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
ByteBuffer[] nioBuffers = new ByteBuffer[components.size()];
int index = 0;
for (Component component : components) {
nioBuffers[index++] = component.buf.internalNioBuffer();
nioBuffers[index++] = component.buf.unsafe().internalNioBuffer();
}
return nioBuffers;
}
@ -1560,7 +1561,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public void suspendIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<UnsafeByteBuf>(2);
suspendedDeallocations = new ArrayDeque<ByteBuf>(2);
}
}
@ -1570,11 +1571,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
return;
}
Queue<UnsafeByteBuf> suspendedDeallocations = this.suspendedDeallocations;
Queue<ByteBuf> suspendedDeallocations = this.suspendedDeallocations;
this.suspendedDeallocations = null;
for (UnsafeByteBuf buf: suspendedDeallocations) {
buf.free();
for (ByteBuf buf: suspendedDeallocations) {
buf.unsafe().free();
}
}
@ -1582,4 +1583,9 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
public ByteBuf unwrap() {
return null;
}
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -15,20 +15,20 @@
*/
package io.netty.buffer;
import io.netty.buffer.ChannelBuf.Unsafe;
import java.util.ArrayDeque;
import java.util.Collection;
public class DefaultMessageBuf<T> extends ArrayDeque<T> implements MessageBuf<T> {
final class DefaultMessageBuf<T> extends ArrayDeque<T> implements MessageBuf<T>, Unsafe {
private static final long serialVersionUID = 1229808623624907552L;
public DefaultMessageBuf() { }
private boolean freed;
public DefaultMessageBuf(Collection<? extends T> c) {
super(c);
}
DefaultMessageBuf() { }
public DefaultMessageBuf(int initialCapacity) {
DefaultMessageBuf(int initialCapacity) {
super(initialCapacity);
}
@ -64,4 +64,19 @@ public class DefaultMessageBuf<T> extends ArrayDeque<T> implements MessageBuf<T>
}
return cnt;
}
@Override
public Unsafe unsafe() {
return this;
}
@Override
public boolean isFreed() {
return freed;
}
@Override
public void free() {
freed = true;
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -29,11 +31,11 @@ import java.nio.channels.ScatteringByteChannel;
* parent. It is recommended to use {@link ByteBuf#duplicate()} instead
* of calling the constructor explicitly.
*/
public class DuplicatedByteBuf extends AbstractByteBuf {
final class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe {
private final UnsafeByteBuf buffer;
private final ByteBuf buffer;
public DuplicatedByteBuf(UnsafeByteBuf buffer) {
public DuplicatedByteBuf(ByteBuf buffer) {
super(buffer.maxCapacity());
if (buffer instanceof DuplicatedByteBuf) {
@ -239,12 +241,12 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
@Override
public ByteBuffer internalNioBuffer() {
return buffer.internalNioBuffer();
return buffer.unsafe().internalNioBuffer();
}
@Override
public ByteBuffer[] internalNioBuffers() {
return buffer.internalNioBuffers();
return buffer.unsafe().internalNioBuffers();
}
@Override
@ -254,7 +256,7 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
@Override
public boolean isFreed() {
return buffer.isFreed();
return buffer.unsafe().isFreed();
}
@Override
@ -265,5 +267,10 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
@Override
public void resumeIntermediaryDeallocations() { }
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -15,15 +15,18 @@
*/
package io.netty.buffer;
import io.netty.buffer.ChannelBuf.Unsafe;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
public class QueueBackedMessageBuf<T> implements MessageBuf<T> {
final class QueueBackedMessageBuf<T> implements MessageBuf<T>, Unsafe {
private final Queue<T> queue;
private boolean freed;
public QueueBackedMessageBuf(Queue<T> queue) {
QueueBackedMessageBuf(Queue<T> queue) {
if (queue == null) {
throw new NullPointerException("queue");
}
@ -153,6 +156,21 @@ public class QueueBackedMessageBuf<T> implements MessageBuf<T> {
return cnt;
}
@Override
public Unsafe unsafe() {
return this;
}
@Override
public boolean isFreed() {
return freed;
}
@Override
public void free() {
freed = true;
}
@Override
public String toString() {
return queue.toString();

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -29,11 +31,11 @@ import java.nio.channels.ScatteringByteChannel;
* recommended to use {@link Unpooled#unmodifiableBuffer(ByteBuf)}
* instead of calling the constructor explicitly.
*/
public class ReadOnlyByteBuf extends AbstractByteBuf {
final class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe {
private final UnsafeByteBuf buffer;
private final ByteBuf buffer;
public ReadOnlyByteBuf(UnsafeByteBuf buffer) {
public ReadOnlyByteBuf(ByteBuf buffer) {
super(buffer.maxCapacity());
if (buffer instanceof ReadOnlyByteBuf) {
@ -125,14 +127,12 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
}
@Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
public int setBytes(int index, InputStream in, int length) {
throw new ReadOnlyBufferException();
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length)
throws IOException {
public int setBytes(int index, ScatteringByteChannel in, int length) {
throw new ReadOnlyBufferException();
}
@ -179,7 +179,7 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
@Override
public ByteBuf slice(int index, int length) {
return new ReadOnlyByteBuf((UnsafeByteBuf) buffer.slice(index, length));
return new ReadOnlyByteBuf(buffer.slice(index, length));
}
@Override
@ -239,12 +239,12 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
@Override
public ByteBuffer internalNioBuffer() {
return buffer.internalNioBuffer();
return buffer.unsafe().internalNioBuffer();
}
@Override
public ByteBuffer[] internalNioBuffers() {
return buffer.internalNioBuffers();
return buffer.unsafe().internalNioBuffers();
}
@Override
@ -254,7 +254,7 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
@Override
public boolean isFreed() {
return buffer.isFreed();
return buffer.unsafe().isFreed();
}
@Override
@ -265,4 +265,9 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
@Override
public void resumeIntermediaryDeallocations() { }
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -30,13 +32,13 @@ import java.nio.channels.ScatteringByteChannel;
* {@link ByteBuf#slice(int, int)} instead of calling the constructor
* explicitly.
*/
public class SlicedByteBuf extends AbstractByteBuf {
final class SlicedByteBuf extends AbstractByteBuf implements Unsafe {
private final UnsafeByteBuf buffer;
private final ByteBuf buffer;
private final int adjustment;
private final int length;
public SlicedByteBuf(UnsafeByteBuf buffer, int index, int length) {
public SlicedByteBuf(ByteBuf buffer, int index, int length) {
super(length);
if (index < 0 || index > buffer.capacity()) {
throw new IndexOutOfBoundsException("Invalid index of " + index
@ -52,7 +54,7 @@ public class SlicedByteBuf extends AbstractByteBuf {
this.buffer = ((SlicedByteBuf) buffer).buffer;
adjustment = ((SlicedByteBuf) buffer).adjustment + index;
} else if (buffer instanceof DuplicatedByteBuf) {
this.buffer = (UnsafeByteBuf) buffer.unwrap();
this.buffer = buffer.unwrap();
adjustment = index;
} else {
this.buffer = buffer;
@ -326,7 +328,7 @@ public class SlicedByteBuf extends AbstractByteBuf {
@Override
public boolean isFreed() {
return buffer.isFreed();
return buffer.unsafe().isFreed();
}
@Override
@ -337,4 +339,9 @@ public class SlicedByteBuf extends AbstractByteBuf {
@Override
public void resumeIntermediaryDeallocations() { }
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -24,12 +24,12 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
public class SwappedByteBuf implements UnsafeByteBuf {
public final class SwappedByteBuf implements ByteBuf {
private final UnsafeByteBuf buf;
private final ByteBuf buf;
private final ByteOrder order;
public SwappedByteBuf(UnsafeByteBuf buf) {
public SwappedByteBuf(ByteBuf buf) {
if (buf == null) {
throw new NullPointerException("buf");
}
@ -795,35 +795,7 @@ public class SwappedByteBuf implements UnsafeByteBuf {
}
@Override
public ByteBuffer internalNioBuffer() {
return buf.internalNioBuffer();
}
@Override
public ByteBuffer[] internalNioBuffers() {
return buf.internalNioBuffers();
}
@Override
public void discardSomeReadBytes() {
buf.discardSomeReadBytes();
}
@Override
public boolean isFreed() {
return buf.isFreed();
}
@Override
public void free() { }
@Override
public void suspendIntermediaryDeallocations() {
buf.suspendIntermediaryDeallocations();
}
@Override
public void resumeIntermediaryDeallocations() {
buf.resumeIntermediaryDeallocations();
public Unsafe unsafe() {
return buf.unsafe();
}
}

View File

@ -662,7 +662,7 @@ public final class Unpooled {
* {@code buffer}.
*/
public static ByteBuf unmodifiableBuffer(ByteBuf buffer) {
return new ReadOnlyByteBuf((UnsafeByteBuf) buffer);
return new ReadOnlyByteBuf(buffer);
}
/**

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import sun.misc.Cleaner;
import java.io.IOException;
@ -35,7 +36,7 @@ import java.util.Queue;
* constructor explicitly.
*/
@SuppressWarnings("restriction")
public class UnpooledDirectByteBuf extends AbstractByteBuf {
final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe {
private static final Field CLEANER_FIELD;
@ -550,4 +551,9 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf {
public ByteBuf unwrap() {
return null;
}
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -27,7 +29,7 @@ import java.nio.channels.ScatteringByteChannel;
/**
* Big endian Java heap buffer implementation.
*/
public class UnpooledHeapByteBuf extends AbstractByteBuf {
final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe {
private final ByteBufAllocator alloc;
private byte[] array;
@ -395,4 +397,9 @@ public class UnpooledHeapByteBuf extends AbstractByteBuf {
public ByteBuf unwrap() {
return null;
}
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.nio.ByteBuffer;
public interface UnsafeByteBuf extends ByteBuf {
/**
* Returns the internal NIO buffer that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer
*/
ByteBuffer internalNioBuffer();
/**
* Returns the internal NIO buffer array that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer array
*/
ByteBuffer[] internalNioBuffers();
/**
* Similar to {@link ByteBuf#discardReadBytes()} except that this method might discard
* some, all, or none of read bytes depending on its internal implementation to reduce
* overall memory bandwidth consumption at the cost of potentially additional memory
* consumption.
*/
void discardSomeReadBytes();
/**
* Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}.
*/
boolean isFreed();
/**
* Deallocates the internal memory block of this buffer or returns it to the {@link ByteBufAllocator} it came
* from. The result of accessing a released buffer is unspecified and can even cause JVM crash.
*/
void free();
/**
* Suspends the intermediary deallocation of the internal memory block of this buffer until asked via
* {@link #resumeIntermediaryDeallocations()}. An intermediary deallocation is usually made when the capacity of
* a buffer changes.
*/
void suspendIntermediaryDeallocations();
/**
* Resumes the intermediary deallocation of the internal memory block of this buffer, suspended by
* {@link #suspendIntermediaryDeallocations()}.
*/
void resumeIntermediaryDeallocations();
/**
* Return the underlying buffer instance if this buffer is a wrapper.
*
* @return {@code null} if this buffer is not a wrapper
*/
ByteBuf unwrap();
}

View File

@ -28,7 +28,7 @@ public class DuplicateChannelBufferTest extends AbstractChannelBufferTest {
@Override
protected ByteBuf newBuffer(int length) {
buffer = new DuplicatedByteBuf((UnsafeByteBuf) Unpooled.buffer(length));
buffer = new DuplicatedByteBuf(Unpooled.buffer(length));
assertEquals(0, buffer.writerIndex());
return buffer;
}

View File

@ -46,7 +46,7 @@ public class ReadOnlyChannelBufferTest {
@Test
public void testUnwrap() {
ByteBuf buf = buffer(1);
assertSame(buf, ((UnsafeByteBuf) Unpooled.unmodifiableBuffer(buf)).unwrap());
assertSame(buf, Unpooled.unmodifiableBuffer(buf).unwrap());
}
@Test
@ -74,7 +74,7 @@ public class ReadOnlyChannelBufferTest {
@Test
public void shouldForwardReadCallsBlindly() throws Exception {
ByteBuf buf = createStrictMock(UnsafeByteBuf.class);
ByteBuf buf = createStrictMock(ByteBuf.class);
expect(buf.order()).andReturn(BIG_ENDIAN).anyTimes();
expect(buf.maxCapacity()).andReturn(65536).anyTimes();
expect(buf.readerIndex()).andReturn(0).anyTimes();

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
@ -71,7 +70,7 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
}
}
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
if (out.readableBytes() > oldOutSize) {
ctx.fireInboundBufferUpdated();
}

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundByteHandlerAdapter;
@ -44,7 +43,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
}
}
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
ctx.flush(future);
}

View File

@ -16,8 +16,6 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler;
@ -40,11 +38,6 @@ public abstract class ByteToMessageDecoder<O>
return ctx.alloc().buffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
callDecode(ctx);
@ -98,7 +91,7 @@ public abstract class ByteToMessageDecoder<O>
break;
}
} catch (Throwable t) {
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
if (decoded) {
decoded = false;
@ -113,7 +106,7 @@ public abstract class ByteToMessageDecoder<O>
}
}
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
if (decoded) {
ctx.fireInboundBufferUpdated();

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -37,11 +36,6 @@ public abstract class MessageToMessageDecoder<I, O>
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
// Nothing to free
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -459,7 +458,7 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
private void fireInboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
final int oldReaderIndex = in.readerIndex();
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
final int newReaderIndex = in.readerIndex();
checkpoint -= oldReaderIndex - newReaderIndex;
ctx.fireInboundBufferUpdated();

View File

@ -16,15 +16,14 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBuf.Unsafe;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufIndexFinder;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.SwappedByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.util.internal.Signal;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -33,7 +32,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
class ReplayingDecoderBuffer implements UnsafeByteBuf {
final class ReplayingDecoderBuffer implements ByteBuf, Unsafe {
private static final Signal REPLAY = ReplayingDecoder.REPLAY;
@ -206,14 +205,12 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length)
throws IOException {
public int getBytes(int index, GatheringByteChannel out, int length) {
throw new UnreplayableOperationException();
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length)
throws IOException {
public ByteBuf getBytes(int index, OutputStream out, int length) {
throw new UnreplayableOperationException();
}
@ -454,8 +451,7 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public int readBytes(GatheringByteChannel out, int length)
throws IOException {
public int readBytes(GatheringByteChannel out, int length) {
throw new UnreplayableOperationException();
}
@ -472,7 +468,7 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public ByteBuf readBytes(OutputStream out, int length) throws IOException {
public ByteBuf readBytes(OutputStream out, int length) {
throw new UnreplayableOperationException();
}
@ -599,8 +595,7 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
public int setBytes(int index, InputStream in, int length) {
throw new UnreplayableOperationException();
}
@ -610,8 +605,7 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length)
throws IOException {
public int setBytes(int index, ScatteringByteChannel in, int length) {
throw new UnreplayableOperationException();
}
@ -778,13 +772,12 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
}
@Override
public int writeBytes(InputStream in, int length) throws IOException {
public int writeBytes(InputStream in, int length) {
throw new UnreplayableOperationException();
}
@Override
public int writeBytes(ScatteringByteChannel in, int length)
throws IOException {
public int writeBytes(ScatteringByteChannel in, int length) {
throw new UnreplayableOperationException();
}
@ -867,7 +860,7 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
@Override
public boolean isFreed() {
return ((UnsafeByteBuf) buffer).isFreed();
return buffer.unsafe().isFreed();
}
@Override
@ -889,4 +882,9 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf {
public ByteBuf unwrap() {
throw new UnreplayableOperationException();
}
@Override
public Unsafe unsafe() {
return this;
}
}

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.serialization;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.Attribute;
@ -98,7 +97,7 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
oos.reset();
// Also discard the byproduct to avoid OOM on the sending side.
((UnsafeByteBuf) out).discardSomeReadBytes();
out.unsafe().discardSomeReadBytes();
}
}

View File

@ -17,7 +17,6 @@ package io.netty.handler.logging;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
@ -119,12 +118,12 @@ public class ByteLoggingHandler
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override

View File

@ -18,7 +18,6 @@ package io.netty.handler.ssl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFlushFutureNotifier;
import io.netty.channel.ChannelFuture;
@ -385,12 +384,12 @@ public class SslHandler
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override
@ -410,7 +409,7 @@ public class SslHandler
final ByteBuf in = ctx.outboundByteBuffer();
final ByteBuf out = ctx.nextOutboundByteBuffer();
((UnsafeByteBuf) out).discardSomeReadBytes();
out.unsafe().discardSomeReadBytes();
// Do not encrypt the first write request if this handler is
// created with startTLS flag turned on.
@ -482,7 +481,7 @@ public class SslHandler
setHandshakeFailure(e);
throw e;
} finally {
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
flush0(ctx, bytesConsumed);
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.bootstrap;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -231,11 +230,6 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap> {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
// Nothing to free
}
@SuppressWarnings("unchecked")
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) {

View File

@ -16,8 +16,6 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.UnsafeByteBuf;
/**
@ -38,11 +36,6 @@ public abstract class ChannelInboundByteHandlerAdapter
return ctx.alloc().buffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
}
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer();
@ -50,7 +43,7 @@ public abstract class ChannelInboundByteHandlerAdapter
inboundBufferUpdated(ctx, in);
} finally {
if (!in.readable()) {
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
}
}
}

View File

@ -16,9 +16,16 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
public abstract class ChannelInboundHandlerAdapter
extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
buf.unsafe().free();
}
/**
* Does nothing by default. Sub-classes may override this if needed.
*/

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
@ -33,11 +32,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
// Nothing to free
}
@SuppressWarnings("unchecked")
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {

View File

@ -16,8 +16,6 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.UnsafeByteBuf;
public abstract class ChannelOutboundByteHandlerAdapter
extends ChannelOutboundHandlerAdapter implements ChannelOutboundByteHandler {
@ -25,9 +23,4 @@ public abstract class ChannelOutboundByteHandlerAdapter
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
}
}

View File

@ -15,8 +15,15 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
public abstract class ChannelOutboundHandlerAdapter
extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler {
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
buf.unsafe().free();
}
@Override
public abstract void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
@ -25,9 +24,4 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
// Nothing to free
}
}

View File

@ -20,7 +20,6 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.util.DefaultAttributeMap;
import java.net.SocketAddress;
@ -60,9 +59,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
EventExecutor executor; // not thread-safe but OK because it never changes once set.
private MessageBuf<Object> inMsgBuf;
private UnsafeByteBuf inByteBuf;
private ByteBuf inByteBuf;
private MessageBuf<Object> outMsgBuf;
private UnsafeByteBuf outByteBuf;
private ByteBuf outByteBuf;
// When the two handlers run in a different thread and they are next to each other,
// each other's buffers can be accessed at the same time resulting in a race condition.
@ -272,8 +271,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
}
if (buf instanceof UnsafeByteBuf) {
inByteBuf = (UnsafeByteBuf) buf;
if (buf instanceof ByteBuf) {
inByteBuf = (ByteBuf) buf;
inByteBridge = new AtomicReference<ByteBridge>();
inMsgBuf = null;
inMsgBridge = null;
@ -347,8 +346,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new ChannelPipelineException("A user handler's newOutboundBuffer() returned null");
}
if (buf instanceof UnsafeByteBuf) {
outByteBuf = (UnsafeByteBuf) buf;
if (buf instanceof ByteBuf) {
outByteBuf = (ByteBuf) buf;
outByteBridge = new AtomicReference<ByteBridge>();
outMsgBuf = null;
outMsgBridge = null;
@ -695,7 +694,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ByteBuf currentInboundByteBuf = inboundByteBuffer();
inByteBuf = (UnsafeByteBuf) newInboundByteBuf;
inByteBuf = newInboundByteBuf;
return currentInboundByteBuf;
}
@ -746,7 +745,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ByteBuf currentOutboundByteBuf = outboundByteBuffer();
outByteBuf = (UnsafeByteBuf) newOutboundByteBuf;
outByteBuf = newOutboundByteBuf;
return currentOutboundByteBuf;
}
@ -1206,15 +1205,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
static final class ByteBridge {
final UnsafeByteBuf byteBuf;
final ByteBuf byteBuf;
private final Queue<UnsafeByteBuf> exchangeBuf = new ConcurrentLinkedQueue<UnsafeByteBuf>();
private final Queue<ByteBuf> exchangeBuf = new ConcurrentLinkedQueue<ByteBuf>();
private final ChannelHandlerContext ctx;
ByteBridge(ChannelHandlerContext ctx) {
this.ctx = ctx;
// TODO Choose whether to use heap or direct buffer depending on the context's buffer type.
byteBuf = (UnsafeByteBuf) ctx.alloc().buffer();
byteBuf = ctx.alloc().buffer();
}
private void fill() {
@ -1231,14 +1230,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
byteBuf.readBytes(data);
byteBuf.discardSomeReadBytes();
byteBuf.unsafe().discardSomeReadBytes();
exchangeBuf.add((UnsafeByteBuf) data);
exchangeBuf.add(data);
}
private void flush(ByteBuf out) {
for (;;) {
UnsafeByteBuf data = exchangeBuf.poll();
ByteBuf data = exchangeBuf.poll();
if (data == null) {
break;
}
@ -1246,7 +1245,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
out.writeBytes(data);
} finally {
data.free();
data.unsafe().free();
}
}
}

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -1471,9 +1470,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) {
if (buf instanceof UnsafeByteBuf) {
((UnsafeByteBuf) buf).free();
}
buf.unsafe().free();
}
@Override

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
@ -212,11 +211,6 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return lastInboundMessageBuffer;
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
// Nothing to free
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// Do nothing.
@ -241,11 +235,6 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return lastInboundByteBuffer;
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// No nothing

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFlushFutureNotifier;
import io.netty.channel.ChannelFuture;
@ -247,7 +246,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
try {
if (buf.readable()) {
for (;;) {
if (((UnsafeByteBuf) buf).isFreed()) {
if (buf.unsafe().isFreed()) {
break;
}
// Ensure the readerIndex of the buffer is 0 before beginning an async write.
@ -273,7 +272,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (asyncWriteInProgress) {
// JDK decided to write data (or notify handler) later.
((UnsafeByteBuf) buf).suspendIntermediaryDeallocations();
buf.unsafe().suspendIntermediaryDeallocations();
break;
}
@ -309,7 +308,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
try {
for (;;) {
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (((UnsafeByteBuf) byteBuf).isFreed()) {
if (byteBuf.unsafe().isFreed()) {
break;
}
@ -356,7 +355,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.asyncWriteInProgress = false;
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
((UnsafeByteBuf) buf).resumeIntermediaryDeallocations();
buf.unsafe().resumeIntermediaryDeallocations();
int writtenBytes = result.intValue();
if (writtenBytes > 0) {

View File

@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnsafeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
@ -413,7 +412,7 @@ public class LocalTransportThreadModelTest {
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override
@ -460,7 +459,7 @@ public class LocalTransportThreadModelTest {
out.add(msg);
}
}
((UnsafeByteBuf) in).discardSomeReadBytes();
in.unsafe().discardSomeReadBytes();
if (swallow) {
future.setSuccess();
} else {
@ -503,7 +502,7 @@ public class LocalTransportThreadModelTest {
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, ChannelBuf buf) throws Exception {
((UnsafeByteBuf) buf).free();
buf.unsafe().free();
}
@Override