Add missing 'operation(args, future)' for 'operation(args)'

- Fixes #818
- Fix inspector warnings
This commit is contained in:
Trustin Lee 2012-12-14 19:42:58 +09:00
parent 5ffb495746
commit eb23c9d27c
15 changed files with 158 additions and 94 deletions

View File

@ -122,7 +122,22 @@ public abstract class WebSocketClientHandshaker {
* @param channel * @param channel
* Channel * Channel
*/ */
public abstract ChannelFuture handshake(Channel channel); public ChannelFuture handshake(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
return handshake(channel, channel.newFuture());
}
/**
* Begins the opening handshake
*
* @param channel
* Channel
* @param future
* the {@link ChannelFuture} to be notified when the opening handshake is sent
*/
public abstract ChannelFuture handshake(Channel channel, ChannelFuture future);
/** /**
* Validates and finishes the opening handshake initiated by {@link #handshake}}. * Validates and finishes the opening handshake initiated by {@link #handshake}}.

View File

@ -91,7 +91,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
* Channel into which we can write our request * Channel into which we can write our request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel) { public ChannelFuture handshake(Channel channel, final ChannelFuture handshakeFuture) {
// Make keys // Make keys
int spaces1 = WebSocketUtil.randomNumber(1, 12); int spaces1 = WebSocketUtil.randomNumber(1, 12);
int spaces2 = WebSocketUtil.randomNumber(1, 12); int spaces2 = WebSocketUtil.randomNumber(1, 12);
@ -173,9 +173,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
request.setHeader(Names.CONTENT_LENGTH, key3.length); request.setHeader(Names.CONTENT_LENGTH, key3.length);
request.setContent(Unpooled.copiedBuffer(key3)); request.setContent(Unpooled.copiedBuffer(key3));
final ChannelFuture handshakeFuture = channel.newFuture();
ChannelFuture future = channel.write(request); ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {

View File

@ -97,7 +97,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
* Channel into which we can write our request * Channel into which we can write our request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel) { public ChannelFuture handshake(Channel channel, final ChannelFuture handshakeFuture) {
// Get path // Get path
URI wsURL = getWebSocketUrl(); URI wsURL = getWebSocketUrl();
String path = wsURL.getPath(); String path = wsURL.getPath();
@ -151,9 +151,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
} }
} }
final ChannelFuture handshakeFuture = channel.newFuture();
ChannelFuture future = channel.write(request); ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {

View File

@ -97,7 +97,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
* Channel into which we can write our request * Channel into which we can write our request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel) { public ChannelFuture handshake(Channel channel, final ChannelFuture handshakeFuture) {
// Get path // Get path
URI wsURL = getWebSocketUrl(); URI wsURL = getWebSocketUrl();
String path = wsURL.getPath(); String path = wsURL.getPath();
@ -151,9 +151,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
} }
} }
final ChannelFuture handshakeFuture = channel.newFuture();
ChannelFuture future = channel.write(request); ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {

View File

@ -29,6 +29,8 @@ import java.util.Set;
*/ */
public abstract class WebSocketServerHandshaker { public abstract class WebSocketServerHandshaker {
private static final String[] EMPTY_ARRAY = new String[0];
private final String webSocketUrl; private final String webSocketUrl;
private final String[] subprotocols; private final String[] subprotocols;
@ -64,7 +66,7 @@ public abstract class WebSocketServerHandshaker {
} }
this.subprotocols = subprotocolArray; this.subprotocols = subprotocolArray;
} else { } else {
this.subprotocols = new String[0]; this.subprotocols = EMPTY_ARRAY;
} }
this.maxFramePayloadLength = maxFramePayloadLength; this.maxFramePayloadLength = maxFramePayloadLength;
} }
@ -109,7 +111,24 @@ public abstract class WebSocketServerHandshaker {
* @param req * @param req
* HTTP Request * HTTP Request
*/ */
public abstract ChannelFuture handshake(Channel channel, HttpRequest req); public ChannelFuture handshake(Channel channel, HttpRequest req) {
if (channel == null) {
throw new NullPointerException("channel");
}
return handshake(channel, req, channel.newFuture());
}
/**
* Performs the opening handshake
*
* @param channel
* Channel
* @param req
* HTTP Request
* @param future
* the {@link ChannelFuture} to be notified when the opening handshake is done
*/
public abstract ChannelFuture handshake(Channel channel, HttpRequest req, ChannelFuture future);
/** /**
* Performs the closing handshake * Performs the closing handshake
@ -119,7 +138,24 @@ public abstract class WebSocketServerHandshaker {
* @param frame * @param frame
* Closing Frame that was received * Closing Frame that was received
*/ */
public abstract ChannelFuture close(Channel channel, CloseWebSocketFrame frame); public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) {
if (channel == null) {
throw new NullPointerException("channel");
}
return close(channel, frame, channel.newFuture());
}
/**
* Performs the closing handshake
*
* @param channel
* Channel
* @param frame
* Closing Frame that was received
* @param future
* the {@link ChannelFuture} to be notified when the closing handshake is done
*/
public abstract ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelFuture future);
/** /**
* Selects the first matching supported sub protocol * Selects the first matching supported sub protocol

View File

@ -15,9 +15,6 @@
*/ */
package io.netty.handler.codec.http.websocketx; package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -36,6 +33,12 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import java.util.regex.Pattern;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
/** /**
* <p> * <p>
* Performs server side opening and closing handshakes for web socket specification version <a * Performs server side opening and closing handshakes for web socket specification version <a
@ -50,6 +53,9 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker00.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker00.class);
private static final Pattern BEGINNING_DIGIT = Pattern.compile("[^0-9]");
private static final Pattern BEGINNING_SPACE = Pattern.compile("[^ ]");
/** /**
* Constructor specifying the destination web socket location * Constructor specifying the destination web socket location
* *
@ -112,7 +118,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* HTTP request * HTTP request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel, HttpRequest req) { public ChannelFuture handshake(Channel channel, HttpRequest req, ChannelFuture future) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.id())); logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.id()));
@ -152,8 +158,10 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
// Calculate the answer of the challenge. // Calculate the answer of the challenge.
String key1 = req.getHeader(SEC_WEBSOCKET_KEY1); String key1 = req.getHeader(SEC_WEBSOCKET_KEY1);
String key2 = req.getHeader(SEC_WEBSOCKET_KEY2); String key2 = req.getHeader(SEC_WEBSOCKET_KEY2);
int a = (int) (Long.parseLong(key1.replaceAll("[^0-9]", "")) / key1.replaceAll("[^ ]", "").length()); int a = (int) (Long.parseLong(BEGINNING_DIGIT.matcher(key1).replaceAll("")) /
int b = (int) (Long.parseLong(key2.replaceAll("[^0-9]", "")) / key2.replaceAll("[^ ]", "").length()); BEGINNING_SPACE.matcher(key1).replaceAll("").length());
int b = (int) (Long.parseLong(BEGINNING_DIGIT.matcher(key2).replaceAll("")) /
BEGINNING_SPACE.matcher(key2).replaceAll("").length());
long c = req.getContent().readLong(); long c = req.getContent().readLong();
ByteBuf input = Unpooled.buffer(16); ByteBuf input = Unpooled.buffer(16);
input.writeInt(a); input.writeInt(a);
@ -172,8 +180,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
} }
// Upgrade the connection and send the handshake response. // Upgrade the connection and send the handshake response.
ChannelFuture future = channel.write(res); channel.write(res, future);
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
@ -200,7 +207,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* Web Socket frame that was received * Web Socket frame that was received
*/ */
@Override @Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) { public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelFuture future) {
return channel.write(frame); return channel.write(frame, future);
} }
} }

