Replace Bootstrap with ChannelBuilder and ServerChannelBuilder

- Added ChannelInitializer which is supposed to be used with the
  builders
- Echo examples use ChannelBuilder and ServerChannelBuilder now
- Replace ChannelFuture.rethrowIfFailed() with sync*()
- Bug fixes
This commit is contained in:
Trustin Lee 2012-05-14 23:57:23 +09:00
parent 18d1861243
commit 311f17f6ef
25 changed files with 618 additions and 1587 deletions

View File

@ -15,9 +15,14 @@
*/
package io.netty.example.echo;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBuilder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.handler.logging.LoggingHandler;
@ -44,23 +49,33 @@ public class EchoClient {
}
public void run() throws Exception {
// Create a new socket and configure it.
SocketChannel s = new NioSocketChannel();
s.config().setTcpNoDelay(true);
s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
s.pipeline().addLast("echoer", new EchoClientHandler(firstMessageSize));
// Begin the communication by registering the channel to an event loop and connecting
// to the peer.
// Create the required event loop.
EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
loop.register(s).awaitUninterruptibly().rethrowIfFailed();
s.connect(new InetSocketAddress(host, port));
try {
// Configure the client.
ChannelBuilder b = new ChannelBuilder();
b.eventLoop(loop)
.channel(new NioSocketChannel())
.option(ChannelOption.TCP_NODELAY, true)
.remoteAddress(new InetSocketAddress(host, port))
.initializer(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
p.addLast("echoer", new EchoClientHandler(firstMessageSize));
}
});
// Wait until the connection is closed.
s.closeFuture().awaitUninterruptibly();
// Start the client.
ChannelFuture f = b.connect().sync();
// Terminate the event loop.
loop.shutdown();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
loop.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,22 +15,20 @@
*/
package io.netty.example.echo;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ServerChannelBuilder;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* Echoes back any received data from a client.
@ -44,39 +42,36 @@ public class EchoServer {
}
public void run() throws Exception {
// Configure the server.
final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
ServerSocketChannel ssc = new NioServerSocketChannel();
ssc.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter<SocketChannel>() {
// Create the required event loops.
EventLoop parentLoop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
EventLoop childLoop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
try {
// Configure the server.
ServerChannelBuilder b = new ServerChannelBuilder();
b.parentEventLoop(parentLoop)
.childEventLoop(childLoop)
.parentChannel(new NioServerSocketChannel())
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(port))
.childInitializer(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
p.addLast("echoer", new EchoServerHandler());
}
});
@Override
public ChannelBufferHolder<SocketChannel> newInboundBuffer(
ChannelInboundHandlerContext<SocketChannel> ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<SocketChannel>());
}
// Start the server.
ChannelFuture f = b.bind().sync();
@Override
public void inboundBufferUpdated(
ChannelInboundHandlerContext<SocketChannel> ctx)
throws Exception {
Queue<SocketChannel> in = ctx.in().messageBuffer();
for (;;) {
SocketChannel s = in.poll();
if (s == null) {
break;
}
s.config().setTcpNoDelay(true);
s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
s.pipeline().addLast("echoer", new EchoServerHandler());
loop.register(s);
}
}
});
loop.register(ssc).awaitUninterruptibly().rethrowIfFailed();
ssc.bind(new InetSocketAddress(port)).awaitUninterruptibly().rethrowIfFailed();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
parentLoop.shutdown();
childLoop.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -105,10 +105,10 @@ public class WebSocketClient {
ChannelFuture future =
bootstrap.connect(
new InetSocketAddress(uri.getHost(), uri.getPort()));
future.awaitUninterruptibly().rethrowIfFailed();
future.awaitUninterruptibly().sync();
ch = future.channel();
handshaker.handshake(ch).awaitUninterruptibly().rethrowIfFailed();
handshaker.handshake(ch).awaitUninterruptibly().sync();
// Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message");

View File

@ -52,7 +52,7 @@ public final class RedisClient {
}
});
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
redis.await().rethrowIfFailed();
redis.await().sync();
Channel channel = redis.channel();
channel.write(new Command("set", "1", "value"));

View File

