From 22a815eaf8802183102fa188283ee9c2a4d7bb53 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 12 Apr 2012 17:39:01 +0900 Subject: [PATCH] Revamp channel handler API - Merged LifeCycleAwareChannelHandler into ChannelHandler - Replaced ChannelUpstreamHandler and ChannelDownstreamHandler with ChannelReader and ChannelWriter - These two new interfaces are much more type-safe than its ancestor. - Simplified channel state model as described in #68 - Handler creates send/receive buffer. - Previously, Netty created them, but it led to more memory copies and inflexibility. I'm going to allow a handler to create a bounded queue for example. - It currently uses Queue but I'll define a new interface and make ChannelBuffer implement it (e.g. Queue) - Introduced AttributeMap which replaces attachments in Channel and ChannelHandlerContext and ChannelLocal --- .../main/java/io/netty/util/Attribute.java | 10 + .../main/java/io/netty/util/AttributeKey.java | 59 +++ .../main/java/io/netty/util/AttributeMap.java | 5 + .../channel/ChannelDownstreamHandler.java | 85 ----- .../java/io/netty/channel/ChannelHandler.java | 11 +- .../netty/channel/ChannelHandlerAdapter.java | 115 ++++++ .../netty/channel/ChannelHandlerContext.java | 100 ++--- .../java/io/netty/channel/ChannelReader.java | 19 + .../netty/channel/ChannelReaderAdapter.java | 73 ++++ .../netty/channel/ChannelReaderContext.java | 7 + .../netty/channel/ChannelUpstreamHandler.java | 88 ----- .../java/io/netty/channel/ChannelWriter.java | 16 + .../netty/channel/ChannelWriterAdapter.java | 69 ++++ .../netty/channel/ChannelWriterContext.java | 7 + .../channel/LifeCycleAwareChannelHandler.java | 36 -- .../SimpleChannelDownstreamHandler.java | 163 -------- .../netty/channel/SimpleChannelHandler.java | 348 ------------------ .../channel/SimpleChannelUpstreamHandler.java | 235 ------------ 18 files changed, 420 insertions(+), 1026 deletions(-) create mode 100644 common/src/main/java/io/netty/util/Attribute.java create mode 100644 common/src/main/java/io/netty/util/AttributeKey.java create mode 100644 common/src/main/java/io/netty/util/AttributeMap.java delete mode 100644 transport/src/main/java/io/netty/channel/ChannelDownstreamHandler.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelReader.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelReaderAdapter.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelReaderContext.java delete mode 100644 transport/src/main/java/io/netty/channel/ChannelUpstreamHandler.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelWriter.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelWriterAdapter.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelWriterContext.java delete mode 100644 transport/src/main/java/io/netty/channel/LifeCycleAwareChannelHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/SimpleChannelDownstreamHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/SimpleChannelHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/SimpleChannelUpstreamHandler.java diff --git a/common/src/main/java/io/netty/util/Attribute.java b/common/src/main/java/io/netty/util/Attribute.java new file mode 100644 index 0000000000..7986ac66b4 --- /dev/null +++ b/common/src/main/java/io/netty/util/Attribute.java @@ -0,0 +1,10 @@ +package io.netty.util; + +public interface Attribute { + T get(); + void set(T value); + T getAndSet(T value); + T setIfAbsent(T value); + boolean compareAndSet(T oldValue, T newValue); + void remove(); +} diff --git a/common/src/main/java/io/netty/util/AttributeKey.java b/common/src/main/java/io/netty/util/AttributeKey.java new file mode 100644 index 0000000000..1527891528 --- /dev/null +++ b/common/src/main/java/io/netty/util/AttributeKey.java @@ -0,0 +1,59 @@ +package io.netty.util; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public final class AttributeKey implements Serializable, Comparable> { + + private static final long serialVersionUID = 2783354860083517323L; + + private static final ConcurrentMap names = new ConcurrentHashMap(); + + private final String name; + private final Class valueType; + + public AttributeKey(String name, Class valueType) { + if (name == null) { + throw new NullPointerException("name"); + } + if (valueType == null) { + throw new NullPointerException("valueType"); + } + + if (names.putIfAbsent(name, Boolean.TRUE) != null) { + throw new IllegalArgumentException("key name already in use: " + name); + } + + this.name = name; + this.valueType = valueType; + } + + public String name() { + return name; + } + + public Class valueType() { + return valueType; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int compareTo(AttributeKey o) { + return name().compareTo(o.name()); + } + + @Override + public String toString() { + return name(); + } +} diff --git a/common/src/main/java/io/netty/util/AttributeMap.java b/common/src/main/java/io/netty/util/AttributeMap.java new file mode 100644 index 0000000000..25d5e73909 --- /dev/null +++ b/common/src/main/java/io/netty/util/AttributeMap.java @@ -0,0 +1,5 @@ +package io.netty.util; + +public interface AttributeMap { + Attribute attr(AttributeKey key, Class type); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelDownstreamHandler.java b/transport/src/main/java/io/netty/channel/ChannelDownstreamHandler.java deleted file mode 100644 index 3cfc13157c..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelDownstreamHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -/** - * Handles or intercepts a downstream {@link ChannelEvent}, and sends a - * {@link ChannelEvent} to the next handler in a {@link ChannelPipeline}. - *

