From 18aaae3c2e7462853298c991c7f95040e47dae96 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 13 Jun 2012 22:23:21 +0200 Subject: [PATCH] Commit first round of classes to support nio2/async channel api. Still work in progress.. See #396 --- pom.xml | 5 + .../io/netty/channel/AbstractChannel.java | 30 ++- .../netty/channel/AbstractServerChannel.java | 2 +- .../channel/embedded/EmbeddedByteChannel.java | 3 +- .../socket/nio/AbstractNioByteChannel.java | 5 +- .../socket/nio2/AbstractAsyncChannel.java | 176 +++++++++++++ .../socket/nio2/AsyncServerSocketChannel.java | 120 +++++++++ .../socket/nio2/AsyncSocketchannel.java | 246 ++++++++++++++++++ .../channel/socket/nio2/package-info.java | 21 ++ .../socket/oio/AbstractOioByteChannel.java | 3 +- 10 files changed, 592 insertions(+), 19 deletions(-) create mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java create mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java create mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio2/package-info.java diff --git a/pom.xml b/pom.xml index 3ac5c5e345..910c1c72de 100644 --- a/pom.xml +++ b/pom.xml @@ -281,6 +281,11 @@ java.nio.channels.MembershipKey java.net.StandardSocketOptions java.net.StandardProtocolFamily + + java.nio.channels.AsynchronousChannel + java.nio.channels.AsynchronousSocketChannel + java.nio.channels.AsynchronousServerSocketChannel + java.nio.channels.AsynchronousChannelGroup diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 42890f3c8b..98d7fb7aff 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -85,7 +85,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ClosedChannelException closedChannelException; private final Deque flushCheckpoints = new ArrayDeque(); private long writeCounter; - private boolean inFlushNow; + protected boolean inFlushNow; private boolean flushNowPending; /** Cache for the string representation of this channel */ @@ -623,7 +623,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public final void flushNow() { + public void flushNow() { if (inFlushNow) { return; } @@ -631,12 +631,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha inFlushNow = true; ChannelHandlerContext ctx = directOutboundContext(); Throwable cause = null; + boolean handleFlush = true; try { if (ctx.hasOutboundByteBuffer()) { ByteBuf out = ctx.outboundByteBuffer(); int oldSize = out.readableBytes(); try { - doFlushByteBuffer(out); + handleFlush = doFlushByteBuffer(out); } catch (Throwable t) { cause = t; } finally { @@ -657,14 +658,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha writeCounter += oldSize - out.size(); } } - - if (cause == null) { - notifyFlushFutures(); - } else { - notifyFlushFutures(cause); - pipeline.fireExceptionCaught(cause); - if (cause instanceof IOException) { - close(voidFuture()); + if (handleFlush) { + if (cause == null) { + notifyFlushFutures(); + } else { + notifyFlushFutures(cause); + pipeline.fireExceptionCaught(cause); + if (cause instanceof IOException) { + close(voidFuture()); + } } } } finally { @@ -713,7 +715,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doClose() throws Exception; protected abstract void doDeregister() throws Exception; - protected void doFlushByteBuffer(ByteBuf buf) throws Exception { + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { throw new UnsupportedOperationException(); } protected void doFlushMessageBuffer(MessageBuf buf) throws Exception { @@ -722,7 +724,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract boolean isFlushPending(); - private void notifyFlushFutures() { + protected final void notifyFlushFutures() { if (flushCheckpoints.isEmpty()) { return; } @@ -760,7 +762,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - private void notifyFlushFutures(Throwable cause) { + protected final void notifyFlushFutures(Throwable cause) { notifyFlushFutures(); for (;;) { FlushCheckpoint cp = flushCheckpoints.poll(); diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 0c42167eeb..6ac92e60b3 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -77,7 +77,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - protected void doFlushByteBuffer(ByteBuf buf) throws Exception { + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java index 07e8ac5313..8c1f7e3499 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java @@ -71,10 +71,11 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel { } @Override - protected void doFlushByteBuffer(ByteBuf buf) throws Exception { + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { if (!lastOutboundBuffer().readable()) { lastOutboundBuffer().discardReadBytes(); } lastOutboundBuffer().writeBytes(buf); + return true; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index fded47226c..99489e3ae7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -85,11 +85,11 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - protected void doFlushByteBuffer(ByteBuf buf) throws Exception { + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { if (!buf.readable()) { // Reset reader/writerIndex to 0 if the buffer is empty. buf.clear(); - return; + return true; } for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { @@ -103,6 +103,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { break; } } + return true; } protected abstract int doReadBytes(ByteBuf buf) throws Exception; diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java new file mode 100755 index 0000000000..f2371239b1 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java @@ -0,0 +1,176 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio2; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.AsynchronousChannel; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractAsyncChannel extends AbstractChannel { + + protected volatile AsynchronousChannel ch; + + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + protected ChannelFuture connectFuture; + protected ScheduledFuture connectTimeoutFuture; + private ConnectException connectTimeoutException; + + protected AbstractAsyncChannel(Channel parent, Integer id) { + super(parent, id); + } + + + @Override + public InetSocketAddress localAddress() { + if (ch == null) { + return null; + } + return (InetSocketAddress) super.localAddress(); + } + + @Override + public InetSocketAddress remoteAddress() { + if (ch == null) { + return null; + } + return (InetSocketAddress) super.remoteAddress(); + } + + protected AsynchronousChannel javaChannel() { + return ch; + } + + @Override + public ChannelConfig config() { + // TODO: Fix me + return null; + } + + @Override + public boolean isOpen() { + return ch == null || ch.isOpen(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + // TODO: Fix me + return true; + } + + + @Override + protected void doDeregister() throws Exception { + throw new UnsupportedOperationException("Deregistration is not supported by AbstractAsyncChannel"); + } + + @Override + protected AsyncUnsafe newUnsafe() { + return new AsyncUnsafe(); + } + + protected class AsyncUnsafe extends AbstractUnsafe { + + @Override + public void connect(final SocketAddress remoteAddress, + final SocketAddress localAddress, final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + if (!ensureOpen(future)) { + return; + } + + try { + if (connectFuture != null) { + throw new IllegalStateException("connection attempt already made"); + } + connectFuture = future; + + doConnect(remoteAddress, localAddress, future); + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (connectTimeoutException == null) { + connectTimeoutException = new ConnectException("connection timed out"); + } + ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture; + if (connectFuture != null && + connectFuture.setFailure(connectTimeoutException)) { + pipeline().fireExceptionCaught(connectTimeoutException); + close(voidFuture()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress, future); + } + }); + } + } + + protected final void connectFailed(Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + + protected final void connectSuccess() { + assert eventLoop().inEventLoop(); + assert connectFuture != null; + try { + boolean wasActive = isActive(); + connectFuture.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } catch (Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } finally { + connectTimeoutFuture.cancel(false); + connectFuture = null; + } + } + } + protected abstract void doConnect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture connectFuture); + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java new file mode 100755 index 0000000000..816d7f6be6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncServerSocketChannel.java @@ -0,0 +1,120 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio2; + +import io.netty.buffer.ChannelBufType; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ServerChannel; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel { + + private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler(); + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class); + + public AsyncServerSocketChannel() { + super(null, null); + } + + + @Override + protected AsynchronousServerSocketChannel javaChannel() { + return (AsynchronousServerSocketChannel) super.javaChannel(); + } + + @Override + public boolean isActive() { + return localAddress0() != null; + } + + @Override + public ChannelBufType bufferType() { + return ChannelBufType.MESSAGE; + } + + @Override + protected SocketAddress localAddress0() { + try { + return javaChannel().getLocalAddress(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().bind(localAddress); + } + + @Override + protected void doClose() throws Exception { + javaChannel().close(); + } + + @Override + protected boolean isFlushPending() { + return false; + } + + @Override + protected void doConnect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + future.setFailure(new UnsupportedOperationException()); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Runnable doRegister() throws Exception { + ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); + javaChannel().accept(this, ACCEPT_HANDLER); + return null; + } + + private static final class AcceptHandler + implements CompletionHandler { + public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) { + // register again this handler to accept new connections + channel.javaChannel().accept(channel, this); + + // create the socket add it to the buffer and fire the event + channel.outboundMessageBuffer().add(new AsyncSocketchannel(channel, null)); + channel.pipeline().fireInboundBufferUpdated(); + } + + public void failed(Throwable t, AsyncServerSocketChannel channel) { + logger.warn("Failed to create a new channel from an accepted socket.", t); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java new file mode 100755 index 0000000000..4c8f63ea90 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java @@ -0,0 +1,246 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ChannelBufType; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncSocketchannel extends AbstractAsyncChannel { + + private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); + private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + + private final AtomicBoolean flushing = new AtomicBoolean(false); + + public AsyncSocketchannel() { + this(null, null); + } + + public AsyncSocketchannel(AsyncServerSocketChannel parent, Integer id) { + super(parent, id); + } + + @Override + public boolean isActive() { + AsynchronousSocketChannel ch = javaChannel(); + return ch.isOpen(); + } + + @Override + protected AsynchronousSocketChannel javaChannel() { + return (AsynchronousSocketChannel) super.javaChannel(); + } + + @Override + public ChannelBufType bufferType() { + return ChannelBufType.BYTE; + } + + @Override + protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { + assert ch != null; + if (localAddress != null) { + try { + javaChannel().bind(localAddress); + } catch (IOException e) { + future.setFailure(e); + return; + } + } + + javaChannel().connect(remoteAddress, this, CONNECT_HANDLER); + } + + @Override + protected InetSocketAddress localAddress0() { + try { + return (InetSocketAddress) javaChannel().getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + protected InetSocketAddress remoteAddress0() { + try { + return (InetSocketAddress) javaChannel().getRemoteAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + protected Runnable doRegister() throws Exception { + assert ch == null; + ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); + return null; + } + + private void read() { + javaChannel().read(pipeline().inboundByteBuffer().nioBuffer(), this, READ_HANDLER); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().bind(localAddress); + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected void doClose() throws Exception { + javaChannel().close(); + } + + @Override + protected boolean isFlushPending() { + // TODO: Fix me + return true; + } + + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + if (flushing.compareAndSet(false, true)) { + javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); + } + return false; + } + + private static boolean expandReadBuffer(ByteBuf byteBuf) { + if (!byteBuf.writable()) { + // FIXME: Magic number + byteBuf.ensureWritableBytes(4096); + return true; + } + return false; + } + + private static final class WriteHandler implements CompletionHandler { + + @Override + public void completed(Integer result, AsyncSocketchannel channel) { + ByteBuf buf = channel.pipeline().outboundByteBuffer(); + if (!buf.readable()) { + buf.discardReadBytes(); + } + + if (result > 0) { + channel.notifyFlushFutures(); + } + channel.flushing.set(false); + } + + @Override + public void failed(Throwable cause, AsyncSocketchannel channel) { + ByteBuf buf = channel.pipeline().outboundByteBuffer(); + if (!buf.readable()) { + buf.discardReadBytes(); + } + + channel.notifyFlushFutures(cause); + channel.pipeline().fireExceptionCaught(cause); + if (cause instanceof IOException) { + channel.close(channel.unsafe().voidFuture()); + } + channel.flushing.set(false); + } + } + + private static final class ReadHandler implements CompletionHandler { + + @Override + public void completed(Integer result, AsyncSocketchannel channel) { + assert channel.eventLoop().inEventLoop(); + + final ChannelPipeline pipeline = channel.pipeline(); + final ByteBuf byteBuf = pipeline.inboundByteBuffer(); + boolean closed = false; + boolean read = false; + try { + expandReadBuffer(byteBuf); + for (;;) { + int localReadAmount = result.intValue(); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + break; + } + if (!expandReadBuffer(byteBuf)) { + break; + } + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + pipeline.fireExceptionCaught(t); + if (t instanceof IOException) { + channel.close(channel.unsafe().voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && channel.isOpen()) { + channel.close(channel.unsafe().voidFuture()); + } else { + // start the next read + channel.read(); + } + } + } + + @Override + public void failed(Throwable t, AsyncSocketchannel channel) { + channel.pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + channel.close(channel.unsafe().voidFuture()); + } else { + // start the next read + channel.read(); + } + } + } + + private static final class ConnectHandler implements CompletionHandler { + + @Override + public void completed(Void result, AsyncSocketchannel channel) { + ((AsyncUnsafe) channel.unsafe()).connectSuccess(); + } + + @Override + public void failed(Throwable exc, AsyncSocketchannel channel) { + ((AsyncUnsafe) channel.unsafe()).connectFailed(exc); + } + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java b/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java new file mode 100644 index 0000000000..f3c3c4d1ff --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio2/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * NIO2-based socket channel + * API implementation - recommended for a large number of connections (>= 1000). + */ +package io.netty.channel.socket.nio2; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index 12697f74aa..563286b3b9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -86,11 +86,12 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { } @Override - protected void doFlushByteBuffer(ByteBuf buf) throws Exception { + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { while (buf.readable()) { doWriteBytes(buf); } buf.clear(); + return true; } protected abstract int available();