View File

@ -109,7 +109,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* HTTP request * HTTP request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel, HttpRequest req) { public ChannelFuture handshake(Channel channel, HttpRequest req, ChannelFuture future) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.id())); logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.id()));
@ -144,7 +144,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
} }
} }
ChannelFuture future = channel.write(res); channel.write(res, future);
// Upgrade the connection and send the handshake response. // Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@ -173,10 +173,8 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* Web Socket frame that was received * Web Socket frame that was received
*/ */
@Override @Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) { public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelFuture future) {
ChannelFuture f = channel.write(frame); future.addListener(ChannelFutureListener.CLOSE);
f.addListener(ChannelFutureListener.CLOSE); return channel.write(frame, future);
return f;
} }
} }

View File

@ -108,7 +108,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* HTTP request * HTTP request
*/ */
@Override @Override
public ChannelFuture handshake(Channel channel, HttpRequest req) { public ChannelFuture handshake(Channel channel, HttpRequest req, ChannelFuture future) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.id())); logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.id()));
@ -144,7 +144,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
} }
} }
ChannelFuture future = channel.write(res); channel.write(res, future);
// Upgrade the connection and send the handshake response. // Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@ -173,10 +173,8 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* Web Socket frame that was received * Web Socket frame that was received
*/ */
@Override @Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) { public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelFuture future) {
ChannelFuture f = channel.write(frame); future.addListener(ChannelFutureListener.CLOSE);
f.addListener(ChannelFutureListener.CLOSE); return channel.write(frame, future);
return f;
} }
} }