- * The most common use case of this interface is to intercept an I/O request - * such as {@link Channel#write(Object)} and {@link Channel#close()}. - * - *

{@link SimpleChannelDownstreamHandler}

- *

- * In most cases, you will get to use a {@link SimpleChannelDownstreamHandler} - * to implement a downstream handler because it provides an individual handler - * method for each event type. You might want to implement this interface - * directly though if you want to handle various types of events in more - * generic way. - * - *

Firing an event to the next handler

- *

- * You can forward the received event downstream or upstream. In most cases, - * {@link ChannelDownstreamHandler} will send the event downstream - * (i.e. outbound) although it is legal to send the event upstream (i.e. inbound): - * - *

- * // Sending the event downstream (outbound)
- * void handleDownstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *     ...
- *     ctx.sendDownstream(e);
- *     ...
- * }
- *
- * // Sending the event upstream (inbound)
- * void handleDownstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *     ...
- *     ctx.sendUpstream(new {@link UpstreamChannelStateEvent}(...));
- *     ...
- * }
- * 
- * - *

Using the helper class to send an event

- *

- * You will also find various helper methods in {@link Channels} to be useful - * to generate and send an artificial or manipulated event. - *

- * Caution: - *

- * Use the *Later(..) methods of the {@link Channels} class if you want to send an upstream event from a {@link ChannelDownstreamHandler} otherwise you may run into threading issues. - * - *

State management

- * - * Please refer to {@link ChannelHandler}. - * - *

Thread safety

- *