@ -1,507 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.bootstrap;
import static io.netty.channel.Channels.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.util.ExternalResourceReleasable;
/**
* A helper class which initializes a {@link Channel}. This class provides
* the common data structure for its subclasses which actually initialize
* {@link Channel}s and their child {@link Channel}s using the common data
* structure. Please refer to {@link ClientBootstrap}, {@link ServerBootstrap},
* and {@link ConnectionlessBootstrap} for client side, server-side, and
* connectionless (e.g. UDP) channel initialization respectively.
* @apiviz.uses io.netty.channel.ChannelFactory
*/
public class Bootstrap implements ExternalResourceReleasable {
private volatile ChannelFactory factory;
private volatile ChannelPipeline pipeline = pipeline();
private volatile ChannelPipelineFactory pipelineFactory = pipelineFactory(pipeline);
private volatile Map<String, Object> options = new HashMap<String, Object>();
/**
* Creates a new instance with no {@link ChannelFactory} set.
* {@link #setFactory(ChannelFactory)} must be called at once before any
* I/O operation is requested.
*/
protected Bootstrap() {
}
/**
* Creates a new instance with the specified initial {@link ChannelFactory}.
*/
protected Bootstrap(ChannelFactory channelFactory) {
setFactory(channelFactory);
}
/**
* Returns the {@link ChannelFactory} that will be used to perform an
* I/O operation.
*
* @throws IllegalStateException
* if the factory is not set for this bootstrap yet.
* The factory can be set in the constructor or
* {@link #setFactory(ChannelFactory)}.
*/
public ChannelFactory getFactory() {
ChannelFactory factory = this.factory;
if (factory == null) {
throw new IllegalStateException(
"factory is not set yet.");
}
return factory;
}
/**
* Sets the {@link ChannelFactory} that will be used to perform an I/O
* operation. This method can be called only once and can't be called at
* all if the factory was specified in the constructor.
*
* @throws IllegalStateException
* if the factory is already set
*/
public void setFactory(ChannelFactory factory) {
if (factory == null) {
throw new NullPointerException("factory");
}
if (this.factory != null) {
throw new IllegalStateException(
"factory can't change once set.");
}
this.factory = factory;
}
/**
* Returns the default {@link ChannelPipeline} which is cloned when a new
* {@link Channel} is created. {@link Bootstrap} creates a new pipeline
* which has the same entries with the returned pipeline for a new
* {@link Channel}.
* <p>
* Please note that this method is a convenience method that works only
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
* in the pipeline is stateless. You have to use
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
* more channels are going to be created by this bootstrap (e.g. server-side
* channels).
*
* @return the default {@link ChannelPipeline}
*
* @throws IllegalStateException
* if {@link #setPipelineFactory(ChannelPipelineFactory)} was
* called by a user last time.
*/
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = this.pipeline;
if (pipeline == null) {
throw new IllegalStateException(
"getPipeline() cannot be called " +
"if setPipelineFactory() was called.");
}
return pipeline;
}
/**
* Sets the default {@link ChannelPipeline} which is cloned when a new
* {@link Channel} is created. {@link Bootstrap} creates a new pipeline
* which has the same entries with the specified pipeline for a new channel.
* <p>
* Calling this method also sets the {@code pipelineFactory} property to an
* internal {@link ChannelPipelineFactory} implementation which returns
* a shallow copy of the specified pipeline.
* <p>
* Please note that this method is a convenience method that works only
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
* in the pipeline is stateless. You have to use
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
* more channels are going to be created by this bootstrap (e.g. server-side
* channels).
*/
public void setPipeline(ChannelPipeline pipeline) {
if (pipeline == null) {
throw new NullPointerException("pipeline");
}
this.pipeline = pipeline;
pipelineFactory = pipelineFactory(pipeline);
}
/**
* Dependency injection friendly convenience method for
* {@link #getPipeline()} which returns the default pipeline of this
* bootstrap as an ordered map.
* <p>
* Please note that this method is a convenience method that works only
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
* in the pipeline is stateless. You have to use
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
* more channels are going to be created by this bootstrap (e.g. server-side
* channels).
*
* @throws IllegalStateException
* if {@link #setPipelineFactory(ChannelPipelineFactory)} was
* called by a user last time.
*/
public Map<String, ChannelHandler> getPipelineAsMap() {
ChannelPipeline pipeline = this.pipeline;
if (pipeline == null) {
throw new IllegalStateException("pipelineFactory in use");
}
return pipeline.toMap();
}
/**
* Dependency injection friendly convenience method for
* {@link #setPipeline(ChannelPipeline)} which sets the default pipeline of
* this bootstrap from an ordered map.
* <p>
* Please note that this method is a convenience method that works only
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
* in the pipeline is stateless. You have to use
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
* more channels are going to be created by this bootstrap (e.g. server-side
* channels).
*
* @throws IllegalArgumentException
* if the specified map is not an ordered map
*/
public void setPipelineAsMap(Map<String, ChannelHandler> pipelineMap) {
if (pipelineMap == null) {
throw new NullPointerException("pipelineMap");
}
if (!isOrderedMap(pipelineMap)) {
throw new IllegalArgumentException(
"pipelineMap is not an ordered map. " +
"Please use " +
LinkedHashMap.class.getName() + ".");
}
ChannelPipeline pipeline = pipeline();
for (Map.Entry<String, ChannelHandler> e: pipelineMap.entrySet()) {
pipeline.addLast(e.getKey(), e.getValue());
}
setPipeline(pipeline);
}
/**
* Returns the {@link ChannelPipelineFactory} which creates a new
* {@link ChannelPipeline} for each new {@link Channel}.
*
* @see #getPipeline()
*/
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
/**
* Sets the {@link ChannelPipelineFactory} which creates a new
* {@link ChannelPipeline} for each new {@link Channel}. Calling this
* method invalidates the current {@code pipeline} property of this
* bootstrap. Subsequent {@link #getPipeline()} and {@link #getPipelineAsMap()}
* calls will raise {@link IllegalStateException}.
*
* @see #setPipeline(ChannelPipeline)
* @see #setPipelineAsMap(Map)
*/
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
if (pipelineFactory == null) {
throw new NullPointerException("pipelineFactory");
}
pipeline = null;
this.pipelineFactory = pipelineFactory;
}
/**
* Returns the options which configures a new {@link Channel} and its
* child {@link Channel}s. The names of the child {@link Channel} options
* are prepended with {@code "child."} (e.g. {@code "child.keepAlive"}).
*/
public Map<String, Object> getOptions() {
return new TreeMap<String, Object>(options);
}
/**
* Sets the options which configures a new {@link Channel} and its child
* {@link Channel}s. To set the options of a child {@link Channel}, prepend
* {@code "child."} to the option name (e.g. {@code "child.keepAlive"}).
*/
public void setOptions(Map<String, Object> options) {
if (options == null) {
throw new NullPointerException("options");
}
this.options = new HashMap<String, Object>(options);
}
/**
* Returns the value of the option with the specified key. To retrieve
* the option value of a child {@link Channel}, prepend {@code "child."}
* to the option name (e.g. {@code "child.keepAlive"}).
*
* @param key the option name
*
* @return the option value if the option is found.
* {@code null} otherwise.
*/
public Object getOption(String key) {
if (key == null) {
throw new NullPointerException("key");
}
return options.get(key);
}
/**
* Sets an option with the specified key and value. If there's already
* an option with the same key, it is replaced with the new value. If the
* specified value is {@code null}, an existing option with the specified
* key is removed. To set the option value of a child {@link Channel},
* prepend {@code "child."} to the option name (e.g. {@code "child.keepAlive"}).
*
* @param key the option name
* @param value the option value
*/
public void setOption(String key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
options.remove(key);
} else {
options.put(key, value);
}
}
/**
* {@inheritDoc} This method simply delegates the call to
* {@link ChannelFactory#releaseExternalResources()}.
*/
@Override
public void releaseExternalResources() {
ChannelFactory factory = this.factory;
if (factory != null) {
factory.releaseExternalResources();
}
}
/**
* Returns {@code true} if and only if the specified {@code map} is an
* ordered map, like {@link LinkedHashMap} is.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
static boolean isOrderedMap(Map<?, ?> map) {
Class<?> mapType = map.getClass();
if (LinkedHashMap.class.isAssignableFrom(mapType)) {
// LinkedHashMap is an ordered map.
return true;
}
// Not a LinkedHashMap - start autodetection.
// Detect Apache Commons Collections OrderedMap implementations.
Class<?> type = mapType;
while (type != null) {
for (Class<?> i: type.getInterfaces()) {
if (i.getName().endsWith("OrderedMap")) {
// Seems like it's an ordered map - guessed from that
// it implements OrderedMap interface.
return true;
}
}
type = type.getSuperclass();
}
// Does not implement OrderedMap interface. As a last resort, try to
// create a new instance and test if the insertion order is maintained.
Map newMap;
try {
newMap = (Map) mapType.newInstance();
} catch (Exception e) {
// No default constructor - cannot proceed anymore.
return false;
}
// Run some tests.
List<String> expectedKeys = new ArrayList<String>();
String dummyValue = "dummyValue";
for (short element: ORDER_TEST_SAMPLES) {
String key = String.valueOf(element);
newMap.put(key, dummyValue);
expectedKeys.add(key);
Iterator<String> it = expectedKeys.iterator();
for (Object actualKey: newMap.keySet()) {
if (!it.next().equals(actualKey)) {
// Did not pass the test.
return false;
}
}
}
// The specified map passed the insertion order test.
return true;
}
private static final short[] ORDER_TEST_SAMPLES = {
682, 807, 637, 358, 570, 828, 407, 319,
105, 41, 563, 544, 518, 298, 418, 50,
156, 769, 984, 503, 191, 578, 309, 710,
327, 720, 591, 939, 374, 707, 43, 463,
227, 174, 30, 531, 135, 930, 190, 823,
925, 835, 328, 239, 415, 500, 144, 460,
83, 774, 921, 4, 95, 468, 687, 493,
991, 436, 245, 742, 149, 821, 142, 782,
297, 918, 917, 424, 978, 992, 79, 906,
535, 515, 850, 80, 125, 378, 307, 883,
836, 160, 27, 630, 668, 226, 560, 698,
467, 829, 476, 163, 977, 367, 325, 184,
204, 312, 486, 53, 179, 592, 252, 750,
893, 517, 937, 124, 148, 719, 973, 566,
405, 449, 452, 777, 349, 761, 167, 783,
220, 802, 117, 604, 216, 363, 120, 621,
219, 182, 817, 244, 438, 465, 934, 888,
628, 209, 631, 17, 870, 679, 826, 945,
680, 848, 974, 573, 626, 865, 109, 317,
91, 494, 965, 473, 725, 388, 302, 936,
660, 150, 122, 949, 295, 392, 63, 634,
772, 143, 990, 895, 538, 59, 541, 32,
669, 321, 811, 756, 82, 955, 953, 636,
390, 162, 688, 444, 70, 590, 183, 745,
543, 666, 951, 642, 747, 765, 98, 469,
884, 929, 178, 721, 994, 840, 353, 726,
940, 759, 624, 919, 667, 629, 272, 979,
326, 608, 453, 11, 322, 347, 647, 354,
381, 746, 472, 890, 249, 536, 733, 404,
170, 959, 34, 899, 195, 651, 140, 856,
201, 237, 51, 933, 268, 849, 294, 115,
157, 14, 854, 373, 186, 872, 71, 523,
931, 952, 655, 561, 607, 862, 554, 661,
313, 909, 511, 752, 986, 311, 287, 775,
505, 878, 422, 103, 299, 119, 107, 344,
487, 776, 445, 218, 549, 697, 454, 6,
462, 455, 52, 481, 594, 126, 112, 66,
877, 172, 153, 912, 834, 741, 610, 915,
964, 831, 575, 714, 250, 461, 814, 913,
369, 542, 882, 851, 427, 838, 867, 507,
434, 569, 20, 950, 792, 605, 798, 962,
923, 258, 972, 762, 809, 843, 674, 448,
280, 495, 285, 822, 283, 147, 451, 993,
794, 982, 748, 189, 274, 96, 73, 810,
401, 261, 277, 346, 527, 645, 601, 868,
248, 879, 371, 428, 559, 278, 265, 62,
225, 853, 483, 771, 9, 8, 339, 653,
263, 28, 477, 995, 208, 880, 292, 480,
516, 457, 286, 897, 21, 852, 971, 658,
623, 528, 316, 471, 860, 306, 638, 711,
875, 671, 108, 158, 646, 24, 257, 724,
193, 341, 902, 599, 565, 334, 506, 684,
960, 780, 429, 801, 910, 308, 383, 901,
489, 81, 512, 164, 755, 514, 723, 141,
296, 958, 686, 15, 799, 579, 598, 558,
414, 64, 420, 730, 256, 131, 45, 129,
259, 338, 999, 175, 740, 790, 324, 985,
896, 482, 841, 606, 377, 111, 372, 699,
988, 233, 243, 203, 781, 969, 903, 662,
632, 301, 44, 981, 36, 412, 946, 816,
284, 447, 214, 672, 758, 954, 804, 2,
928, 886, 421, 596, 574, 16, 892, 68,
546, 522, 490, 873, 656, 696, 864, 130,
40, 393, 926, 394, 932, 876, 664, 293,
154, 916, 55, 196, 842, 498, 177, 948,
540, 127, 271, 113, 844, 576, 132, 943,
12, 123, 291, 31, 212, 529, 547, 171,
582, 609, 793, 830, 221, 440, 568, 118,
406, 194, 827, 360, 622, 389, 800, 571,
213, 262, 403, 408, 881, 289, 635, 967,
432, 376, 649, 832, 857, 717, 145, 510,
159, 980, 683, 580, 484, 379, 246, 88,
567, 320, 643, 7, 924, 397, 10, 787,
845, 779, 670, 716, 19, 600, 382, 0,
210, 665, 228, 97, 266, 90, 304, 456,
180, 152, 425, 310, 768, 223, 702, 997,
577, 663, 290, 537, 416, 426, 914, 691,
23, 281, 497, 508, 48, 681, 581, 728,
99, 795, 530, 871, 957, 889, 206, 813,
839, 709, 805, 253, 151, 613, 65, 654,
93, 639, 784, 891, 352, 67, 430, 754,
76, 187, 443, 676, 362, 961, 874, 330,
331, 384, 85, 217, 855, 818, 738, 361,
314, 3, 615, 520, 355, 920, 689, 22,
188, 49, 904, 935, 136, 475, 693, 749,
519, 812, 100, 207, 963, 364, 464, 572,
731, 230, 833, 385, 499, 545, 273, 232,
398, 478, 975, 564, 399, 504, 35, 562,
938, 211, 26, 337, 54, 614, 586, 433,
450, 763, 238, 305, 941, 370, 885, 837,
234, 110, 137, 395, 368, 695, 342, 907,
396, 474, 176, 737, 796, 446, 37, 894,
727, 648, 431, 1, 366, 525, 553, 704,
329, 627, 479, 33, 492, 260, 241, 86,
185, 491, 966, 247, 13, 587, 602, 409,
335, 650, 235, 611, 470, 442, 597, 254,
343, 539, 146, 585, 593, 641, 770, 94,
976, 705, 181, 255, 315, 718, 526, 987,
692, 983, 595, 898, 282, 133, 439, 633,
534, 861, 269, 619, 677, 502, 375, 224,
806, 869, 417, 584, 612, 803, 58, 84,
788, 797, 38, 700, 751, 603, 652, 57,
240, 947, 350, 270, 333, 116, 736, 69,
74, 104, 767, 318, 735, 859, 357, 555,
411, 267, 712, 675, 532, 825, 496, 927,
942, 102, 46, 192, 114, 744, 138, 998,
72, 617, 134, 846, 166, 77, 900, 5,
303, 387, 400, 47, 729, 922, 222, 197,
351, 509, 524, 165, 485, 300, 944, 380,
625, 778, 685, 29, 589, 766, 161, 391,
423, 42, 734, 552, 215, 824, 908, 229,
89, 251, 199, 616, 78, 644, 242, 722,
25, 437, 732, 956, 275, 200, 970, 753,
791, 336, 556, 847, 703, 236, 715, 75,
863, 713, 785, 911, 786, 620, 551, 413,
39, 739, 820, 808, 764, 701, 819, 173,
989, 345, 690, 459, 60, 106, 887, 996,
365, 673, 968, 513, 18, 419, 550, 588,
435, 264, 789, 340, 659, 466, 356, 288,
56, 708, 557, 488, 760, 332, 402, 168,
202, 521, 757, 205, 706, 441, 773, 231,
583, 386, 678, 618, 815, 279, 87, 533,
61, 548, 92, 169, 694, 905, 198, 121,
410, 139, 657, 640, 743, 128, 458, 866,
501, 348, 155, 276, 101, 858, 323, 359,
};
}

View File

@ -1,271 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.bootstrap;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
/**
* A helper class which creates a new client-side {@link Channel} and makes a
* connection attempt.
*
* <h3>Configuring a channel</h3>
*
* {@link #setOption(String, Object) Options} are used to configure a channel:
*
* <pre>
* {@link ClientBootstrap} b = ...;
*
* // Options for a new channel
* b.setOption("remoteAddress", new {@link InetSocketAddress}("example.com", 8080));
* b.setOption("tcpNoDelay", true);
* b.setOption("receiveBufferSize", 1048576);
* </pre>
*
* For the detailed list of available options, please refer to
* {@link ChannelConfig} and its sub-types.
*
* <h3>Configuring a channel pipeline</h3>
*
* Every channel has its own {@link ChannelPipeline} and you can configure it
* in two ways.
*
* The recommended approach is to specify a {@link ChannelPipelineFactory} by
* calling {@link #setPipelineFactory(ChannelPipelineFactory)}.
*
* <pre>
* {@link ClientBootstrap} b = ...;
* b.setPipelineFactory(new MyPipelineFactory());
*
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* public {@link ChannelPipeline} getPipeline() throws Exception {
* // Create and configure a new pipeline for a new channel.
* {@link ChannelPipeline} p = {@link Channels}.pipeline();
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* return p;
* }
* }
* </pre>
* <p>
* The alternative approach, which works only in a certain situation, is to use
* the default pipeline and let the bootstrap to shallow-copy the default
* pipeline for each new channel:
*
* <pre>
* {@link ClientBootstrap} b = ...;
* {@link ChannelPipeline} p = b.getPipeline();
*
* // Add handlers to the default pipeline.
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* </pre>
*
* Please note 'shallow-copy' here means that the added {@link ChannelHandler}s
* are not cloned but only their references are added to the new pipeline.
* Therefore, you cannot use this approach if you are going to open more than
* one {@link Channel}s or run a server that accepts incoming connections to
* create its child channels.
*
* <h3>Applying different settings for different {@link Channel}s</h3>
*
* {@link ClientBootstrap} is just a helper class. It neither allocates nor
* manages any resources. What manages the resources is the
* {@link ChannelFactory} implementation you specified in the constructor of
* {@link ClientBootstrap}. Therefore, it is OK to create as many
* {@link ClientBootstrap} instances as you want with the same
* {@link ChannelFactory} to apply different settings for different
* {@link Channel}s.
* @apiviz.landmark
*/
public class ClientBootstrap extends Bootstrap {
/**
* Creates a new instance with no {@link ChannelFactory} set.
* {@link #setFactory(ChannelFactory)} must be called before any I/O
* operation is requested.
*/
public ClientBootstrap() {
}
/**
* Creates a new instance with the specified initial {@link ChannelFactory}.
*/
public ClientBootstrap(ChannelFactory channelFactory) {
super(channelFactory);
}
/**
* Attempts a new connection with the current {@code "remoteAddress"} and
* {@code "localAddress"} option. If the {@code "localAddress"} option is
* not set, the local address of a new channel is determined automatically.
* This method is similar to the following code:
*
* <pre>
* {@link ClientBootstrap} b = ...;
* b.connect(b.getOption("remoteAddress"), b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when this connection attempt
* succeeds or fails
*
* @throws IllegalStateException
* if {@code "remoteAddress"} option was not set
* @throws ClassCastException
* if {@code "remoteAddress"} or {@code "localAddress"} option's
* value is neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect() {
SocketAddress remoteAddress = (SocketAddress) getOption("remoteAddress");
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress option is not set.");
}
return connect(remoteAddress);
}
/**
* Attempts a new connection with the specified {@code remoteAddress} and
* the current {@code "localAddress"} option. If the {@code "localAddress"}
* option is not set, the local address of a new channel is determined
* automatically. This method is identical with the following code:
*
* <pre>
* {@link ClientBootstrap} b = ...;
* b.connect(remoteAddress, b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when this connection attempt
* succeeds or fails
*
* @throws ClassCastException
* if {@code "localAddress"} option's value is
* neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
return connect(remoteAddress, localAddress);
}
/**
* Attempts a new connection with the specified {@code remoteAddress} and
* the specified {@code localAddress}. If the specified local address is
* {@code null}, the local address of a new channel is determined
* automatically.
*
* @return a future object which notifies when this connection attempt
* succeeds or fails
*
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.
Channel ch = getFactory().newChannel(pipeline);
boolean success = false;
try {
ch.getConfig().setOptions(getOptions());
success = true;
} finally {
if (!success) {
ch.close();
}
}
// Bind.
if (localAddress != null) {
ch.bind(localAddress);
}
// Connect.
return ch.connect(remoteAddress);
}
/**
* Attempts to bind a channel with the specified {@code localAddress}. later the channel can be connected
* to a remoteAddress by calling {@link Channel#connect(SocketAddress)}.This method is useful where bind and connect
* need to be done in separate steps. (For example, SCTP Multihoming)
*
* @return a future object which notifies when this bind attempt
* succeeds or fails
*
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.
Channel ch = getFactory().newChannel(pipeline);
boolean success = false;
try {
ch.getConfig().setOptions(getOptions());
success = true;
} finally {
if (!success) {
ch.close();
}
}
// Bind.
return ch.bind(localAddress);
}
}

View File

@ -1,316 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.bootstrap;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
/**
* A helper class which creates a new server-side {@link Channel} for a
* connectionless transport.
*
* <h3>Only for connectionless transports</h3>
*
* This bootstrap is for connectionless transports only such as UDP/IP.
* Use {@link ServerBootstrap} instead for connection oriented transports.
* Do not use this helper if you are using a connection oriented transport such
* as TCP/IP and local transport which accepts an incoming connection and lets
* the accepted child channels handle received messages.
*
* <h3>Configuring channels</h3>
*
* {@link #setOption(String, Object) Options} are used to configure a channel:
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
*
* // Options for a new channel
* b.setOption("localAddress", new {@link InetSocketAddress}(8080));
* b.setOption("tcpNoDelay", true);
* b.setOption("receiveBufferSize", 1048576);
* </pre>
*
* For the detailed list of available options, please refer to
* {@link ChannelConfig} and its sub-types.
*
* <h3>Configuring a channel pipeline</h3>
*
* Every channel has its own {@link ChannelPipeline} and you can configure it
* in two ways.
*
* The recommended approach is to specify a {@link ChannelPipelineFactory} by
* calling {@link #setPipelineFactory(ChannelPipelineFactory)}.
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
* b.setPipelineFactory(new MyPipelineFactory());
*
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* public {@link ChannelPipeline} getPipeline() throws Exception {
* // Create and configure a new pipeline for a new channel.
* {@link ChannelPipeline} p = {@link Channels}.pipeline();
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* return p;
* }
* }
* </pre>
* <p>
* The alternative approach, which works only in a certain situation, is to use
* the default pipeline and let the bootstrap to shallow-copy the default
* pipeline for each new channel:
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
* {@link ChannelPipeline} p = b.getPipeline();
*
* // Add handlers to the default pipeline.
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* </pre>
*
* Please note 'shallow-copy' here means that the added {@link ChannelHandler}s
* are not cloned but only their references are added to the new pipeline.
* Therefore, you cannot use this approach if you are going to open more than
* one {@link Channel}s or run a server that accepts incoming connections to
* create its child channels.
*
* <h3>Applying different settings for different {@link Channel}s</h3>
*
* {@link ConnectionlessBootstrap} is just a helper class. It neither
* allocates nor manages any resources. What manages the resources is the
* {@link ChannelFactory} implementation you specified in the constructor of
* {@link ConnectionlessBootstrap}. Therefore, it is OK to create as
* many {@link ConnectionlessBootstrap} instances as you want with the same
* {@link ChannelFactory} to apply different settings for different
* {@link Channel}s.
* @apiviz.landmark
*/
public class ConnectionlessBootstrap extends Bootstrap {
/**
* Creates a new instance with no {@link ChannelFactory} set.
* {@link #setFactory(ChannelFactory)} must be called before any I/O
* operation is requested.
*/
public ConnectionlessBootstrap() {
}
/**
* Creates a new instance with the specified initial {@link ChannelFactory}.
*/
public ConnectionlessBootstrap(ChannelFactory channelFactory) {
super(channelFactory);
}
/**
* Creates a new channel which is bound to the local address which was
* specified in the current {@code "localAddress"} option. This method is
* similar to the following code:
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
* b.bind(b.getOption("localAddress"));
* </pre>
*
* @return a new bound channel which accepts incoming connections
*
* @throws IllegalStateException
* if {@code "localAddress"} option was not set
* @throws ClassCastException
* if {@code "localAddress"} option's value is
* neither a {@link SocketAddress} nor {@code null}
* @throws ChannelException
* if failed to create a new channel and
* bind it to the local address
*/
public Channel bind() {
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
if (localAddress == null) {
throw new IllegalStateException("localAddress option is not set.");
}
return bind(localAddress);
}
/**
* Creates a new channel which is bound to the specified local address.
*
* @return a new bound channel which accepts incoming connections
*
* @throws ChannelException
* if failed to create a new channel and
* bind it to the local address
*/
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
Channel ch = getFactory().newChannel(pipeline);
// Apply options.
boolean success = false;
try {
ch.getConfig().setOptions(getOptions());
success = true;
} finally {
if (!success) {
ch.close();
}
}
// Bind
ChannelFuture future = ch.bind(localAddress);
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.channel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.cause());
}
return ch;
}
/**
* Creates a new connected channel with the current {@code "remoteAddress"}
* and {@code "localAddress"} option. If the {@code "localAddress"} option
* is not set, the local address of a new channel is determined
* automatically. This method is similar to the following code:
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
* b.connect(b.getOption("remoteAddress"), b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws IllegalStateException
* if {@code "remoteAddress"} option was not set
* @throws ClassCastException
* if {@code "remoteAddress"} or {@code "localAddress"} option's
* value is neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect() {
SocketAddress remoteAddress = (SocketAddress) getOption("remoteAddress");
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress option is not set.");
}
return connect(remoteAddress);
}
/**
* Creates a new connected channel with the specified
* {@code "remoteAddress"} and the current {@code "localAddress"} option.
* If the {@code "localAddress"} option is not set, the local address of
* a new channel is determined automatically. This method is identical
* with the following code:
*
* <pre>
* {@link ConnectionlessBootstrap} b = ...;
* b.connect(remoteAddress, b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws ClassCastException
* if {@code "localAddress"} option's value is
* neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remotedAddress");
}
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
return connect(remoteAddress, localAddress);
}
/**
* Creates a new connected channel with the specified
* {@code "remoteAddress"} and the specified {@code "localAddress"}.
* If the specified local address is {@code null}, the local address of a
* new channel is determined automatically.
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.
Channel ch = getFactory().newChannel(pipeline);
boolean success = false;
try {
ch.getConfig().setOptions(getOptions());
success = true;
} finally {
if (!success) {
ch.close();
}
}
// Bind.
if (localAddress != null) {
ch.bind(localAddress);
}
// Connect.
return ch.connect(remoteAddress);
}
}

View File

@ -1,367 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.bootstrap;
import static io.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.ChildChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.ServerChannelFactory;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* A helper class which creates a new server-side {@link Channel} and accepts
* incoming connections.
*
* <h3>Only for connection oriented transports</h3>
*
* This bootstrap is for connection oriented transports only such as TCP/IP
* and local transport. Use {@link ConnectionlessBootstrap} instead for
* connectionless transports. Do not use this helper if you are using a
* connectionless transport such as UDP/IP which does not accept an incoming
* connection but receives messages by itself without creating a child channel.
*
* <h3>Parent channel and its children</h3>
*
* A parent channel is a channel which is supposed to accept incoming
* connections. It is created by this bootstrap's {@link ChannelFactory} via
* {@link #bind()} and {@link #bind(SocketAddress)}.
* <p>
* Once successfully bound, the parent channel starts to accept incoming
* connections, and the accepted connections become the children of the
* parent channel.
*
* <h3>Configuring channels</h3>
*
* {@link #setOption(String, Object) Options} are used to configure both a
* parent channel and its child channels. To configure the child channels,
* prepend {@code "child."} prefix to the actual option names of a child
* channel:
*
* <pre>
* {@link ServerBootstrap} b = ...;
*
* // Options for a parent channel
* b.setOption("localAddress", new {@link InetSocketAddress}(8080));
* b.setOption("reuseAddress", true);
*
* // Options for its children
* b.setOption("child.tcpNoDelay", true);
* b.setOption("child.receiveBufferSize", 1048576);
* </pre>
*
* For the detailed list of available options, please refer to
* {@link ChannelConfig} and its sub-types.
*
* <h3>Configuring a parent channel pipeline</h3>
*
* It is rare to customize the pipeline of a parent channel because what it is
* supposed to do is very typical. However, you might want to add a handler
* to deal with some special needs such as degrading the process
* <a href="http://en.wikipedia.org/wiki/User_identifier_(Unix)">UID</a> from
* a <a href="http://en.wikipedia.org/wiki/Superuser">superuser</a> to a
* normal user and changing the current VM security manager for better
* security. To support such a case,
* the {@link #setParentHandler(ChannelHandler) parentHandler} property is
* provided.
*
* <h3>Configuring a child channel pipeline</h3>
*
* Every channel has its own {@link ChannelPipeline} and you can configure it
* in two ways.
*
* The recommended approach is to specify a {@link ChannelPipelineFactory} by
* calling {@link #setPipelineFactory(ChannelPipelineFactory)}.
*
* <pre>
* {@link ServerBootstrap} b = ...;
* b.setPipelineFactory(new MyPipelineFactory());
*
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* public {@link ChannelPipeline} getPipeline() throws Exception {
* // Create and configure a new pipeline for a new channel.
* {@link ChannelPipeline} p = {@link Channels}.pipeline();
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* return p;
* }
* }
* </pre>
* <p>
* The alternative approach, which works only in a certain situation, is to use
* the default pipeline and let the bootstrap to shallow-copy the default
* pipeline for each new channel:
*
* <pre>
* {@link ServerBootstrap} b = ...;
* {@link ChannelPipeline} p = b.getPipeline();
*
* // Add handlers to the default pipeline.
* p.addLast("encoder", new EncodingHandler());
* p.addLast("decoder", new DecodingHandler());
* p.addLast("logic", new LogicHandler());
* </pre>
*
* Please note 'shallow-copy' here means that the added {@link ChannelHandler}s
* are not cloned but only their references are added to the new pipeline.
* Therefore, you cannot use this approach if you are going to open more than
* one {@link Channel}s or run a server that accepts incoming connections to
* create its child channels.
*
* <h3>Applying different settings for different {@link Channel}s</h3>
*
* {@link ServerBootstrap} is just a helper class. It neither allocates nor
* manages any resources. What manages the resources is the
* {@link ChannelFactory} implementation you specified in the constructor of
* {@link ServerBootstrap}. Therefore, it is OK to create as many
* {@link ServerBootstrap} instances as you want with the same
* {@link ChannelFactory} to apply different settings for different
* {@link Channel}s.
* @apiviz.landmark
*/
public class ServerBootstrap extends Bootstrap {
private volatile ChannelHandler parentHandler;
/**
* Creates a new instance with no {@link ChannelFactory} set.
* {@link #setFactory(ChannelFactory)} must be called before any I/O
* operation is requested.
*/
public ServerBootstrap() {
}
/**
* Creates a new instance with the specified initial {@link ChannelFactory}.
*/
public ServerBootstrap(ChannelFactory channelFactory) {
super(channelFactory);
}
/**
* {@inheritDoc}
*
* @throws IllegalArgumentException
* if the specified {@code factory} is not a
* {@link ServerChannelFactory}
*/
@Override
public void setFactory(ChannelFactory factory) {
if (factory == null) {
throw new NullPointerException("factory");
}
if (!(factory instanceof ServerChannelFactory)) {
throw new IllegalArgumentException(
"factory must be a " +
ServerChannelFactory.class.getSimpleName() + ": " +
factory.getClass());
}
super.setFactory(factory);
}
/**
* Returns an optional {@link ChannelHandler} which intercepts an event
* of a newly bound server-side channel which accepts incoming connections.
*
* @return the parent channel handler.
* {@code null} if no parent channel handler is set.
*/
public ChannelHandler getParentHandler() {
return parentHandler;
}
/**
* Sets an optional {@link ChannelHandler} which intercepts an event of
* a newly bound server-side channel which accepts incoming connections.
*
* @param parentHandler
* the parent channel handler.
* {@code null} to unset the current parent channel handler.
*/
public void setParentHandler(ChannelHandler parentHandler) {
this.parentHandler = parentHandler;
}
/**
* Creates a new channel which is bound to the local address which was
* specified in the current {@code "localAddress"} option. This method is
* similar to the following code:
*
* <pre>
* {@link ServerBootstrap} b = ...;
* b.bind(b.getOption("localAddress"));
* </pre>
*
* @return a new bound channel which accepts incoming connections
*
* @throws IllegalStateException
* if {@code "localAddress"} option was not set
* @throws ClassCastException
* if {@code "localAddress"} option's value is
* neither a {@link SocketAddress} nor {@code null}
* @throws ChannelException
* if failed to create a new channel and
* bind it to the local address
*/
public Channel bind() {
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
if (localAddress == null) {
throw new IllegalStateException("localAddress option is not set.");
}
return bind(localAddress);
}
/**
* Creates a new channel which is bound to the specified local address.
*
* @return a new bound channel which accepts incoming connections
*
* @throws ChannelException
* if failed to create a new channel and
* bind it to the local address
*/
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
Channel channel = getFactory().newChannel(bossPipeline);
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted = true;
}
} while (future == null);
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.channel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.cause());
}
return channel;
}
private final class Binder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress;
private final BlockingQueue<ChannelFuture> futureQueue;
private final Map<String, Object> childOptions =
new HashMap<String, Object>();
Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
this.localAddress = localAddress;
this.futureQueue = futureQueue;
}
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.channel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.channel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.channel().bind(localAddress));
assert finished;
}
@Override
public void childChannelOpen(
ChannelHandlerContext ctx,
ChildChannelStateEvent e) throws Exception {
// Apply child options.
try {
e.getChildChannel().getConfig().setOptions(childOptions);
} catch (Throwable t) {
Channels.fireExceptionCaught(e.getChildChannel(), t);
}
ctx.sendUpstream(e);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
boolean finished = futureQueue.offer(failedFuture(e.channel(), e.cause()));
assert finished;
ctx.sendUpstream(e);
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* IoC/DI friendly helper classes which enable an easy implementation of
* typical client side and server side channel initialization.
*
* @apiviz.landmark
* @apiviz.exclude ^io\.netty\.util\.
*/
package io.netty.bootstrap;

View File

@ -367,7 +367,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (AbstractChannel.this.eventLoop != null) {
if (isRegistered()) {
throw new IllegalStateException("registered to an event loop already");
}
if (!isCompatible(eventLoop)) {
@ -581,10 +581,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
future.setSuccess();
registered = false;
future.setSuccess();
pipeline().fireChannelUnregistered();
eventLoop = null;
}
} else {
eventLoop().execute(new Runnable() {

View File

@ -0,0 +1,164 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
public class ChannelBuilder {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelBuilder.class);
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private EventLoop eventLoop;
private Channel channel;
private ChannelHandler initializer;
private SocketAddress localAddress;
private SocketAddress remoteAddress;
public ChannelBuilder eventLoop(EventLoop eventLoop) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
this.eventLoop = eventLoop;
return this;
}
public ChannelBuilder channel(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
return this;
}
public <T> ChannelBuilder option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
return this;
}
public ChannelBuilder initializer(ChannelHandler initializer) {
if (initializer == null) {
throw new NullPointerException("initializer");
}
this.initializer = initializer;
return this;
}
public ChannelBuilder localAddress(SocketAddress localAddress) {
this.localAddress = localAddress;
return this;
}
public ChannelBuilder remoteAddress(SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
public ChannelFuture bind() {
validate();
return bind(channel.newFuture());
}
public ChannelFuture bind(ChannelFuture future) {
validate();
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
try {
init();
} catch (Throwable t) {
future.setFailure(t);
return future;
}
return channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
public ChannelFuture connect() {
validate();
return connect(channel.newFuture());
}
public ChannelFuture connect(ChannelFuture future) {
validate();
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress not set");
}
try {
init();
} catch (Throwable t) {
future.setFailure(t);
return future;
}
if (localAddress == null) {
channel.connect(remoteAddress, future);
} else {
channel.connect(remoteAddress, localAddress, future);
}
return future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
private void init() throws Exception {
if (channel.isActive()) {
throw new IllegalStateException("channel already active:: " + channel);
}
if (channel.isRegistered()) {
throw new IllegalStateException("channel already registered: " + channel);
}
if (!channel.isOpen()) {
throw new ClosedChannelException();
}
ChannelPipeline p = channel.pipeline();
p.addLast(generateName(initializer), initializer);
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
eventLoop.register(channel).syncUninterruptibly();
}
public void validate() {
if (eventLoop == null) {
throw new IllegalStateException("eventLoop not set");
}
if (channel == null) {
throw new IllegalStateException("channel not set");
}
if (initializer == null) {
throw new IllegalStateException("initializer not set");
}
}
static String generateName(ChannelHandler handler) {
String type = handler.getClass().getSimpleName();
StringBuilder buf = new StringBuilder(type.length() + 10);
buf.append(type);
buf.append("-0");
buf.append(Long.toHexString(System.identityHashCode(handler) & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, 'x');
return buf.toString();
}
}

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel;
import io.netty.bootstrap.ClientBootstrap;
import java.util.concurrent.TimeUnit;
/**
@ -247,7 +245,7 @@ public interface ChannelFuture {
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
void addListener(ChannelFutureListener listener);
ChannelFuture addListener(ChannelFutureListener listener);
/**
* Removes the specified listener from this future.
@ -256,13 +254,21 @@ public interface ChannelFuture {
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
void removeListener(ChannelFutureListener listener);
ChannelFuture removeListener(ChannelFutureListener listener);
/**
* Rethrows the exception that caused this future fail if this future is
* complete and failed.
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed. If the cause of the failure is a checked exception, it is wrapped with a new
* {@link ChannelException} before being thrown.
*/
ChannelFuture rethrowIfFailed() throws Exception;
ChannelFuture sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed. If the cause of the failure is a checked exception, it is wrapped with a new
* {@link ChannelException} before being thrown.
*/
ChannelFuture syncUninterruptibly();
/**
* Waits for this future to be completed.

View File

@ -39,7 +39,7 @@ public interface ChannelFutureListener extends EventListener {
ChannelFutureListener CLOSE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.channel().close(null);
future.channel().close();
}
};
@ -51,7 +51,7 @@ public interface ChannelFutureListener extends EventListener {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().close(null);
future.channel().close();
}
}
};

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.group.ChannelGroup;
import java.lang.annotation.Documented;
@ -24,6 +23,7 @@ import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.channels.Channels;
/**
* Handles or intercepts a {@link ChannelEvent}, and sends a

View File

@ -0,0 +1,89 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
public abstract void initChannel(Channel ch) throws Exception;
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public final void beforeAdd(ChannelHandlerContext ctx) throws Exception {
super.beforeAdd(ctx);
}
@Override
public final void afterAdd(ChannelHandlerContext ctx) throws Exception {
super.afterAdd(ctx);
}
@Override
public final void beforeRemove(ChannelHandlerContext ctx) throws Exception {
super.beforeRemove(ctx);
}
@Override
public final void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
}
@Override
public final void channelRegistered(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
try {
initChannel(ctx.channel());
ctx.pipeline().remove(this);
// Note that we do not call ctx.fireChannelRegistered() because a user might have
// inserted a handler before the initializer using pipeline.addFirst().
System.out.println(ctx.pipeline().toMap());
ctx.pipeline().fireChannelRegistered();
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel());
ctx.close();
}
}
@Override
public final void channelUnregistered(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
super.channelUnregistered(ctx);
}
@Override
public final void channelActive(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
super.channelActive(ctx);
}
@Override
public final void channelInactive(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
super.channelInactive(ctx);
}
@Override
public final void exceptionCaught(ChannelInboundHandlerContext<Object> ctx,
Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public final void userEventTriggered(ChannelInboundHandlerContext<Object> ctx,
Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}
@Override
public final void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
super.inboundBufferUpdated(ctx);
}
}

View File

@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentMap;
public class ChannelOption<T> implements Comparable<ChannelOption<T>> {
private static final ConcurrentMap<String, Boolean> names = new ConcurrentHashMap<String, Boolean>();
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS =
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS", Integer.class);
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
@ -73,8 +75,6 @@ public class ChannelOption<T> implements Comparable<ChannelOption<T>> {
public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR =
new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR", SocketAddress.class);
private static final ConcurrentMap<String, Boolean> names = new ConcurrentHashMap<String, Boolean>();
private final String name;
private final Class<T> valueType;
private final String strVal;

View File

@ -44,7 +44,7 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
}
@Override
public void addListener(final ChannelFutureListener listener) {
public ChannelFuture addListener(final ChannelFutureListener listener) {
if (channel().eventLoop().inEventLoop()) {
notifyListener(listener);
} else {
@ -55,6 +55,7 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
}
});
}
return this;
}
private void notifyListener(ChannelFutureListener listener) {
@ -70,8 +71,9 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
}
@Override
public void removeListener(ChannelFutureListener listener) {
public ChannelFuture removeListener(ChannelFutureListener listener) {
// NOOP
return this;
}
@Override

View File

@ -119,7 +119,7 @@ public class DefaultChannelFuture implements ChannelFuture {
}
@Override
public void addListener(final ChannelFutureListener listener) {
public ChannelFuture addListener(final ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
@ -160,10 +160,12 @@ public class DefaultChannelFuture implements ChannelFuture {
});
}
}
return this;
}
@Override
public void removeListener(ChannelFutureListener listener) {
public ChannelFuture removeListener(ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
@ -185,28 +187,39 @@ public class DefaultChannelFuture implements ChannelFuture {
}
}
}
return this;
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
if (!isDone()) {
throw new IllegalStateException("not done yet");
}
public ChannelFuture sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return this;
return;
}
if (cause instanceof Exception) {
throw (Exception) cause;
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
throw new ChannelException(cause);
}
@Override
@ -333,7 +346,7 @@ public class DefaultChannelFuture implements ChannelFuture {
}
}
private void checkDeadLock() {
private static void checkDeadLock() {
if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
throw new IllegalStateException(
"await*() in I/O thread causes a dead lock or " +

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import java.nio.channels.Channels;
/**
* The {@link CompleteChannelFuture} which is failed already. It is
* recommended to use {@link Channels#failedFuture(Channel, Throwable)}
@ -49,15 +51,24 @@ public class FailedChannelFuture extends CompleteChannelFuture {
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
if (cause instanceof Exception) {
throw (Exception) cause;
public ChannelFuture sync() throws InterruptedException {
return rethrow();
}
@Override
public ChannelFuture syncUninterruptibly() {
return rethrow();
}
private ChannelFuture rethrow() {
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
throw new ChannelException(cause);
}
}

View File

@ -0,0 +1,193 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.SocketAddresses;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
public class ServerChannelBuilder {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerChannelBuilder.class);
private static final InetSocketAddress DEFAULT_LOCAL_ADDR = new InetSocketAddress(SocketAddresses.LOCALHOST, 0);
private final Acceptor acceptor = new Acceptor();
private final Map<ChannelOption<?>, Object> parentOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private EventLoop parentEventLoop;
private EventLoop childEventLoop;
private ServerChannel parentChannel;
private ChannelHandler parentInitializer;
private ChannelHandler childInitializer;
private SocketAddress localAddress;
public ServerChannelBuilder parentEventLoop(EventLoop parentEventLoop) {
if (parentEventLoop == null) {
throw new NullPointerException("parentEventLoop");
}
this.parentEventLoop = parentEventLoop;
return this;
}
public ServerChannelBuilder childEventLoop(EventLoop childEventLoop) {
if (childEventLoop == null) {
throw new NullPointerException("childEventLoop");
}
this.childEventLoop = childEventLoop;
return this;
}
public ServerChannelBuilder parentChannel(ServerChannel parentChannel) {
if (parentChannel == null) {
throw new NullPointerException("parentChannel");
}
this.parentChannel = parentChannel;
return this;
}
public <T> ServerChannelBuilder parentOption(ChannelOption<T> parentOption, T value) {
if (parentOption == null) {
throw new NullPointerException("parentOption");
}
if (value == null) {
parentOptions.remove(parentOption);
} else {
parentOptions.put(parentOption, value);
}
return this;
}
public <T> ServerChannelBuilder childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
return this;
}
public ServerChannelBuilder parentInitializer(ChannelHandler parentInitializer) {
this.parentInitializer = parentInitializer;
return this;
}
public ServerChannelBuilder childInitializer(ChannelHandler childInitializer) {
if (childInitializer == null) {
throw new NullPointerException("childInitializer");
}
this.childInitializer = childInitializer;
return this;
}
public ServerChannelBuilder localAddress(SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
this.localAddress = localAddress;
return this;
}
public ChannelFuture bind() {
validate();
return bind(parentChannel.newFuture());
}
public ChannelFuture bind(ChannelFuture future) {
validate();
if (parentChannel.isActive()) {
future.setFailure(new IllegalStateException("parentChannel already bound: " + parentChannel));
return future;
}
if (parentChannel.isRegistered()) {
future.setFailure(new IllegalStateException("parentChannel already registered: " + parentChannel));
return future;
}
if (!parentChannel.isOpen()) {
future.setFailure(new ClosedChannelException());
return future;
}
ChannelPipeline p = parentChannel.pipeline();
if (parentInitializer != null) {
p.addLast(ChannelBuilder.generateName(parentInitializer), parentInitializer);
}
p.addLast(ChannelBuilder.generateName(acceptor), acceptor);
ChannelFuture f = parentEventLoop.register(parentChannel).awaitUninterruptibly();
if (!f.isSuccess()) {
future.setFailure(f.cause());
return future;
}
parentChannel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
return future;
}
public void validate() {
if (parentEventLoop == null) {
throw new IllegalStateException("parentEventLoop not set");
}
if (parentChannel == null) {
throw new IllegalStateException("parentChannel not set");
}
if (childInitializer == null) {
throw new IllegalStateException("childInitializer not set");
}
if (childEventLoop == null) {
logger.warn("childEventLoop is not set. Using parentEventLoop instead.");
childEventLoop = parentEventLoop;
}
if (localAddress == null) {
logger.warn("localAddress is not set. Using " + DEFAULT_LOCAL_ADDR + " instead.");
localAddress = DEFAULT_LOCAL_ADDR;
}
}
private class Acceptor extends ChannelInboundHandlerAdapter<Channel> {
@Override
public ChannelBufferHolder<Channel> newInboundBuffer(ChannelInboundHandlerContext<Channel> ctx) {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<Channel>());
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Channel> ctx) {
Queue<Channel> in = ctx.in().messageBuffer();
for (;;) {
Channel child = in.poll();
if (child == null) {
break;
}
child.pipeline().addLast(ChannelBuilder.generateName(childInitializer), childInitializer);
for (Entry<ChannelOption<?>, Object> e: childOptions.entrySet()) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
try {
childEventLoop.register(child);
} catch (Throwable t) {
logger.warn("Failed to register an accepted channel: " + child, t);
}
}
}
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import java.nio.channels.Channels;
/**
* The {@link CompleteChannelFuture} which is succeeded already. It is
* recommended to use {@link Channels#succeededFuture(Channel)} instead of
@ -42,7 +44,12 @@ public class SucceededChannelFuture extends CompleteChannelFuture {
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
public ChannelFuture sync() throws InterruptedException {
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
return this;
}
}

View File

@ -19,13 +19,15 @@ public class VoidChannelFuture implements ChannelFuture.Unsafe {
}
@Override
public void addListener(final ChannelFutureListener listener) {
public ChannelFuture addListener(final ChannelFutureListener listener) {
fail();
return this;
}
@Override
public void removeListener(ChannelFutureListener listener) {
public ChannelFuture removeListener(ChannelFutureListener listener) {
// NOOP
return this;
}
@Override
@ -92,7 +94,13 @@ public class VoidChannelFuture implements ChannelFuture.Unsafe {
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
public ChannelFuture sync() throws InterruptedException {
fail();
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
fail();
return this;
}

View File

@ -172,7 +172,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
}
}
private void processTaskQueue() throws IOException {
private void processTaskQueue() {
for (;;) {
final Runnable task = pollTask();
if (task == null) {
@ -184,7 +184,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
}
}
private void processSelectedKeys() throws IOException {
private void processSelectedKeys() {
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
final SelectionKey k = i.next();
final Channel ch = (Channel) k.attachment();
@ -222,16 +222,17 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
}
}
private boolean cleanUpCancelledKeys() throws IOException {
private boolean cleanUpCancelledKeys() {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
SelectorUtil.cleanupKeys(selector);
return true;
}
return false;
}
private void closeAll() {
SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys();
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
for (SelectionKey k: keys) {

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Selector;
public final class SelectorUtil {
final class SelectorUtil {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SelectorUtil.class);
@ -64,6 +64,14 @@ public final class SelectorUtil {
}
}
static void cleanupKeys(Selector selector) {
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
private SelectorUtil() {
// Unused
}

View File

@ -101,7 +101,12 @@ public class CompleteChannelFutureTest {
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
public ChannelFuture sync() throws InterruptedException {
throw new Error();
}
@Override
public ChannelFuture syncUninterruptibly() {
throw new Error();
}
}