View File

@ -82,9 +82,21 @@ public interface SctpChannel extends Channel {
*/ */
ChannelFuture bindAddress(InetAddress localAddress); ChannelFuture bindAddress(InetAddress localAddress);
/**
* Bind a address to the already bound channel to enable multi-homing.
* The Channel bust be bound and yet to be connected.
*/
ChannelFuture bindAddress(InetAddress localAddress, ChannelFuture future);
/** /**
* Unbind the address from channel's multi-homing address list. * Unbind the address from channel's multi-homing address list.
* The address should be added already in multi-homing address list. * The address should be added already in multi-homing address list.
*/ */
ChannelFuture unbindAddress(InetAddress localAddress); ChannelFuture unbindAddress(InetAddress localAddress);
/**
* Unbind the address from channel's multi-homing address list.
* The address should be added already in multi-homing address list.
*/
ChannelFuture unbindAddress(InetAddress localAddress, ChannelFuture future);
} }

View File

@ -22,8 +22,8 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
/** /**
* A TCP/IP socket {@link Channel} which was either accepted by * A TCP/IP socket {@link Channel}.
* {@link ServerSocketChannel} or created by {@link ClientSocketChannelFactory}. *
* @apiviz.landmark * @apiviz.landmark
* @apiviz.composedOf io.netty.channel.socket.SocketChannelConfig * @apiviz.composedOf io.netty.channel.socket.SocketChannelConfig
*/ */
@ -51,4 +51,9 @@ public interface SocketChannel extends Channel {
* @see Socket#shutdownOutput() * @see Socket#shutdownOutput()
*/ */
ChannelFuture shutdownOutput(); ChannelFuture shutdownOutput();
/**
* @see Socket#shutdownOutput()
*/
ChannelFuture shutdownOutput(ChannelFuture future);
} }

View File