- * {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream} - * may be invoked by more than one thread simultaneously. If the handler - * accesses a shared resource or stores stateful information, you might need - * proper synchronization in the handler implementation. - * @apiviz.exclude ^io\.netty\.handler\..*$ - */ -public interface ChannelDownstreamHandler extends ChannelHandler { - - /** - * Handles the specified downstream event. - * - * @param ctx the context object for this handler - * @param e the downstream event to process or intercept - */ - void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index 4bd6bfd4f4..6d5d49240c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -15,6 +15,9 @@ */ package io.netty.channel; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.group.ChannelGroup; + import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; @@ -22,9 +25,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.group.ChannelGroup; - /** * Handles or intercepts a {@link ChannelEvent}, and sends a * {@link ChannelEvent} to the next handler in a {@link ChannelPipeline}. @@ -207,6 +207,11 @@ import io.netty.channel.group.ChannelGroup; */ public interface ChannelHandler { + void beforeAdd(ChannelHandlerContext ctx) throws Exception; + void afterAdd(ChannelHandlerContext ctx) throws Exception; + void beforeRemove(ChannelHandlerContext ctx) throws Exception; + void afterRemove(ChannelHandlerContext ctx) throws Exception; + /** * Indicates that the same instance of the annotated {@link ChannelHandler} * can be added to one or more {@link ChannelPipeline}s multiple times diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java new file mode 100644 index 0000000000..586738eb16 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -0,0 +1,115 @@ +package io.netty.channel; + +import io.netty.util.internal.QueueFactory; + +import java.net.SocketAddress; +import java.util.Queue; + +public class ChannelHandlerAdapter implements ChannelReader, ChannelWriter { + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void channelRegistered(ChannelReaderContext ctx) throws Exception { + ctx.next().channelRegistered(); + } + + @Override + public void channelUnregistered(ChannelReaderContext ctx) throws Exception { + ctx.next().channelUnregistered(); + } + + @Override + public void channelActive(ChannelReaderContext ctx) throws Exception { + ctx.next().channelActive(); + } + + @Override + public void channelInactive(ChannelReaderContext ctx) throws Exception { + ctx.next().channelInactive(); + } + + @Override + public void exceptionCaught(ChannelReaderContext ctx, Throwable cause) throws Exception { + ctx.next().exceptionCaught(cause); + } + + @Override + public void userEventTriggered(ChannelReaderContext ctx, Object evt) throws Exception { + ctx.next().userEventTriggered(evt); + } + + @Override + @SuppressWarnings("unchecked") + public Queue newReceiveBuffer(ChannelReaderContext ctx) throws Exception { + return (Queue) QueueFactory.createQueue(Object.class); + } + + @Override + public void receiveBufferUpdated(ChannelReaderContext ctx) throws Exception { + ctx.in().transferTo(ctx.next().in()); + } + + @Override + public void receiveBufferClosed(ChannelReaderContext ctx) throws Exception { + ctx.next().in().close(); + } + + @Override + public void bind(ChannelWriterContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { + ctx.next().bind(localAddress, future); + } + + @Override + public void connect(ChannelWriterContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception { + ctx.next().connect(remoteAddress, localAddress, future); + } + + @Override + public void disconnect(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().disconnect(future); + } + + @Override + public void close(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().close(future); + } + + @Override + public void deregister(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().deregister(future); + } + + @Override + @SuppressWarnings("unchecked") + public Queue newSendBuffer(ChannelWriterContext ctx) throws Exception { + return (Queue) QueueFactory.createQueue(Object.class); + } + + @Override + public void sendBufferUpdated(ChannelWriterContext ctx) throws Exception { + ctx.out().transferTo(ctx.next().out()); + } + + @Override + public void sendBufferClosed(ChannelWriterContext ctx) throws Exception { + ctx.next().out().close(); + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 078139be69..3602c44607 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -16,6 +16,11 @@ package io.netty.channel; +import io.netty.util.AttributeMap; + +import java.net.SocketAddress; +import java.util.Queue; + /** * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} * and other handlers. A handler can send a {@link ChannelEvent} upstream or @@ -118,75 +123,34 @@ package io.netty.channel; * pipeline, and how to handle the event in your application. * @apiviz.owns io.netty.channel.ChannelHandler */ -public interface ChannelHandlerContext { +public interface ChannelHandlerContext extends AttributeMap { - /** - * Returns the {@link Channel} that the {@link ChannelPipeline} belongs to. - * This method is a shortcut to getPipeline().getChannel(). - */ - Channel getChannel(); + String name(); + Channel channel(); + ChannelReader handler(); + NextHandler next(); - /** - * Returns the {@link ChannelPipeline} that the {@link ChannelHandler} - * belongs to. - */ - ChannelPipeline getPipeline(); + // XXX: What happens if inbound queue is bounded (limited capacity) and it's full? + // 1) EventLoop removes OP_READ + // 2) Once the first inbound buffer is drained to some level, EventLoop adds OP_READ again. + // * To achieve this, EventLoop has to specify a wrapped Queue when calling inboundBufferUpdated. + interface NextHandler { + // For readers + void channelRegistered(); + void channelUnregistered(); + void channelActive(); + void channelInactive(); + void exceptionCaught(Throwable cause); + void userEventTriggered(Object event); + Queue in(); - /** - * Returns the name of the {@link ChannelHandler} in the - * {@link ChannelPipeline}. - */ - String getName(); - - /** - * Returns the {@link ChannelHandler} that this context object is - * serving. - */ - ChannelHandler getHandler(); - - /** - * Returns {@code true} if and only if the {@link ChannelHandler} is an - * instance of {@link ChannelUpstreamHandler}. - */ - boolean canHandleUpstream(); - - /** - * Returns {@code true} if and only if the {@link ChannelHandler} is an - * instance of {@link ChannelDownstreamHandler}. - */ - boolean canHandleDownstream(); - - /** - * Sends the specified {@link ChannelEvent} to the - * {@link ChannelUpstreamHandler} which is placed in the closest upstream - * from the handler associated with this context. It is recommended to use - * the shortcut methods in {@link Channels} rather than calling this method - * directly. - */ - void sendUpstream(ChannelEvent e); - - /** - * Sends the specified {@link ChannelEvent} to the - * {@link ChannelDownstreamHandler} which is placed in the closest - * downstream from the handler associated with this context. It is - * recommended to use the shortcut methods in {@link Channels} rather than - * calling this method directly. - */ - void sendDownstream(ChannelEvent e); - - /** - * Retrieves an object which is {@link #setAttachment(Object) attached} to - * this context. - * - * @return {@code null} if no object was attached or - * {@code null} was attached - */ - Object getAttachment(); - - /** - * Attaches an object to this context to store a stateful information - * specific to the {@link ChannelHandler} which is associated with this - * context. - */ - void setAttachment(Object attachment); + // For writers + void bind(SocketAddress localAddress, ChannelFuture future); + void connect(SocketAddress remoteAddress, ChannelFuture future); + void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); + void disconnect(ChannelFuture future); + void close(ChannelFuture future); + void deregister(ChannelFuture future); + Queue out(); + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelReader.java b/transport/src/main/java/io/netty/channel/ChannelReader.java new file mode 100644 index 0000000000..4baa787eb3 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelReader.java @@ -0,0 +1,19 @@ +package io.netty.channel; + +import java.util.Queue; + +public interface ChannelReader extends ChannelHandler { + + void channelRegistered(ChannelReaderContext ctx) throws Exception; + void channelUnregistered(ChannelReaderContext ctx) throws Exception; + + void channelActive(ChannelReaderContext ctx) throws Exception; + void channelInactive(ChannelReaderContext ctx) throws Exception; + + void exceptionCaught(ChannelReaderContext ctx, Throwable cause) throws Exception; + void userEventTriggered(ChannelReaderContext ctx, Object evt) throws Exception; + + Queue newReceiveBuffer(ChannelReaderContext ctx) throws Exception; + void receiveBufferUpdated(ChannelReaderContext ctx) throws Exception; + void receiveBufferClosed(ChannelReaderContext ctx) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/ChannelReaderAdapter.java b/transport/src/main/java/io/netty/channel/ChannelReaderAdapter.java new file mode 100644 index 0000000000..08e0bf814f --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelReaderAdapter.java @@ -0,0 +1,73 @@ +package io.netty.channel; + +import io.netty.util.internal.QueueFactory; + +import java.util.Queue; + +public class ChannelReaderAdapter implements ChannelReader { + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void channelRegistered(ChannelReaderContext ctx) throws Exception { + ctx.next().channelRegistered(); + } + + @Override + public void channelUnregistered(ChannelReaderContext ctx) throws Exception { + ctx.next().channelUnregistered(); + } + + @Override + public void channelActive(ChannelReaderContext ctx) throws Exception { + ctx.next().channelActive(); + } + + @Override + public void channelInactive(ChannelReaderContext ctx) throws Exception { + ctx.next().channelInactive(); + } + + @Override + public void exceptionCaught(ChannelReaderContext ctx, Throwable cause) throws Exception { + ctx.next().exceptionCaught(cause); + } + + @Override + public void userEventTriggered(ChannelReaderContext ctx, Object evt) throws Exception { + ctx.next().userEventTriggered(evt); + } + + @Override + @SuppressWarnings("unchecked") + public Queue newReceiveBuffer(ChannelReaderContext ctx) throws Exception { + return (Queue) QueueFactory.createQueue(Object.class); + } + + @Override + public void receiveBufferUpdated(ChannelReaderContext ctx) throws Exception { + ctx.in().transferTo(ctx.next().in()); + } + + @Override + public void receiveBufferClosed(ChannelReaderContext ctx) throws Exception { + ctx.next().in().close(); + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelReaderContext.java b/transport/src/main/java/io/netty/channel/ChannelReaderContext.java new file mode 100644 index 0000000000..190906ed07 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelReaderContext.java @@ -0,0 +1,7 @@ +package io.netty.channel; + +import java.util.Queue; + +public interface ChannelReaderContext extends ChannelHandlerContext { + Queue in(); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelUpstreamHandler.java b/transport/src/main/java/io/netty/channel/ChannelUpstreamHandler.java deleted file mode 100644 index 5203e44b19..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelUpstreamHandler.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -/** - * Handles or intercepts an upstream {@link ChannelEvent}, and sends a - * {@link ChannelEvent} to the next handler in a {@link ChannelPipeline}. - *

- * The most common use case of this interface is to intercept an I/O event - * generated by I/O workers to transform the received messages or execute - * the relevant business logic. - * - *

{@link SimpleChannelUpstreamHandler}

- *

- * In most cases, you will get to use a {@link SimpleChannelUpstreamHandler} to - * implement an upstream handler because it provides an individual handler - * method for each event type. You might want to implement this interface - * directly though if you want to handle various types of events in more - * generic way. - * - *

Firing an event to the next handler

- *

- * You can forward the received event upstream or downstream. In most cases, - * {@link ChannelUpstreamHandler} will send the event upstream (i.e. inbound) - * although it is legal to send the event downstream (i.e. outbound): - * - *

- * // Sending the event upstream (inbound)
- * void handleUpstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *     ...
- *     ctx.sendUpstream(e);
- *     ...
- * }
- *
- * // Sending the event downstream (outbound)
- * void handleDownstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *     ...
- *     ctx.sendDownstream(new {@link DownstreamMessageEvent}(...));
- *     ...
- * }
- * 
- * - *

Using the helper class to send an event

- *

- * You will also find various helper methods in {@link Channels} to be useful - * to generate and send an artificial or manipulated event. - * - *

State management

- * - * Please refer to {@link ChannelHandler}. - * - *

Thread safety

- *

- * {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream} - * will be invoked sequentially by the same thread (i.e. an I/O thread) and - * therefore a handler does not need to worry about being invoked with a new - * upstream event before the previous upstream event is finished. - *

- * This does not necessarily mean that there's a dedicated thread per - * {@link Channel}; the I/O thread of some transport can serve more than one - * {@link Channel} (e.g. NIO transport), while the I/O thread of other - * transports can serve only one (e.g. OIO transport). - * - * @apiviz.exclude ^io\.netty\.handler\..*$ - */ -public interface ChannelUpstreamHandler extends ChannelHandler { - - /** - * Handles the specified upstream event. - * - * @param ctx the context object for this handler - * @param e the upstream event to process or intercept - */ - void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/ChannelWriter.java b/transport/src/main/java/io/netty/channel/ChannelWriter.java new file mode 100644 index 0000000000..9fa870dcee --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelWriter.java @@ -0,0 +1,16 @@ +package io.netty.channel; + +import java.net.SocketAddress; +import java.util.Queue; + +public interface ChannelWriter extends ChannelHandler { + void bind(ChannelWriterContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception; + void connect(ChannelWriterContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception; + void disconnect(ChannelWriterContext ctx, ChannelFuture future) throws Exception; + void close(ChannelWriterContext ctx, ChannelFuture future) throws Exception; + void deregister(ChannelWriterContext ctx, ChannelFuture future) throws Exception; + + Queue newSendBuffer(ChannelWriterContext ctx) throws Exception; + void sendBufferUpdated(ChannelWriterContext ctx) throws Exception; + void sendBufferClosed(ChannelWriterContext ctx) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/ChannelWriterAdapter.java b/transport/src/main/java/io/netty/channel/ChannelWriterAdapter.java new file mode 100644 index 0000000000..2e1ba102a1 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelWriterAdapter.java @@ -0,0 +1,69 @@ +package io.netty.channel; + +import io.netty.util.internal.QueueFactory; + +import java.net.SocketAddress; +import java.util.Queue; + +public class ChannelWriterAdapter implements ChannelWriter { + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Do nothing by default. + } + + @Override + public void bind(ChannelWriterContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { + ctx.next().bind(localAddress, future); + } + + @Override + public void connect(ChannelWriterContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception { + ctx.next().connect(remoteAddress, localAddress, future); + } + + @Override + public void disconnect(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().disconnect(future); + } + + @Override + public void close(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().close(future); + } + + @Override + public void deregister(ChannelWriterContext ctx, ChannelFuture future) throws Exception { + ctx.next().deregister(future); + } + + @Override + @SuppressWarnings("unchecked") + public Queue newSendBuffer(ChannelWriterContext ctx) throws Exception { + return (Queue) QueueFactory.createQueue(Object.class); + } + + @Override + public void sendBufferUpdated(ChannelWriterContext ctx) throws Exception { + ctx.out().transferTo(ctx.next().out()); + } + + @Override + public void sendBufferClosed(ChannelWriterContext ctx) throws Exception { + ctx.next().out().close(); + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelWriterContext.java b/transport/src/main/java/io/netty/channel/ChannelWriterContext.java new file mode 100644 index 0000000000..faa8452f29 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelWriterContext.java @@ -0,0 +1,7 @@ +package io.netty.channel; + +import java.util.Queue; + +public interface ChannelWriterContext extends ChannelHandlerContext { + Queue out(); +} diff --git a/transport/src/main/java/io/netty/channel/LifeCycleAwareChannelHandler.java b/transport/src/main/java/io/netty/channel/LifeCycleAwareChannelHandler.java deleted file mode 100644 index 44bffc0234..0000000000 --- a/transport/src/main/java/io/netty/channel/LifeCycleAwareChannelHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -/** - * A {@link ChannelHandler} that is notified when it is added to or removed - * from a {@link ChannelPipeline}. - * - *

Invalid access to the {@link ChannelHandlerContext}

- * - * Calling {@link ChannelHandlerContext#sendUpstream(ChannelEvent)} or - * {@link ChannelHandlerContext#sendDownstream(ChannelEvent)} in - * {@link #beforeAdd(ChannelHandlerContext)} or {@link #afterRemove(ChannelHandlerContext)} - * might lead to an unexpected behavior. It is because the context object - * might not have been fully added to the pipeline or the context object is not - * a part of the pipeline anymore respectively. - */ -public interface LifeCycleAwareChannelHandler extends ChannelHandler { - void beforeAdd(ChannelHandlerContext ctx) throws Exception; - void afterAdd(ChannelHandlerContext ctx) throws Exception; - void beforeRemove(ChannelHandlerContext ctx) throws Exception; - void afterRemove(ChannelHandlerContext ctx) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/SimpleChannelDownstreamHandler.java b/transport/src/main/java/io/netty/channel/SimpleChannelDownstreamHandler.java deleted file mode 100644 index f98de0bf90..0000000000 --- a/transport/src/main/java/io/netty/channel/SimpleChannelDownstreamHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import java.net.SocketAddress; - - -/** - * A {@link ChannelDownstreamHandler} which provides an individual handler - * method for each event type. This handler down-casts the received downstream - * event into more meaningful sub-type event and calls an appropriate handler - * method with the down-cast event. The names of the methods starts with the - * name of the operation and ends with {@code "Requested"} - * (e.g. {@link #writeRequested(ChannelHandlerContext, MessageEvent) writeRequested}.) - *

- * Please use {@link SimpleChannelHandler} if you need to implement both - * {@link ChannelUpstreamHandler} and {@link ChannelDownstreamHandler}. - * - *

Overriding the {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream} method

- *

- * You can override the {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream} - * method just like overriding an ordinary Java method. Please make sure to - * call {@code super.handleDownstream()} so that other handler methods are - * invoked properly: - *

- *
public class MyChannelHandler extends {@link SimpleChannelDownstreamHandler} {
- *
- *     {@code @Override}
- *     public void handleDownstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *
- *         // Log all channel state changes.
- *         if (e instanceof {@link MessageEvent}) {
- *             logger.info("Writing:: " + e);
- *         }
- *
- *         super.handleDownstream(ctx, e);
- *     }
- * }
- * - *

- * Caution: - *

- * Use the *Later(..) methods of the {@link Channels} class if you want to send an upstream event from a {@link ChannelDownstreamHandler} otherwise you may run into threading issues. - * - */ -public class SimpleChannelDownstreamHandler implements ChannelDownstreamHandler { - - /** - * Creates a new instance. - */ - public SimpleChannelDownstreamHandler() { - } - - /** - * {@inheritDoc} Down-casts the received downstream event into more - * meaningful sub-type event and calls an appropriate handler method with - * the down-casted event. - */ - @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - - if (e instanceof MessageEvent) { - writeRequested(ctx, (MessageEvent) e); - } else if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - switch (evt.getState()) { - case OPEN: - if (!Boolean.TRUE.equals(evt.getValue())) { - closeRequested(ctx, evt); - } - break; - case BOUND: - if (evt.getValue() != null) { - bindRequested(ctx, evt); - } else { - unbindRequested(ctx, evt); - } - break; - case CONNECTED: - if (evt.getValue() != null) { - connectRequested(ctx, evt); - } else { - disconnectRequested(ctx, evt); - } - break; - case INTEREST_OPS: - setInterestOpsRequested(ctx, evt); - break; - default: - ctx.sendDownstream(e); - } - } else { - ctx.sendDownstream(e); - } - } - - /** - * Invoked when {@link Channel#write(Object)} is called. - */ - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ctx.sendDownstream(e); - } - - /** - * Invoked when {@link Channel#bind(SocketAddress)} was called. - */ - public void bindRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#connect(SocketAddress)} was called. - */ - public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#setInterestOps(int)} was called. - */ - public void setInterestOpsRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - } - - /** - * Invoked when {@link Channel#disconnect()} was called. - */ - public void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#unbind()} was called. - */ - public void unbindRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#close()} was called. - */ - public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - } -} diff --git a/transport/src/main/java/io/netty/channel/SimpleChannelHandler.java b/transport/src/main/java/io/netty/channel/SimpleChannelHandler.java deleted file mode 100644 index eb72156693..0000000000 --- a/transport/src/main/java/io/netty/channel/SimpleChannelHandler.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import java.net.SocketAddress; - -import io.netty.buffer.ChannelBuffer; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - - -/** - * A {@link ChannelHandler} which provides an individual handler method - * for each event type. This handler down-casts the received upstream or - * or downstream event into more meaningful sub-type event and calls an - * appropriate handler method with the down-cast event. For an upstream - * event, the names of the methods are identical to the upstream event names, - * as introduced in the {@link ChannelEvent} documentation. For a - * downstream event, the names of the methods starts with the name of the - * operation and ends with {@code "Requested"} - * (e.g. {@link #writeRequested(ChannelHandlerContext, MessageEvent) writeRequested}.) - *

