From 5d01062da43a53d0f9634bdd73280626d0ba1c0b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 13 Jun 2012 22:24:32 +0200 Subject: [PATCH] Revert as it should be in nio2 branch "Commit first round of classes to support nio2/async channel api. Still work in progress.. See #396" This reverts commit 18aaae3c2e7462853298c991c7f95040e47dae96. --- pom.xml | 5 - .../io/netty/channel/AbstractChannel.java | 30 +-- .../netty/channel/AbstractServerChannel.java | 2 +- .../channel/embedded/EmbeddedByteChannel.java | 3 +- .../socket/nio/AbstractNioByteChannel.java | 5 +- .../socket/nio2/AbstractAsyncChannel.java | 176 ------------- .../socket/nio2/AsyncServerSocketChannel.java | 120 --------- .../socket/nio2/AsyncSocketchannel.java | 246 ------------------ .../channel/socket/nio2/package-info.java | 21 -- .../socket/oio/AbstractOioByteChannel.java | 3 +- 10 files changed, 19 insertions(+), 592 deletions(-) delete mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio2/package-info.java diff --git a/pom.xml b/pom.xml index 910c1c72de..3ac5c5e345 100644 --- a/pom.xml +++ b/pom.xml @@ -281,11 +281,6 @@ java.nio.channels.MembershipKey java.net.StandardSocketOptions java.net.StandardProtocolFamily - - java.nio.channels.AsynchronousChannel - java.nio.channels.AsynchronousSocketChannel - java.nio.channels.AsynchronousServerSocketChannel - java.nio.channels.AsynchronousChannelGroup diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 98d7fb7aff..42890f3c8b 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -85,7 +85,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ClosedChannelException closedChannelException; private final Deque flushCheckpoints = new ArrayDeque(); private long writeCounter; - protected boolean inFlushNow; + private boolean inFlushNow; private boolean flushNowPending; /** Cache for the string representation of this channel */ @@ -623,7 +623,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void flushNow() { + public final void flushNow() { if (inFlushNow) { return; } @@ -631,13 +631,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha inFlushNow = true; ChannelHandlerContext ctx = directOutboundContext(); Throwable cause = null; - boolean handleFlush = true; try { if (ctx.hasOutboundByteBuffer()) { ByteBuf out = ctx.outboundByteBuffer(); int oldSize = out.readableBytes(); try { - handleFlush = doFlushByteBuffer(out); + doFlushByteBuffer(out); } catch (Throwable t) { cause = t; } finally { @@ -658,15 +657,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha writeCounter += oldSize - out.size(); } } - if (handleFlush) { - if (cause == null) { - notifyFlushFutures(); - } else { - notifyFlushFutures(cause); - pipeline.fireExceptionCaught(cause); - if (cause instanceof IOException) { - close(voidFuture()); - } + + if (cause == null) { + notifyFlushFutures(); + } else { + notifyFlushFutures(cause); + pipeline.fireExceptionCaught(cause); + if (cause instanceof IOException) { + close(voidFuture()); } } } finally { @@ -715,7 +713,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doClose() throws Exception; protected abstract void doDeregister() throws Exception; - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + protected void doFlushByteBuffer(ByteBuf buf) throws Exception { throw new UnsupportedOperationException(); } protected void doFlushMessageBuffer(MessageBuf buf) throws Exception { @@ -724,7 +722,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract boolean isFlushPending(); - protected final void notifyFlushFutures() { + private void notifyFlushFutures() { if (flushCheckpoints.isEmpty()) { return; } @@ -762,7 +760,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - protected final void notifyFlushFutures(Throwable cause) { + private void notifyFlushFutures(Throwable cause) { notifyFlushFutures(); for (;;) { FlushCheckpoint cp = flushCheckpoints.poll(); diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 6ac92e60b3..0c42167eeb 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -77,7 +77,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + protected void doFlushByteBuffer(ByteBuf buf) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java index 8c1f7e3499..07e8ac5313 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java @@ -71,11 +71,10 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel { } @Override - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + protected void doFlushByteBuffer(ByteBuf buf) throws Exception { if (!lastOutboundBuffer().readable()) { lastOutboundBuffer().discardReadBytes(); } lastOutboundBuffer().writeBytes(buf); - return true; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index 99489e3ae7..fded47226c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -85,11 +85,11 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + protected void doFlushByteBuffer(ByteBuf buf) throws Exception { if (!buf.readable()) { // Reset reader/writerIndex to 0 if the buffer is empty. buf.clear(); - return true; + return; } for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { @@ -103,7 +103,6 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { break; } } - return true; } protected abstract int doReadBytes(ByteBuf buf) throws Exception; diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java deleted file mode 100755 index f2371239b1..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio2; - -import io.netty.channel.AbstractChannel; -import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoop; - -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannel; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public abstract class AbstractAsyncChannel extends AbstractChannel { - - protected volatile AsynchronousChannel ch; - - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - protected ChannelFuture connectFuture; - protected ScheduledFuture connectTimeoutFuture; - private ConnectException connectTimeoutException; - - protected AbstractAsyncChannel(Channel parent, Integer id) { - super(parent, id); - } - - - @Override - public InetSocketAddress localAddress() { - if (ch == null) { - return null; - } - return (InetSocketAddress) super.localAddress(); - } - - @Override - public InetSocketAddress remoteAddress() { - if (ch == null) { - return null; - } - return (InetSocketAddress) super.remoteAddress(); - } - - protected AsynchronousChannel javaChannel() { - return ch; - } - - @Override - public ChannelConfig config() { - // TODO: Fix me - return null; - } - - @Override - public boolean isOpen() { - return ch == null || ch.isOpen(); - } - - @Override - protected boolean isCompatible(EventLoop loop) { - // TODO: Fix me - return true; - } - - - @Override - protected void doDeregister() throws Exception { - throw new UnsupportedOperationException("Deregistration is not supported by AbstractAsyncChannel"); - } - - @Override - protected AsyncUnsafe newUnsafe() { - return new AsyncUnsafe(); - } - - protected class AsyncUnsafe extends AbstractUnsafe { - - @Override - public void connect(final SocketAddress remoteAddress, - final SocketAddress localAddress, final ChannelFuture future) { - if (eventLoop().inEventLoop()) { - if (!ensureOpen(future)) { - return; - } - - try { - if (connectFuture != null) { - throw new IllegalStateException("connection attempt already made"); - } - connectFuture = future; - - doConnect(remoteAddress, localAddress, future); - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - if (connectTimeoutException == null) { - connectTimeoutException = new ConnectException("connection timed out"); - } - ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture; - if (connectFuture != null && - connectFuture.setFailure(connectTimeoutException)) { - pipeline().fireExceptionCaught(connectTimeoutException); - close(voidFuture()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - } catch (Throwable t) { - future.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } - } else { - eventLoop().execute(new Runnable() { - @Override - public void run() { - connect(remoteAddress, localAddress, future); - } - }); - } - } - - protected final void connectFailed(Throwable t) { - connectFuture.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } - - protected final void connectSuccess() { - assert eventLoop().inEventLoop(); - assert connectFuture != null; - try { - boolean wasActive = isActive(); - connectFuture.setSuccess(); - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } catch (Throwable t) { - connectFuture.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } finally { - connectTimeoutFuture.cancel(false); - connectFuture = null; - } - } - } - protected abstract void doConnect(SocketAddress remoteAddress, - SocketAddress localAddress, ChannelFuture connectFuture); - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java deleted file mode 100755 index 816d7f6be6..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio2; - -import io.netty.buffer.ChannelBufType; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ServerChannel; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; - -public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel { - - private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler(); - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class); - - public AsyncServerSocketChannel() { - super(null, null); - } - - - @Override - protected AsynchronousServerSocketChannel javaChannel() { - return (AsynchronousServerSocketChannel) super.javaChannel(); - } - - @Override - public boolean isActive() { - return localAddress0() != null; - } - - @Override - public ChannelBufType bufferType() { - return ChannelBufType.MESSAGE; - } - - @Override - protected SocketAddress localAddress0() { - try { - return javaChannel().getLocalAddress(); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - protected SocketAddress remoteAddress0() { - return null; - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().bind(localAddress); - } - - @Override - protected void doClose() throws Exception { - javaChannel().close(); - } - - @Override - protected boolean isFlushPending() { - return false; - } - - @Override - protected void doConnect( - SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); - } - - @Override - protected void doDisconnect() throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected Runnable doRegister() throws Exception { - ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - javaChannel().accept(this, ACCEPT_HANDLER); - return null; - } - - private static final class AcceptHandler - implements CompletionHandler { - public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) { - // register again this handler to accept new connections - channel.javaChannel().accept(channel, this); - - // create the socket add it to the buffer and fire the event - channel.outboundMessageBuffer().add(new AsyncSocketchannel(channel, null)); - channel.pipeline().fireInboundBufferUpdated(); - } - - public void failed(Throwable t, AsyncServerSocketChannel channel) { - logger.warn("Failed to create a new channel from an accepted socket.", t); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java deleted file mode 100755 index 4c8f63ea90..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio2; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ChannelBufType; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.atomic.AtomicBoolean; - -public class AsyncSocketchannel extends AbstractAsyncChannel { - - private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); - private static final CompletionHandler READ_HANDLER = new ReadHandler(); - private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); - - private final AtomicBoolean flushing = new AtomicBoolean(false); - - public AsyncSocketchannel() { - this(null, null); - } - - public AsyncSocketchannel(AsyncServerSocketChannel parent, Integer id) { - super(parent, id); - } - - @Override - public boolean isActive() { - AsynchronousSocketChannel ch = javaChannel(); - return ch.isOpen(); - } - - @Override - protected AsynchronousSocketChannel javaChannel() { - return (AsynchronousSocketChannel) super.javaChannel(); - } - - @Override - public ChannelBufType bufferType() { - return ChannelBufType.BYTE; - } - - @Override - protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { - assert ch != null; - if (localAddress != null) { - try { - javaChannel().bind(localAddress); - } catch (IOException e) { - future.setFailure(e); - return; - } - } - - javaChannel().connect(remoteAddress, this, CONNECT_HANDLER); - } - - @Override - protected InetSocketAddress localAddress0() { - try { - return (InetSocketAddress) javaChannel().getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - protected InetSocketAddress remoteAddress0() { - try { - return (InetSocketAddress) javaChannel().getRemoteAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - protected Runnable doRegister() throws Exception { - assert ch == null; - ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - return null; - } - - private void read() { - javaChannel().read(pipeline().inboundByteBuffer().nioBuffer(), this, READ_HANDLER); - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().bind(localAddress); - } - - @Override - protected void doDisconnect() throws Exception { - doClose(); - } - - @Override - protected void doClose() throws Exception { - javaChannel().close(); - } - - @Override - protected boolean isFlushPending() { - // TODO: Fix me - return true; - } - - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { - if (flushing.compareAndSet(false, true)) { - javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); - } - return false; - } - - private static boolean expandReadBuffer(ByteBuf byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); - return true; - } - return false; - } - - private static final class WriteHandler implements CompletionHandler { - - @Override - public void completed(Integer result, AsyncSocketchannel channel) { - ByteBuf buf = channel.pipeline().outboundByteBuffer(); - if (!buf.readable()) { - buf.discardReadBytes(); - } - - if (result > 0) { - channel.notifyFlushFutures(); - } - channel.flushing.set(false); - } - - @Override - public void failed(Throwable cause, AsyncSocketchannel channel) { - ByteBuf buf = channel.pipeline().outboundByteBuffer(); - if (!buf.readable()) { - buf.discardReadBytes(); - } - - channel.notifyFlushFutures(cause); - channel.pipeline().fireExceptionCaught(cause); - if (cause instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } - channel.flushing.set(false); - } - } - - private static final class ReadHandler implements CompletionHandler { - - @Override - public void completed(Integer result, AsyncSocketchannel channel) { - assert channel.eventLoop().inEventLoop(); - - final ChannelPipeline pipeline = channel.pipeline(); - final ByteBuf byteBuf = pipeline.inboundByteBuffer(); - boolean closed = false; - boolean read = false; - try { - expandReadBuffer(byteBuf); - for (;;) { - int localReadAmount = result.intValue(); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - break; - } - if (!expandReadBuffer(byteBuf)) { - break; - } - } - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline.fireExceptionCaught(t); - if (t instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed && channel.isOpen()) { - channel.close(channel.unsafe().voidFuture()); - } else { - // start the next read - channel.read(); - } - } - } - - @Override - public void failed(Throwable t, AsyncSocketchannel channel) { - channel.pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } else { - // start the next read - channel.read(); - } - } - } - - private static final class ConnectHandler implements CompletionHandler { - - @Override - public void completed(Void result, AsyncSocketchannel channel) { - ((AsyncUnsafe) channel.unsafe()).connectSuccess(); - } - - @Override - public void failed(Throwable exc, AsyncSocketchannel channel) { - ((AsyncUnsafe) channel.unsafe()).connectFailed(exc); - } - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java b/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java deleted file mode 100644 index f3c3c4d1ff..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * NIO2-based socket channel - * API implementation - recommended for a large number of connections (>= 1000). - */ -package io.netty.channel.socket.nio2; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index 563286b3b9..12697f74aa 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -86,12 +86,11 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { } @Override - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + protected void doFlushByteBuffer(ByteBuf buf) throws Exception { while (buf.readable()) { doWriteBytes(buf); } buf.clear(); - return true; } protected abstract int available();