@ -115,10 +115,20 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
public ChannelFuture shutdownOutput() { public ChannelFuture shutdownOutput() {
final ChannelFuture future = newFuture(); return shutdownOutput(newFuture());
}
@Override
public ChannelFuture shutdownOutput(final ChannelFuture future) {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput(future); try {
javaChannel().shutdownOutput();
outputShutdown = true;
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
@ -130,16 +140,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return future; return future;
} }
private void shutdownOutput(ChannelFuture future) {
try {
javaChannel().shutdownOutput();
outputShutdown = true;
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
}
@Override @Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) {
if (localAddress != null) { if (localAddress != null) {

View File

@ -284,12 +284,11 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
@Override @Override
public ChannelFuture bindAddress(InetAddress localAddress) { public ChannelFuture bindAddress(InetAddress localAddress) {
ChannelFuture future = newFuture(); return bindAddress(localAddress, newFuture());
doBindAddress(localAddress, future);
return future;
} }
void doBindAddress(final InetAddress localAddress, final ChannelFuture future) { @Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) { if (eventLoop().inEventLoop()) {
try { try {
javaChannel().bindAddress(localAddress); javaChannel().bindAddress(localAddress);
@ -301,20 +300,20 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
eventLoop().execute(new Runnable() { eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doBindAddress(localAddress, future); bindAddress(localAddress, future);
} }
}); });
} }
return future;
} }
@Override @Override
public ChannelFuture unbindAddress(InetAddress localAddress) { public ChannelFuture unbindAddress(InetAddress localAddress) {
ChannelFuture future = newFuture(); return unbindAddress(localAddress, newFuture());
doUnbindAddress(localAddress, future);
return future;
} }
void doUnbindAddress(final InetAddress localAddress, final ChannelFuture future) { @Override
public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) { if (eventLoop().inEventLoop()) {
try { try {
javaChannel().unbindAddress(localAddress); javaChannel().unbindAddress(localAddress);
@ -326,10 +325,10 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
eventLoop().execute(new Runnable() { eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doUnbindAddress(localAddress, future); unbindAddress(localAddress, future);
} }
}); });
} }
return future;
} }
} }

View File

@ -109,10 +109,19 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdownOutput() { public ChannelFuture shutdownOutput() {
final ChannelFuture future = newFuture(); return shutdownOutput(newFuture());
}
@Override
public ChannelFuture shutdownOutput(final ChannelFuture future) {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput(future); try {
javaChannel().socket().shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
@ -124,15 +133,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return future; return future;
} }
private void shutdownOutput(ChannelFuture future) {
try {
javaChannel().socket().shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
}
@Override @Override
protected SocketAddress localAddress0() { protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress(); return javaChannel().socket().getLocalSocketAddress();

View File

@ -254,12 +254,11 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override @Override
public ChannelFuture bindAddress(InetAddress localAddress) { public ChannelFuture bindAddress(InetAddress localAddress) {
ChannelFuture future = newFuture(); return bindAddress(localAddress, newFuture());
doBindAddress(localAddress, future);
return future;
} }
void doBindAddress(final InetAddress localAddress, final ChannelFuture future) { @Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) { if (eventLoop().inEventLoop()) {
try { try {
ch.bindAddress(localAddress); ch.bindAddress(localAddress);
@ -271,20 +270,20 @@ public class OioSctpChannel extends AbstractOioMessageChannel
eventLoop().execute(new Runnable() { eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doBindAddress(localAddress, future); bindAddress(localAddress, future);
} }
}); });
} }
return future;
} }
@Override @Override
public ChannelFuture unbindAddress(InetAddress localAddress) { public ChannelFuture unbindAddress(InetAddress localAddress) {
ChannelFuture future = newFuture(); return unbindAddress(localAddress, newFuture());
doUnbindAddress(localAddress, future);
return future;
} }
void doUnbindAddress(final InetAddress localAddress, final ChannelFuture future) { @Override
public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) { if (eventLoop().inEventLoop()) {
try { try {
ch.unbindAddress(localAddress); ch.unbindAddress(localAddress);
@ -296,9 +295,10 @@ public class OioSctpChannel extends AbstractOioMessageChannel
eventLoop().execute(new Runnable() { eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doUnbindAddress(localAddress, future); unbindAddress(localAddress, future);
} }
}); });
} }
return future;
} }
} }

View File

@ -119,10 +119,19 @@ public class OioSocketChannel extends AbstractOioByteChannel
@Override @Override
public ChannelFuture shutdownOutput() { public ChannelFuture shutdownOutput() {
final ChannelFuture future = newFuture(); return shutdownOutput(newFuture());
}
@Override
public ChannelFuture shutdownOutput(final ChannelFuture future) {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput(future); try {
socket.shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
@ -134,15 +143,6 @@ public class OioSocketChannel extends AbstractOioByteChannel
return future; return future;
} }
private void shutdownOutput(ChannelFuture future) {
try {
socket.shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
}
@Override @Override
protected SocketAddress localAddress0() { protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress(); return socket.getLocalSocketAddress();