- * Please use {@link SimpleChannelUpstreamHandler} or - * {@link SimpleChannelDownstreamHandler} if you want to intercept only - * upstream or downstream events. - * - *

Overriding the {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream} - * and {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream} method

- *

- * You can override the {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream} - * and {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream} - * method just like overriding an ordinary Java method. Please make sure to - * call {@code super.handleUpstream()} or {@code super.handleDownstream()} so - * that other handler methods are invoked properly: - *

- *
public class MyChannelHandler extends {@link SimpleChannelHandler} {
- *
- *     {@code @Override}
- *     public void handleUpstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *
- *         // Log all channel state changes.
- *         if (e instanceof {@link ChannelStateEvent}) {
- *             logger.info("Channel state changed: " + e);
- *         }
- *
- *         super.handleUpstream(ctx, e);
- *     }
- *
- *     {@code @Override}
- *     public void handleDownstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *
- *         // Log all channel state changes.
- *         if (e instanceof {@link MessageEvent}) {
- *             logger.info("Writing:: " + e);
- *         }
- *
- *         super.handleDownstream(ctx, e);
- *     }
- * }
- */ -public class SimpleChannelHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SimpleChannelHandler.class.getName()); - - /** - * Creates a new instance. - */ - public SimpleChannelHandler() { - } - - /** - * {@inheritDoc} Down-casts the received upstream event into more - * meaningful sub-type event and calls an appropriate handler method with - * the down-casted event. - */ - @Override - public void handleUpstream( - ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - - if (e instanceof MessageEvent) { - messageReceived(ctx, (MessageEvent) e); - } else if (e instanceof WriteCompletionEvent) { - WriteCompletionEvent evt = (WriteCompletionEvent) e; - writeComplete(ctx, evt); - } else if (e instanceof ChildChannelStateEvent) { - ChildChannelStateEvent evt = (ChildChannelStateEvent) e; - if (evt.getChildChannel().isOpen()) { - childChannelOpen(ctx, evt); - } else { - childChannelClosed(ctx, evt); - } - } else if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - switch (evt.getState()) { - case OPEN: - if (Boolean.TRUE.equals(evt.getValue())) { - channelOpen(ctx, evt); - } else { - channelClosed(ctx, evt); - } - break; - case BOUND: - if (evt.getValue() != null) { - channelBound(ctx, evt); - } else { - channelUnbound(ctx, evt); - } - break; - case CONNECTED: - if (evt.getValue() != null) { - channelConnected(ctx, evt); - } else { - channelDisconnected(ctx, evt); - } - break; - case INTEREST_OPS: - channelInterestChanged(ctx, evt); - break; - default: - ctx.sendUpstream(e); - } - } else if (e instanceof ExceptionEvent) { - exceptionCaught(ctx, (ExceptionEvent) e); - } else { - ctx.sendUpstream(e); - } - } - - /** - * Invoked when a message object (e.g: {@link ChannelBuffer}) was received - * from a remote peer. - */ - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when an exception was raised by an I/O thread or a - * {@link ChannelHandler}. - */ - public void exceptionCaught( - ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - if (this == ctx.getPipeline().getLast()) { - logger.warn( - "EXCEPTION, please implement " + getClass().getName() + - ".exceptionCaught() for proper handling.", e.getCause()); - } - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open, but not bound nor connected. - */ - public void channelOpen( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open and bound to a local address, - * but not connected. - */ - public void channelBound( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open, bound to a local address, and - * connected to a remote address. - */ - public void channelConnected( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel}'s {@link Channel#getInterestOps() interestOps} - * was changed. - */ - public void channelInterestChanged( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was disconnected from its remote peer. - */ - public void channelDisconnected( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was unbound from the current local address. - */ - public void channelUnbound( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was closed and all its related resources - * were released. - */ - public void channelClosed( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when something was written into a {@link Channel}. - */ - public void writeComplete( - ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a child {@link Channel} was open. - * (e.g. a server channel accepted a connection) - */ - public void childChannelOpen( - ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a child {@link Channel} was closed. - * (e.g. the accepted connection was closed) - */ - public void childChannelClosed( - ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * {@inheritDoc} Down-casts the received downstream event into more - * meaningful sub-type event and calls an appropriate handler method with - * the down-casted event. - */ - @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - - if (e instanceof MessageEvent) { - writeRequested(ctx, (MessageEvent) e); - } else if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - switch (evt.getState()) { - case OPEN: - if (!Boolean.TRUE.equals(evt.getValue())) { - closeRequested(ctx, evt); - } - break; - case BOUND: - if (evt.getValue() != null) { - bindRequested(ctx, evt); - } else { - unbindRequested(ctx, evt); - } - break; - case CONNECTED: - if (evt.getValue() != null) { - connectRequested(ctx, evt); - } else { - disconnectRequested(ctx, evt); - } - break; - case INTEREST_OPS: - setInterestOpsRequested(ctx, evt); - break; - default: - ctx.sendDownstream(e); - } - } else { - ctx.sendDownstream(e); - } - } - - /** - * Invoked when {@link Channel#write(Object)} is called. - */ - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ctx.sendDownstream(e); - } - - /** - * Invoked when {@link Channel#bind(SocketAddress)} was called. - */ - public void bindRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#connect(SocketAddress)} was called. - */ - public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#setInterestOps(int)} was called. - */ - public void setInterestOpsRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - } - - /** - * Invoked when {@link Channel#disconnect()} was called. - */ - public void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#unbind()} was called. - */ - public void unbindRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - - } - - /** - * Invoked when {@link Channel#close()} was called. - */ - public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendDownstream(e); - } -} diff --git a/transport/src/main/java/io/netty/channel/SimpleChannelUpstreamHandler.java b/transport/src/main/java/io/netty/channel/SimpleChannelUpstreamHandler.java deleted file mode 100644 index c4b620a412..0000000000 --- a/transport/src/main/java/io/netty/channel/SimpleChannelUpstreamHandler.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import io.netty.buffer.ChannelBuffer; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - - -/** - * A {@link ChannelUpstreamHandler} which provides an individual handler method - * for each event type. This handler down-casts the received upstream event - * into more meaningful sub-type event and calls an appropriate handler method - * with the down-cast event. The names of the methods are identical to the - * upstream event names, as introduced in the {@link ChannelEvent} documentation. - *

- * Please use {@link SimpleChannelHandler} if you need to implement both - * {@link ChannelUpstreamHandler} and {@link ChannelDownstreamHandler}. - * - *

Overriding the {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream} method

- *

- * You can override the {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream} - * method just like overriding an ordinary Java method. Please make sure to - * call {@code super.handleUpstream()} so that other handler methods are invoked - * properly: - *

- *
public class MyChannelHandler extends {@link SimpleChannelUpstreamHandler} {
- *
- *     {@code @Override}
- *     public void handleUpstream({@link ChannelHandlerContext} ctx, {@link ChannelEvent} e) throws Exception {
- *
- *         // Log all channel state changes.
- *         if (e instanceof {@link ChannelStateEvent}) {
- *             logger.info("Channel state changed: " + e);
- *         }
- *
- *         super.handleUpstream(ctx, e);
- *     }
- * }
- */ -public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SimpleChannelUpstreamHandler.class.getName()); - - /** - * Creates a new instance. - */ - public SimpleChannelUpstreamHandler() { - } - - /** - * {@inheritDoc} Down-casts the received upstream event into more - * meaningful sub-type event and calls an appropriate handler method with - * the down-casted event. - */ - @Override - public void handleUpstream( - ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - - if (e instanceof MessageEvent) { - messageReceived(ctx, (MessageEvent) e); - } else if (e instanceof WriteCompletionEvent) { - WriteCompletionEvent evt = (WriteCompletionEvent) e; - writeComplete(ctx, evt); - } else if (e instanceof ChildChannelStateEvent) { - ChildChannelStateEvent evt = (ChildChannelStateEvent) e; - if (evt.getChildChannel().isOpen()) { - childChannelOpen(ctx, evt); - } else { - childChannelClosed(ctx, evt); - } - } else if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - switch (evt.getState()) { - case OPEN: - if (Boolean.TRUE.equals(evt.getValue())) { - channelOpen(ctx, evt); - } else { - channelClosed(ctx, evt); - } - break; - case BOUND: - if (evt.getValue() != null) { - channelBound(ctx, evt); - } else { - channelUnbound(ctx, evt); - } - break; - case CONNECTED: - if (evt.getValue() != null) { - channelConnected(ctx, evt); - } else { - channelDisconnected(ctx, evt); - } - break; - case INTEREST_OPS: - channelInterestChanged(ctx, evt); - break; - default: - ctx.sendUpstream(e); - } - } else if (e instanceof ExceptionEvent) { - exceptionCaught(ctx, (ExceptionEvent) e); - } else { - ctx.sendUpstream(e); - } - } - - /** - * Invoked when a message object (e.g: {@link ChannelBuffer}) was received - * from a remote peer. - */ - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when an exception was raised by an I/O thread or a - * {@link ChannelHandler}. - */ - public void exceptionCaught( - ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - if (this == ctx.getPipeline().getLast()) { - logger.warn( - "EXCEPTION, please implement " + getClass().getName() + - ".exceptionCaught() for proper handling.", e.getCause()); - } - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open, but not bound nor connected. - *
- * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! - */ - public void channelOpen( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open and bound to a local address, - * but not connected. - *
- * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! - */ - public void channelBound( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} is open, bound to a local address, and - * connected to a remote address. - */ - public void channelConnected( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel}'s {@link Channel#getInterestOps() interestOps} - * was changed. - */ - public void channelInterestChanged( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was disconnected from its remote peer. - */ - public void channelDisconnected( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was unbound from the current local address. - */ - public void channelUnbound( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a {@link Channel} was closed and all its related resources - * were released. - */ - public void channelClosed( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when something was written into a {@link Channel}. - */ - public void writeComplete( - ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a child {@link Channel} was open. - * (e.g. a server channel accepted a connection) - */ - public void childChannelOpen( - ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Invoked when a child {@link Channel} was closed. - * (e.g. the accepted connection was closed) - */ - public void childChannelClosed( - ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { - ctx.sendUpstream(e); - } -}