Remove the classes that are not part of Netty 4.0.0.Alpha1

- Will add them back before Beta1
This commit is contained in:
Trustin Lee 2012-05-27 19:39:10 -07:00
parent 528b5c4328
commit 626c5ef9c9
49 changed files with 0 additions and 4846 deletions

View File

@ -1,41 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class BulkReply extends Reply {
public static final char MARKER = '$';
public final byte[] bytes;
public BulkReply(byte[] bytes) {
this.bytes = bytes;
}
@Override
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
if (bytes == null) {
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else {
os.writeBytes(Command.numAndCRLF(bytes.length));
os.writeBytes(bytes);
os.writeBytes(Command.CRLF);
}
}
}

View File

@ -1,133 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.util.CharsetUtil;
import java.io.IOException;
/**
* Command serialization.
*/
public class Command {
static final byte[] ARGS_PREFIX = "*".getBytes();
static final byte[] CRLF = "\r\n".getBytes();
static final byte[] BYTES_PREFIX = "$".getBytes();
static final byte[] EMPTY_BYTES = new byte[0];
static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
private byte[][] arguments;
private Object[] objects;
public String getName() {
if (arguments == null) {
Object o = objects[0];
if (o instanceof byte[]) {
return new String((byte[]) o);
} else {
return o.toString();
}
} else {
return new String(arguments[0]);
}
}
public Command(byte[]... arguments) {
this.arguments = arguments;
objects = arguments;
}
public Command(Object... objects) {
this.objects = objects;
}
public void write(ChannelBuffer os) throws IOException {
writeDirect(os, objects);
}
public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException {
int length = objects.length;
byte[][] arguments = new byte[length][];
for (int i = 0; i < length; i++) {
Object object = objects[i];
if (object == null) {
arguments[i] = EMPTY_BYTES;
} else if (object instanceof byte[]) {
arguments[i] = (byte[]) object;
} else {
arguments[i] = object.toString().getBytes(CharsetUtil.UTF_8);
}
}
writeDirect(os, arguments);
}
private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
os.writeBytes(ARGS_PREFIX);
os.writeBytes(numAndCRLF(arguments.length));
for (byte[] argument : arguments) {
os.writeBytes(BYTES_PREFIX);
os.writeBytes(numAndCRLF(argument.length));
os.writeBytes(argument);
os.writeBytes(CRLF);
}
}
private static final int NUM_MAP_LENGTH = 256;
private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
static {
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
numAndCRLFMap[i] = convertWithCRLF(i);
}
}
// Optimized for the direct to ASCII bytes case
// Could be even more optimized but it is already
// about twice as fast as using Long.toString().getBytes()
public static byte[] numAndCRLF(long value) {
if (value >= 0 && value < NUM_MAP_LENGTH) {
return numAndCRLFMap[(int) value];
} else if (value == -1) {
return NEG_ONE_AND_CRLF;
}
return convertWithCRLF(value);
}
private static byte[] convertWithCRLF(long value) {
boolean negative = value < 0;
int index = negative ? 2 : 1;
long current = negative ? -value : value;
while ((current /= 10) > 0) {
index++;
}
byte[] bytes = new byte[index + 2];
if (negative) {
bytes[0] = '-';
}
current = negative ? -value : value;
long tmp = current;
while ((tmp /= 10) > 0) {
bytes[--index] = (byte) ('0' + (current % 10));
current = tmp;
}
bytes[--index] = (byte) ('0' + current);
// add CRLF
bytes[bytes.length - 2] = '\r';
bytes[bytes.length - 1] = '\n';
return bytes;
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* {@link Reply} which will be returned if an error was detected
*
*
*/
public class ErrorReply extends Reply {
public static final char MARKER = '-';
private static final byte[] ERR = "ERR ".getBytes();
public final ChannelBuffer error;
public ErrorReply(ChannelBuffer error) {
this.error = error;
}
@Override
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(ERR);
os.writeBytes(error, 0, error.readableBytes());
os.writeBytes(Command.CRLF);
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* {@link Reply} which will get returned if a {@link Integer} was requested via <code>GET</code>
*
*/
public class IntegerReply extends Reply {
public static final char MARKER = ':';
public final long integer;
public IntegerReply(long integer) {
this.integer = integer;
}
@Override
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(Command.numAndCRLF(integer));
}
}

View File

@ -1,99 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* {@link Reply} which contains a bulk of {@link Reply}'s
*
*
*/
public class MultiBulkReply extends Reply {
public static final char MARKER = '*';
// State
public Object[] byteArrays;
private int size;
private int num;
public MultiBulkReply() {
}
public MultiBulkReply(Object... values) {
this.byteArrays = values;
}
public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
// If we attempted to read the size before, skip the '*' and reread it
if (size == -1) {
byte star = is.readByte();
if (star == MARKER) {
size = 0;
} else {
throw new AssertionError("Unexpected character in stream: " + star);
}
}
if (size == 0) {
// If the read fails, we need to skip the star
size = -1;
// Read the size, if this is successful we won't read the star again
size = RedisDecoder.readInteger(is);
byteArrays = new Object[size];
rd.checkpoint();
}
for (int i = num; i < size; i++) {
int read = is.readByte();
if (read == BulkReply.MARKER) {
byteArrays[i] = RedisDecoder.readBytes(is);
} else if (read == IntegerReply.MARKER) {
byteArrays[i] = RedisDecoder.readInteger(is);
} else {
throw new IOException("Unexpected character in stream: " + read);
}
num = i + 1;
rd.checkpoint();
}
}
@Override
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
if (byteArrays == null) {
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else {
os.writeBytes(Command.numAndCRLF(byteArrays.length));
for (Object value : byteArrays) {
if (value == null) {
os.writeByte(BulkReply.MARKER);
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
os.writeByte(BulkReply.MARKER);
int length = bytes.length;
os.writeBytes(Command.numAndCRLF(length));
os.writeBytes(bytes);
os.writeBytes(Command.CRLF);
} else if (value instanceof Number) {
os.writeByte(IntegerReply.MARKER);
os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
}
}
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
public class PSubscribeReply extends SubscribeReply {
public PSubscribeReply(byte[][] patterns) {
super(patterns);
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class PUnsubscribeReply extends UnsubscribeReply {
public PUnsubscribeReply(byte[][] patterns) {
super(patterns);
}
@Override
public void write(ChannelBuffer os) throws IOException {
// Do nothing
}
}

View File

@ -1,140 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferIndexFinder;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.VoidEnum;
import java.io.IOException;
/**
* {@link ReplayingDecoder} which handles Redis protocol
*
*
*/
public class RedisDecoder extends ReplayingDecoder<Reply, VoidEnum> {
private static final char CR = '\r';
private static final char LF = '\n';
private static final char ZERO = '0';
// We track the current multibulk reply in the case
// where we do not get a complete reply in a single
// decode invocation.
private MultiBulkReply reply;
/**
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
* via the {@link #readInteger(ChannelBuffer)} method
*
* @param is the {@link ChannelBuffer} to read from
* @throws IOException is thrown if the line-ending is not CRLF
*/
public static byte[] readBytes(ChannelBuffer is) throws IOException {
int size = readInteger(is);
if (size == -1) {
return null;
}
byte[] bytes = new byte[size];
is.readBytes(bytes, 0, size);
int cr = is.readByte();
int lf = is.readByte();
if (cr != CR || lf != LF) {
throw new IOException("Improper line ending: " + cr + ", " + lf);
}
return bytes;
}
/**
* Read an {@link Integer} from the {@link ChannelBuffer}
*/
public static int readInteger(ChannelBuffer is) throws IOException {
int size = 0;
int sign = 1;
int read = is.readByte();
if (read == '-') {
read = is.readByte();
sign = -1;
}
do {
if (read == CR) {
if (is.readByte() == LF) {
break;
}
}
int value = read - ZERO;
if (value >= 0 && value < 10) {
size *= 10;
size += value;
} else {
throw new IOException("Invalid character in integer");
}
read = is.readByte();
} while (true);
return size * sign;
}
@Override
public void checkpoint() {
super.checkpoint();
}
@Override
public Reply decode(ChannelInboundHandlerContext<Byte> channelHandlerContext, ChannelBuffer channelBuffer) throws Exception {
if (reply != null) {
reply.read(this, channelBuffer);
Reply ret = reply;
reply = null;
return ret;
}
int code = channelBuffer.readByte();
switch (code) {
case StatusReply.MARKER: {
ChannelBuffer status = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
channelBuffer.skipBytes(2);
return new StatusReply(status);
}
case ErrorReply.MARKER: {
ChannelBuffer error = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
channelBuffer.skipBytes(2);
return new ErrorReply(error);
}
case IntegerReply.MARKER: {
return new IntegerReply(readInteger(channelBuffer));
}
case BulkReply.MARKER: {
return new BulkReply(readBytes(channelBuffer));
}
case MultiBulkReply.MARKER: {
return decodeMultiBulkReply(channelBuffer);
}
default: {
throw new IOException("Unexpected character in stream: " + code);
}
}
}
private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException {
if (reply == null) {
reply = new MultiBulkReply();
}
reply.read(this, is);
return reply;
}
}

View File

@ -1,50 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
/**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*/
@Sharable
public class RedisEncoder extends MessageToStreamEncoder<Object> {
@Override
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
Object o = msg;
if (o instanceof Command) {
Command command = (Command) o;
command.write(out);
} else if (o instanceof Iterable) {
// Useful for transactions and database select
for (Object i : (Iterable<?>) o) {
if (i instanceof Command) {
Command command = (Command) i;
command.write(out);
} else {
break;
}
}
} else {
throw new UnsupportedMessageTypeException(msg, Command.class, Iterable.class);
}
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.util.CharsetUtil;
import java.io.IOException;
/**
* {@link Reply} which was sent as a Response to the written {@link Command}
* *
*/
public abstract class Reply {
/**
* Write the content of the {@link Reply} to the given {@link ChannelBuffer}
*/
public abstract void write(ChannelBuffer os) throws IOException;
@Override
public String toString() {
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
try {
write(channelBuffer);
} catch (IOException e) {
throw new AssertionError("Trustin says this won't happen either");
}
return channelBuffer.toString(CharsetUtil.UTF_8);
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* {@link Reply} which contains the status
*
*/
public class StatusReply extends Reply {
public static final char MARKER = '+';
public final ChannelBuffer status;
public StatusReply(ChannelBuffer status) {
this.status = status;
}
@Override
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(status, 0, status.readableBytes());
os.writeBytes(Command.CRLF);
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class SubscribeReply extends Reply {
private final byte[][] patterns;
public SubscribeReply(byte[][] patterns) {
this.patterns = patterns;
}
public byte[][] getPatterns() {
return patterns;
}
@Override
public void write(ChannelBuffer os) throws IOException {
// Do nothing
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class UnsubscribeReply extends Reply {
private final byte[][] patterns;
public UnsubscribeReply(byte[][] patterns) {
this.patterns = patterns;
}
@Override
public void write(ChannelBuffer os) throws IOException {
}
public byte[][] getPatterns() {
return patterns;
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Encoder and decoder which transform a
* <a href="http://http://redis.io/topics/protocol">Redis protocol commands and replies</a>
* into a {@link org.jboss.netty.buffer.ChannelBuffer}
* and vice versa.
*
*/
package io.netty.handler.codec.redis;

View File

@ -1,132 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import static io.netty.buffer.ChannelBuffers.*;
import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
public class RedisCodecTest {
private DecoderEmbedder<ChannelBuffer> embedder;
@Before
public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
}
@Test
public void decodeReplies() {
{
Object receive = decode("+OK\r\n".getBytes());
assertTrue(receive instanceof StatusReply);
assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode("-ERROR\r\n".getBytes());
assertTrue(receive instanceof ErrorReply);
assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode(":123\r\n".getBytes());
assertTrue(receive instanceof IntegerReply);
assertEquals(123, ((IntegerReply) receive).integer);
}
{
Object receive = decode("$5\r\nnetty\r\n".getBytes());
assertTrue(receive instanceof BulkReply);
assertEquals("netty", new String(((BulkReply) receive).bytes));
}
{
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
private Object decode(byte[] bytes) {
embedder.offer(wrappedBuffer(bytes));
return embedder.poll();
}
@Test
public void encodeCommands() throws IOException {
String setCommand = "*3\r\n" +
"$3\r\n" +
"SET\r\n" +
"$5\r\n" +
"mykey\r\n" +
"$7\r\n" +
"myvalue\r\n";
Command command = new Command("SET", "mykey", "myvalue");
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
command.write(cb);
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
}
@Test
public void testReplayDecoding() {
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.redis;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.handler.codec.redis.Command;
import io.netty.handler.codec.redis.RedisDecoder;
import io.netty.handler.codec.redis.RedisEncoder;
import io.netty.handler.codec.redis.Reply;
import io.netty.handler.queue.BlockingReadHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public final class RedisClient {
private static final byte[] VALUE = "value".getBytes();
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor));
final BlockingReadHandler<Reply> blockingReadHandler = new BlockingReadHandler<Reply>();
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("redisEncoder", new RedisEncoder());
pipeline.addLast("redisDecoder", new RedisDecoder());
pipeline.addLast("result", blockingReadHandler);
return pipeline;
}
});
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
redis.await().sync();
Channel channel = redis.channel();
channel.write(new Command("set", "1", "value"));
System.out.print(blockingReadHandler.read());
channel.write(new Command("get", "1"));
System.out.print(blockingReadHandler.read());
int CALLS = 1000000;
int PIPELINE = 50;
requestResponse(blockingReadHandler, channel, CALLS);
pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
channel.close();
cb.releaseExternalResources();
}
private static void pipelinedListOfRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
List<Command> list = new ArrayList<Command>();
for (int j = 0; j < PIPELINE; j++) {
int base = i * PIPELINE;
list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
channel.write(list);
for (int j = 0; j < PIPELINE; j++) {
blockingReadHandler.read();
}
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private static void pipelinedIndividualRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
int base = i * PIPELINE;
for (int j = 0; j < PIPELINE; j++) {
channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
for (int j = 0; j < PIPELINE; j++) {
blockingReadHandler.read();
}
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private static void requestResponse(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, int CALLS) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS; i++) {
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
blockingReadHandler.read();
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private RedisClient() {
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.sctp;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.sctp.SctpClientSocketChannelFactory;
import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Simple SCTP Echo Client
*/
public class SctpClient {
private final String host;
private final int port;
public SctpClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new SctpClientSocketChannelFactory(
Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(executionHandler, new SctpClientHandler());
}
});
bootstrap.setOption("sendBufferSize", 1048576);
bootstrap.setOption("receiveBufferSize", 1048576);
bootstrap.setOption("sctpNoDelay", true);
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails.
future.channel().getCloseFuture().awaitUninterruptibly();
// Please check SctpClientHandler to see, how echo message is sent & received
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
public static void main(String[] args) throws Exception {
// Print usage if no argument is specified.
if (args.length != 2) {
System.err.println(
"Usage: " + SctpClient.class.getSimpleName() +
" <host> <port>");
return;
}
// Parse options.
final String host = args[0];
final int port = Integer.parseInt(args[1]);
new SctpClient(host, port).run();
}
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.sctp;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.sctp.SctpFrame;
/**
* Handler implementation for the echo client. It initiates the message
* and upon receiving echo back to the server
*/
public class SctpClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(SctpClientHandler.class.getName());
private final AtomicLong counter = new AtomicLong(0);
/**
* Creates a client-side handler.
*/
public SctpClientHandler() {
}
/**
* After connection is initialized, start the echo from client
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent stateEvent) {
stateEvent.channel().write(new SctpFrame(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes())));
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) {
// Send back the received message to the remote peer.
logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from server, sending it back.");
messageEvent.channel().write(messageEvent.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", exceptionEvent.cause());
exceptionEvent.channel().close();
}
}

View File

@ -1,74 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.sctp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.sctp.SctpServerSocketChannelFactory;
import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Echoes back any received data from a client.
*/
public class SctpServer {
private final int port;
public SctpServer(int port) {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new SctpServerSocketChannelFactory(
Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(executionHandler, new SctpServerHandler());
}
});
bootstrap.setOption("sendBufferSize", 1048576);
bootstrap.setOption("receiveBufferSize", 1048576);
bootstrap.setOption("child.sctpNoDelay", true);
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 2955;
}
new SctpServer(port).run();
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.sctp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handler implementation for the echo server.
*/
public class SctpServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(SctpServerHandler.class.getName());
private final AtomicLong counter = new AtomicLong(0);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) {
// Send back the received message to the remote peer.
logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from client, sending it back.");
messageEvent.channel().write(messageEvent.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", event.cause());
event.channel().close();
}
}

View File

@ -1,100 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.stdio;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.DefaultChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler;
import io.netty.channel.iostream.IoStreamAddress;
import io.netty.channel.iostream.IoStreamChannelFactory;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* An example demonstrating the use of the {@link io.netty.channel.iostream.IoStreamChannel}.
*/
public class StdioLogger {
private volatile boolean running = true;
public void run() {
final ExecutorService executorService = Executors.newCachedThreadPool();
final ClientBootstrap bootstrap = new ClientBootstrap(new IoStreamChannelFactory(executorService));
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("loggingHandler", new SimpleChannelHandler() {
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
final String message = (String) e.getMessage();
synchronized (System.out) {
e.channel().write("Message received: " + message);
}
if ("exit".equals(message)) {
running = false;
}
super.messageReceived(ctx, e);
}
}
);
return pipeline;
}
});
// Make a new connection.
ChannelFuture connectFuture = bootstrap.connect(new IoStreamAddress(System.in, System.out));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().channel();
while (running) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Close the connection.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
bootstrap.releaseExternalResources();
}
public static void main(String[] args) {
new StdioLogger().run();
}
}

View File

@ -1,85 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
@Override
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
/**
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} downstream.
*/
public class ChannelDownstreamEventRunnable extends ChannelEventRunnable {
public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);
}
/**
* Send the {@link ChannelEvent} downstream
*/
@Override
public void run() {
ctx.sendDownstream(e);
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,56 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.EstimatableObjectWrapper;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
@Override
public Object unwrap() {
return e;
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
/**
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it is used by
* {@link Executor} implementers only
*/
public class ChannelUpstreamEventRunnable extends ChannelEventRunnable {
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);
}
/**
* Sends the event upstream.
*/
@Override
public void run() {
ctx.sendUpstream(e);
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,127 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.MessageEvent;
import io.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
/**
* The default {@link ObjectSizeEstimator} implementation for general purpose.
*/
public class DefaultObjectSizeEstimator implements ObjectSizeEstimator {
private final ConcurrentMap<Class<?>, Integer> class2size =
new ConcurrentIdentityWeakKeyHashMap<Class<?>, Integer>();
/**
* Creates a new instance.
*/
public DefaultObjectSizeEstimator() {
class2size.put(boolean.class, 4); // Probably an integer.
class2size.put(byte.class, 1);
class2size.put(char.class, 2);
class2size.put(int.class, 4);
class2size.put(short.class, 2);
class2size.put(long.class, 8);
class2size.put(float.class, 4);
class2size.put(double.class, 8);
class2size.put(void.class, 0);
}
@Override
public int estimateSize(Object o) {
if (o == null) {
return 8;
}
int answer = 8 + estimateSize(o.getClass(), null);
if (o instanceof EstimatableObjectWrapper) {
answer += estimateSize(((EstimatableObjectWrapper) o).unwrap());
} else if (o instanceof MessageEvent) {
answer += estimateSize(((MessageEvent) o).getMessage());
} else if (o instanceof ChannelBuffer) {
answer += ((ChannelBuffer) o).capacity();
} else if (o instanceof byte[]) {
answer += ((byte[]) o).length;
} else if (o instanceof ByteBuffer) {
answer += ((ByteBuffer) o).remaining();
} else if (o instanceof CharSequence) {
answer += ((CharSequence) o).length() << 1;
} else if (o instanceof Iterable<?>) {
for (Object m : (Iterable<?>) o) {
answer += estimateSize(m);
}
}
return align(answer);
}
private int estimateSize(Class<?> clazz, Set<Class<?>> visitedClasses) {
Integer objectSize = class2size.get(clazz);
if (objectSize != null) {
return objectSize;
}
if (visitedClasses != null) {
if (visitedClasses.contains(clazz)) {
return 0;
}
} else {
visitedClasses = new HashSet<Class<?>>();
}
visitedClasses.add(clazz);
int answer = 8; // Basic overhead.
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field f : fields) {
if ((f.getModifiers() & Modifier.STATIC) != 0) {
// Ignore static fields.
continue;
}
answer += estimateSize(f.getType(), visitedClasses);
}
}
visitedClasses.remove(clazz);
// Some alignment.
answer = align(answer);
// Put the final answer.
class2size.putIfAbsent(clazz, answer);
return answer;
}
private static int align(int size) {
int r = size % 8;
if (r != 0) {
size += 8 - r;
}
return size;
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* Represents an object which contains another object that needs to be taken
* into account by {@link ObjectSizeEstimator} for more accurate object size
* estimation.
*/
public interface EstimatableObjectWrapper {
/**
* Returns the underlying object that needs to be taken into account
* by {@link ObjectSizeEstimator} for more accurate object size estimation.
*/
Object unwrap();
}

View File

@ -1,214 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/**
* Forwards an upstream {@link ChannelEvent} to an {@link Executor}.
* <p>
* {@link ExecutionHandler} is often used when your {@link ChannelHandler}
* performs a blocking operation that takes long time or accesses a resource
* which is not CPU-bound business logic such as DB access. Running such
* operations in a pipeline without an {@link ExecutionHandler} will result in
* unwanted hiccup during I/O because an I/O thread cannot perform I/O until
* your handler returns the control to the I/O thread.
* <p>
* In most cases, an {@link ExecutionHandler} is coupled with an
* {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the
* correct event execution order and prevents an {@link OutOfMemoryError}
* under load:
* <pre>
* public class DatabaseGatewayPipelineFactory implements {@link ChannelPipelineFactory} {
*
* <b>private final {@link ExecutionHandler} executionHandler;</b>
*
* public DatabaseGatewayPipelineFactory({@link ExecutionHandler} executionHandler) {
* this.executionHandler = executionHandler;
* }
*
* public {@link ChannelPipeline} getPipeline() {
* return {@link Channels}.pipeline(
* new DatabaseGatewayProtocolEncoder(),
* new DatabaseGatewayProtocolDecoder(),
* <b>executionHandler, // Must be shared</b>
* new DatabaseQueryingHandler());
* }
* }
* ...
*
* public static void main(String[] args) {
* {@link ServerBootstrap} bootstrap = ...;
* ...
* <b>{@link ExecutionHandler} executionHandler = new {@link ExecutionHandler}(
* new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576))
* bootstrap.setPipelineFactory(
* new DatabaseGatewayPipelineFactory(executionHandler));</b>
* ...
* bootstrap.bind(...);
* ...
*
* while (!isServerReadyToShutDown()) {
* // ... wait ...
* }
*
* bootstrap.releaseExternalResources();
* <b>executionHandler.releaseExternalResources();</b>
* }
* </pre>
*
* Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the
* detailed information about how the event order is guaranteed.
*
* <h3>SEDA (Staged Event-Driven Architecture)</h3>
* You can implement an alternative thread model such as
* <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
* by adding more than one {@link ExecutionHandler} to the pipeline.
*
* <h3>Using other {@link Executor} implementation</h3>
*
* Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor},
* you can use other {@link Executor} implementations. However, you must note
* that other {@link Executor} implementation might break your application
* because they often do not maintain event execution order nor interact with
* I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
*
* @apiviz.landmark
* @apiviz.has java.util.concurrent.ThreadPoolExecutor
*/
@Sharable
public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
private final Executor executor;
private final boolean handleDownstream;
private final boolean handleUpstream;
/**
* Creates a new instance with the specified {@link Executor} which only handles upstream events.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor) {
this(executor, false, true);
}
/**
* Use {@link #ExecutionHandler(Executor, boolean, boolean)}
*
* {@link Deprecated}
*/
@Deprecated
public ExecutionHandler(Executor executor, boolean handleDownstream) {
this(executor, handleDownstream, true);
}
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor, boolean handleDownstream, boolean handleUpstream) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (!handleDownstream && !handleUpstream) {
throw new IllegalArgumentException("You must handle at least handle one event type");
}
this.executor = executor;
this.handleDownstream = handleDownstream;
this.handleUpstream = handleUpstream;
}
/**
* Returns the {@link Executor} which was specified with the constructor.
*/
public Executor getExecutor() {
return executor;
}
/**
* Shuts down the {@link Executor} which was specified with the constructor
* and wait for its termination.
*/
@Override
public void releaseExternalResources() {
Executor executor = getExecutor();
ExecutorUtil.terminate(executor);
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
@Override
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e));
} else {
context.sendUpstream(e);
}
}
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
if (handleDownstream) {
executor.execute(new ChannelDownstreamEventRunnable(ctx, e));
} else {
ctx.sendDownstream(e);
}
}
}
/**
* Handle suspended reads
*/
protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return true;
}
}
}
return false;
}
}

View File

@ -1,542 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.QueueFactory;
import io.netty.util.internal.SharedResourceMisuseDetector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* A {@link ThreadPoolExecutor} which blocks the task submission when there's
* too many tasks in the queue. Both per-{@link Channel} and per-{@link Executor}
* limitation can be applied.
* <p>
* When a task (i.e. {@link Runnable}) is submitted,
* {@link MemoryAwareThreadPoolExecutor} calls {@link ObjectSizeEstimator#estimateSize(Object)}
* to get the estimated size of the task in bytes to calculate the amount of
* memory occupied by the unprocessed tasks.
* <p>
* If the total size of the unprocessed tasks exceeds either per-{@link Channel}
* or per-{@link Executor} threshold, any further {@link #execute(Runnable)}
* call will block until the tasks in the queue are processed so that the total
* size goes under the threshold.
*
* <h3>Using an alternative task size estimation strategy</h3>
*
* Although the default implementation does its best to guess the size of an
* object of unknown type, it is always good idea to to use an alternative
* {@link ObjectSizeEstimator} implementation instead of the
* {@link DefaultObjectSizeEstimator} to avoid incorrect task size calculation,
* especially when:
* <ul>
* <li>you are using {@link MemoryAwareThreadPoolExecutor} independently from
* {@link ExecutionHandler},</li>
* <li>you are submitting a task whose type is not {@link ChannelEventRunnable}, or</li>
* <li>the message type of the {@link MessageEvent} in the {@link ChannelEventRunnable}
* is not {@link ChannelBuffer}.</li>
* </ul>
* Here is an example that demonstrates how to implement an {@link ObjectSizeEstimator}
* which understands a user-defined object:
* <pre>
* public class MyRunnable implements {@link Runnable} {
*
* <b>private final byte[] data;</b>
*
* public MyRunnable(byte[] data) {
* this.data = data;
* }
*
* public void run() {
* // Process 'data' ..
* }
* }
*
* public class MyObjectSizeEstimator extends {@link DefaultObjectSizeEstimator} {
*
* {@literal @Override}
* public int estimateSize(Object o) {
* if (<b>o instanceof MyRunnable</b>) {
* <b>return ((MyRunnable) o).data.length + 8;</b>
* }
* return super.estimateSize(o);
* }
* }
*
* {@link ThreadPoolExecutor} pool = new {@link MemoryAwareThreadPoolExecutor}(
* 16, 65536, 1048576, 30, {@link TimeUnit}.SECONDS,
* <b>new MyObjectSizeEstimator()</b>,
* {@link Executors}.defaultThreadFactory());
*
* <b>pool.execute(new MyRunnable(data));</b>
* </pre>
*
* <h3>Event execution order</h3>
*
* Please note that this executor does not maintain the order of the
* {@link ChannelEvent}s for the same {@link Channel}. For example,
* you can even receive a {@code "channelClosed"} event before a
* {@code "messageReceived"} event, as depicted by the following diagram.
*
* For example, the events can be processed as depicted below:
*
* <pre>
* --------------------------------&gt; Timeline --------------------------------&gt;
*
* Thread X: --- Channel A (Event 2) --- Channel A (Event 1) ---------------------------&gt;
*
* Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) ---&gt;
*
* Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) ---&gt;
* </pre>
*
* To maintain the event order, you must use {@link OrderedMemoryAwareThreadPoolExecutor}.
* @apiviz.has io.netty.util.ObjectSizeEstimator oneway - -
* @apiviz.has io.netty.handler.execution.ChannelEventRunnable oneway - - executes
*/
public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
private static final SharedResourceMisuseDetector misuseDetector =
new SharedResourceMisuseDetector(MemoryAwareThreadPoolExecutor.class);
private volatile Settings settings;
private final ConcurrentMap<Channel, AtomicLong> channelCounters =
new ConcurrentHashMap<Channel, AtomicLong>();
private final Limiter totalLimiter;
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory());
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator,
ThreadFactory threadFactory) {
super(corePoolSize, corePoolSize, keepAliveTime, unit,
QueueFactory.createQueue(Runnable.class), threadFactory, new NewThreadRunsPolicy());
if (objectSizeEstimator == null) {
throw new NullPointerException("objectSizeEstimator");
}
if (maxChannelMemorySize < 0) {
throw new IllegalArgumentException(
"maxChannelMemorySize: " + maxChannelMemorySize);
}
if (maxTotalMemorySize < 0) {
throw new IllegalArgumentException(
"maxTotalMemorySize: " + maxTotalMemorySize);
}
allowCoreThreadTimeOut(true);
settings = new Settings(
objectSizeEstimator, maxChannelMemorySize);
if (maxTotalMemorySize == 0) {
totalLimiter = null;
} else {
totalLimiter = new Limiter(maxTotalMemorySize);
}
// Misuse check
misuseDetector.increase();
}
@Override
protected void terminated() {
super.terminated();
misuseDetector.decrease();
}
/**
* Returns the {@link ObjectSizeEstimator} of this pool.
*/
public ObjectSizeEstimator getObjectSizeEstimator() {
return settings.objectSizeEstimator;
}
/**
* Sets the {@link ObjectSizeEstimator} of this pool.
*/
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
if (objectSizeEstimator == null) {
throw new NullPointerException("objectSizeEstimator");
}
settings = new Settings(
objectSizeEstimator,
settings.maxChannelMemorySize);
}
/**
* Returns the maximum total size of the queued events per channel.
*/
public long getMaxChannelMemorySize() {
return settings.maxChannelMemorySize;
}
/**
* Sets the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
*/
public void setMaxChannelMemorySize(long maxChannelMemorySize) {
if (maxChannelMemorySize < 0) {
throw new IllegalArgumentException(
"maxChannelMemorySize: " + maxChannelMemorySize);
}
if (getTaskCount() > 0) {
throw new IllegalStateException(
"can't be changed after a task is executed");
}
settings = new Settings(
settings.objectSizeEstimator,
maxChannelMemorySize);
}
/**
* Returns the maximum total size of the queued events for this pool.
*/
public long getMaxTotalMemorySize() {
return totalLimiter.limit;
}
/**
* @deprecated <tt>maxTotalMemorySize</tt> is not modifiable anymore.
*/
@Deprecated
public void setMaxTotalMemorySize(long maxTotalMemorySize) {
if (maxTotalMemorySize < 0) {
throw new IllegalArgumentException(
"maxTotalMemorySize: " + maxTotalMemorySize);
}
if (getTaskCount() > 0) {
throw new IllegalStateException(
"can't be changed after a task is executed");
}
}
@Override
public void execute(Runnable command) {
if (command instanceof ChannelDownstreamEventRunnable) {
throw new RejectedExecutionException("command must be enclosed with an upstream event.");
}
if (!(command instanceof ChannelEventRunnable)) {
command = new MemoryAwareRunnable(command);
}
increaseCounter(command);
doExecute(command);
}
/**
* Put the actual execution logic here. The default implementation simply
* calls {@link #doUnorderedExecute(Runnable)}.
*/
protected void doExecute(Runnable task) {
doUnorderedExecute(task);
}
/**
* Executes the specified task without maintaining the event order.
*/
protected final void doUnorderedExecute(Runnable task) {
super.execute(task);
}
@Override
public boolean remove(Runnable task) {
boolean removed = super.remove(task);
if (removed) {
decreaseCounter(task);
}
return removed;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
decreaseCounter(r);
}
protected void increaseCounter(Runnable task) {
if (!shouldCount(task)) {
return;
}
Settings settings = this.settings;
long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment = settings.objectSizeEstimator.estimateSize(task);
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
eventTask.estimatedSize = increment;
Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(increment);
//System.out.println("IC: " + channelCounter + ", " + increment);
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
if (channel.isReadable()) {
//System.out.println("UNREADABLE");
ChannelHandlerContext ctx = eventTask.getContext();
if (ctx.getHandler() instanceof ExecutionHandler) {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
}
channel.setReadable(false);
}
}
} else {
((MemoryAwareRunnable) task).estimatedSize = increment;
}
if (totalLimiter != null) {
totalLimiter.increase(increment);
}
}
protected void decreaseCounter(Runnable task) {
if (!shouldCount(task)) {
return;
}
Settings settings = this.settings;
long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment;
if (task instanceof ChannelEventRunnable) {
increment = ((ChannelEventRunnable) task).estimatedSize;
} else {
increment = ((MemoryAwareRunnable) task).estimatedSize;
}
if (totalLimiter != null) {
totalLimiter.decrease(increment);
}
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(-increment);
//System.out.println("DC: " + channelCounter + ", " + increment);
if (maxChannelMemorySize != 0 && channelCounter < maxChannelMemorySize && channel.isOpen()) {
if (!channel.isReadable()) {
//System.out.println("READABLE");
ChannelHandlerContext ctx = eventTask.getContext();
if (ctx.getHandler() instanceof ExecutionHandler) {
// check if the attachment was set as this means that we suspend the channel from reads. This only works when
// this pool is used with ExecutionHandler but I guess thats good enough for us.
//
// See #215
if (ctx.getAttachment() != null) {
// readSuspended = false;
ctx.setAttachment(null);
channel.setReadable(true);
}
} else {
channel.setReadable(true);
}
}
}
}
}
private AtomicLong getChannelCounter(Channel channel) {
AtomicLong counter = channelCounters.get(channel);
if (counter == null) {
counter = new AtomicLong();
AtomicLong oldCounter = channelCounters.putIfAbsent(channel, counter);
if (oldCounter != null) {
counter = oldCounter;
}
}
// Remove the entry when the channel closes.
if (!channel.isOpen()) {
channelCounters.remove(channel);
}
return counter;
}
/**
* Returns {@code true} if and only if the specified {@code task} should
* be counted to limit the global and per-channel memory consumption.
* To override this method, you must call {@code super.shouldCount()} to
* make sure important tasks are not counted.
*/
protected boolean shouldCount(Runnable task) {
if (task instanceof ChannelUpstreamEventRunnable) {
ChannelUpstreamEventRunnable r = (ChannelUpstreamEventRunnable) task;
ChannelEvent e = r.getEvent();
if (e instanceof WriteCompletionEvent) {
return false;
} else if (e instanceof ChannelStateEvent) {
if (((ChannelStateEvent) e).getState() == ChannelState.INTEREST_OPS) {
return false;
}
}
}
return true;
}
private static final class Settings {
final ObjectSizeEstimator objectSizeEstimator;
final long maxChannelMemorySize;
Settings(ObjectSizeEstimator objectSizeEstimator,
long maxChannelMemorySize) {
this.objectSizeEstimator = objectSizeEstimator;
this.maxChannelMemorySize = maxChannelMemorySize;
}
}
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
private static final class MemoryAwareRunnable implements Runnable {
final Runnable task;
int estimatedSize;
MemoryAwareRunnable(Runnable task) {
this.task = task;
}
@Override
public void run() {
task.run();
}
}
private static class Limiter {
final long limit;
private long counter;
private int waiters;
Limiter(long limit) {
this.limit = limit;
}
synchronized void increase(long amount) {
while (counter >= limit) {
waiters ++;
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
waiters --;
}
}
counter += amount;
}
synchronized void decrease(long amount) {
counter -= amount;
if (counter < limit && waiters > 0) {
notifyAll();
}
}
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* Estimates the size of an object in bytes.
* @apiviz.landmark
* @apiviz.uses io.netty.util.EstimatableObjectWrapper
*/
public interface ObjectSizeEstimator {
/**
* Returns the estimated size of the specified object in bytes.
*
* @return a positive integer which represents the size of the specified
* object in bytes
*/
int estimateSize(Object o);
}

View File

@ -1,165 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* {@link Executor} which should be used for downstream {@link ChannelEvent}'s. This implementation will take care of preserve the order of the events in a {@link Channel}.
* If you don't need to preserve the order just use one of the {@link Executor} implementations provided by the static methods of {@link Executors}.
* <br>
* <br>
*
* For more informations about how the order is preserved see {@link OrderedMemoryAwareThreadPoolExecutor}
*
*/
public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor {
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
*/
public OrderedDownstreamThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, 0L, 0L);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public OrderedDownstreamThreadPoolExecutor(
int corePoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, 0L, 0L, keepAliveTime, unit);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public OrderedDownstreamThreadPoolExecutor(
int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, 0L, 0L,
keepAliveTime, unit, threadFactory);
}
/**
* Return <code>null</code>
*/
@Override
public ObjectSizeEstimator getObjectSizeEstimator() {
return null;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Returns <code>0L</code>
*/
@Override
public long getMaxChannelMemorySize() {
return 0L;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setMaxChannelMemorySize(long maxChannelMemorySize) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Returns <code>0L</code>
*/
@Override
public long getMaxTotalMemorySize() {
return 0L;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setMaxTotalMemorySize(long maxTotalMemorySize) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Return <code>false</code> as we not need to cound the memory in this implementation
*/
@Override
protected boolean shouldCount(Runnable task) {
return false;
}
@Override
public void execute(Runnable command) {
// check if the Runnable was of an unsupported type
if (command instanceof ChannelUpstreamEventRunnable) {
throw new RejectedExecutionException("command must be enclosed with an downstream event.");
}
doExecute(command);
}
@Override
protected Executor getChildExecutor(ChannelEvent e) {
final Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key);
if (executor == null) {
executor = new ChildExecutor();
Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
if (oldExecutor != null) {
executor = oldExecutor;
} else {
// register a listener so that the ChildExecutor will get removed once the channel was closed
e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
removeChildExecutor(key);
}
});
}
}
return executor;
}
}

View File

@ -1,337 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.IdentityHashMap;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
import io.netty.util.internal.QueueFactory;
/**
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
* same {@link Channel} are executed sequentially.
* <p>
* <b>NOTE</b>: This thread pool inherits most characteristics of its super
* type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor}
* to understand how it works basically.
*
* <h3>Event execution order</h3>
*
* For example, let's say there are two executor threads that handle the events
* from the two channels:
* <pre>
* -------------------------------------&gt; Timeline ------------------------------------&gt;
*
* Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) ---&gt;
* \ /
* X
* / \
* Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) ---&gt;
* </pre>
* As you see, the events from different channels are independent from each
* other. That is, an event of Channel B will not be blocked by an event of
* Channel A and vice versa, unless the thread pool is exhausted.
* <p>
* Also, it is guaranteed that the invocation will be made sequentially for the
* events from the same channel. For example, the event A2 is never executed
* before the event A1 is finished. (Although not recommended, if you want the
* events from the same channel to be executed simultaneously, please use
* {@link MemoryAwareThreadPoolExecutor} instead.)
* <p>
* However, it is not guaranteed that the invocation will be made by the same
* thread for the same channel. The events from the same channel can be
* executed by different threads. For example, the Event A2 is executed by the
* thread Y while the event A1 was executed by the thread X.
*
* <h3>Using a different key other than {@link Channel} to maintain event order</h3>
* <p>
* {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key
* that is used for maintaining the event execution order, as explained in the
* previous section. Alternatively, you can extend it to change its behavior.
* For example, you can change the key to the remote IP of the peer:
*
* <pre>
* public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} {
*
* ... Constructors ...
*
* {@code @Override}
* protected ConcurrentMap&lt;Object, Executor&gt; newChildExecutorMap() {
* // The default implementation returns a special ConcurrentMap that
* // uses identity comparison only (see {@link IdentityHashMap}).
* // Because SocketAddress does not work with identity comparison,
* // we need to employ more generic implementation.
* return new ConcurrentHashMap&lt;Object, Executor&gt;
* }
*
* protected Object getChildExecutorKey({@link ChannelEvent} e) {
* // Use the IP of the remote peer as a key.
* return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
* }
*
* // Make public so that you can call from anywhere.
* public boolean removeChildExecutor(Object key) {
* super.removeChildExecutor(key);
* }
* }
* </pre>
*
* Please be very careful of memory leak of the child executor map. You must
* call {@link #removeChildExecutor(Object)} when the life cycle of the key
* ends (e.g. all connections from the same IP were closed.) Also, please
* keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)}
* (e.g. a new connection could come in from the same old IP after removal.)
* If in doubt, prune the old unused or stall keys from the child executor map
* periodically:
*
* <pre>
* RemoteAddressBasedOMATPE executor = ...;
*
* on every 3 seconds:
*
* for (Iterator&lt;Object&gt; i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
* InetAddress ip = (InetAddress) i.next();
* if (there is no active connection from 'ip' now &&
* there has been no incoming connection from 'ip' for last 10 minutes) {
* i.remove();
* }
* }
* </pre>
*
* If the expected maximum number of keys is small and deterministic, you could
* use a weak key map such as <a href="http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/src/jsr166y/ConcurrentWeakHashMap.java?view=markup">ConcurrentWeakHashMap</a>
* or synchronized {@link WeakHashMap} instead of managing the life cycle of the
* keys by yourself.
*
* @apiviz.landmark
*/
public class OrderedMemoryAwareThreadPoolExecutor extends
MemoryAwareThreadPoolExecutor {
// TODO Make OMATPE focus on the case where Channel is the key.
// Add a new less-efficient TPE that allows custom key.
protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, threadFactory);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit,
ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, objectSizeEstimator, threadFactory);
}
protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>();
}
protected Object getChildExecutorKey(ChannelEvent e) {
return e.getChannel();
}
protected Set<Object> getChildExecutorKeySet() {
return childExecutors.keySet();
}
protected boolean removeChildExecutor(Object key) {
// FIXME: Succeed only when there is no task in the ChildExecutor's queue.
// Note that it will need locking which might slow down task submission.
return childExecutors.remove(key) != null;
}
/**
* Executes the specified task concurrently while maintaining the event
* order.
*/
@Override
protected void doExecute(Runnable task) {
if (!(task instanceof ChannelEventRunnable)) {
doUnorderedExecute(task);
} else {
ChannelEventRunnable r = (ChannelEventRunnable) task;
getChildExecutor(r.getEvent()).execute(task);
}
}
protected Executor getChildExecutor(ChannelEvent e) {
Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key);
if (executor == null) {
executor = new ChildExecutor();
Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
if (oldExecutor != null) {
executor = oldExecutor;
}
}
// Remove the entry when the channel closes.
if (e instanceof ChannelStateEvent) {
Channel channel = e.getChannel();
ChannelStateEvent se = (ChannelStateEvent) e;
if (se.getState() == ChannelState.OPEN &&
!channel.isOpen()) {
removeChildExecutor(key);
}
}
return executor;
}
@Override
protected boolean shouldCount(Runnable task) {
if (task instanceof ChildExecutor) {
return false;
}
return super.shouldCount(task);
}
void onAfterExecute(Runnable r, Throwable t) {
afterExecute(r, t);
}
protected final class ChildExecutor implements Executor, Runnable {
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean();
@Override
public void execute(Runnable command) {
// TODO: What todo if the add return false ?
tasks.add(command);
if (!isRunning.get()) {
doUnorderedExecute(this);
}
}
@Override
public void run() {
boolean acquired = false;
// check if its already running by using CAS. If so just return here. So in the worst case the thread
// is executed and do nothing
if (isRunning.compareAndSet(false, true)) {
acquired = true;
try {
Thread thread = Thread.currentThread();
for (;;) {
final Runnable task = tasks.poll();
// if the task is null we should exit the loop
if (task == null) {
break;
}
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
onAfterExecute(task, null);
} catch (RuntimeException e) {
if (!ran) {
onAfterExecute(task, e);
}
throw e;
}
}
} finally {
// set it back to not running
isRunning.set(false);
}
if (acquired && !isRunning.get() && tasks.peek() != null) {
doUnorderedExecute(this);
}
}
}
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* {@link java.util.concurrent.Executor}-based implementation of various
* thread models that separate business logic from I/O threads
*
* @apiviz.exclude ^java\.lang\.
* @apiviz.exclude \.netty\.channel\.
* @apiviz.exclude \.ExternalResourceReleasable$
* @apiviz.exclude \.Channel[A-Za-z]*EventRunnable[A-Za-z]*$
*/
package io.netty.handler.execution;

View File

@ -1,79 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureAggregator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
/**
* {@link WritableByteChannel} implementation which will take care to wrap the {@link ByteBuffer} to a {@link ChannelBuffer} and forward it to the next {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} on every {@link #write(ByteBuffer)}
* operation.
*/
public class ChannelWritableByteChannel implements WritableByteChannel {
private boolean closed;
private final ChannelHandlerContext context;
private final ChannelFutureAggregator aggregator;
private final SocketAddress remote;
public ChannelWritableByteChannel(ChannelHandlerContext context, MessageEvent event) {
this(context, new ChannelFutureAggregator(event.getFuture()), event.getRemoteAddress());
}
public ChannelWritableByteChannel(ChannelHandlerContext context, ChannelFutureAggregator aggregator, SocketAddress remote) {
this.context = context;
this.aggregator = aggregator;
this.remote = remote;
}
@Override
public boolean isOpen() {
return !closed && context.channel().isOpen();
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public int write(ByteBuffer src) throws IOException {
int written = src.remaining();
// create a new ChannelFuture and add it to the aggregator
ChannelFuture future = Channels.future(context.channel(), true);
aggregator.addFuture(future);
Channels.write(context, future, ChannelBuffers.wrappedBuffer(src), remote);
return written;
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageEvent;
/**
* {@link ChannelDownstreamHandler} implementation which encodes a {@link FileRegion} to {@link ChannelBuffer}'s if one of the given {@link ChannelHandler} was found in the {@link ChannelPipeline}.
*
* This {@link ChannelDownstreamHandler} should be used if you plan to write {@link FileRegion} objects and also have some {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} which needs to transform
* the to be written {@link ChannelBuffer} in any case. This could be for example {@link ChannelDownstreamHandler}'s which needs to encrypt or compress messages.
*
* Users of this {@link FileRegionEncoder} should add / remove this {@link ChannelDownstreamHandler} on the fly to get the best performance out of their system.
*
*
*/
@ChannelHandler.Sharable
public class FileRegionEncoder implements ChannelDownstreamHandler {
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
if (originalMessage instanceof FileRegion) {
FileRegion fr = (FileRegion) originalMessage;
WritableByteChannel bchannel = new ChannelWritableByteChannel(ctx, e);
int length = 0;
long i = 0;
while ((i = fr.transferTo(bchannel, length)) > 0) {
length += i;
if (length >= fr.getCount()) {
break;
}
}
} else {
// no converting is needed so just sent the event downstream
ctx.sendDownstream(evt);
}
}
}

View File

@ -1,22 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* TODO: Replace this package and provide this functionality in the core.
*
* @apiviz.exclude \.channel\.
*/
package io.netty.handler.region;

View File

@ -1,387 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/**
* AbstractTrafficShapingHandler allows to limit the global bandwidth
* (see {@link GlobalTrafficShapingHandler}) or per session
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
* It allows too to implement an almost real time monitoring of the bandwidth using
* the monitors from {@link TrafficCounter} that will call back every checkInterval
* the method doAccounting of this handler.<br>
* <br>
*
* If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br>
* <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* <li></li>
* </ul>
*/
public abstract class AbstractTrafficShapingHandler extends
SimpleChannelHandler implements ExternalResourceReleasable {
/**
* Internal logger
*/
static InternalLogger logger = InternalLoggerFactory
.getInstance(AbstractTrafficShapingHandler.class);
/**
* Default delay between two checks: 1s
*/
public static final long DEFAULT_CHECK_INTERVAL = 1000;
/**
* Default minimal time to wait
*/
private static final long MINIMAL_WAIT = 10;
/**
* Traffic Counter
*/
protected TrafficCounter trafficCounter;
/**
* Executor to associated to any TrafficCounter
*/
protected Executor executor;
/**
* Limit in B/s to apply to write
*/
private long writeLimit;
/**
* Limit in B/s to apply to read
*/
private long readLimit;
/**
* Delay between two performance snapshots
*/
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Boolean associated with the release of this TrafficShapingHandler.
* It will be true only once when the releaseExternalRessources is called
* to prevent waiting when shutdown.
*/
final AtomicBoolean release = new AtomicBoolean(false);
private void init(
Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) {
executor = newExecutor;
writeLimit = newWriteLimit;
readLimit = newReadLimit;
checkInterval = newCheckInterval;
//logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel());
}
/**
*
* @param newTrafficCounter the TrafficCounter to set
*/
void setTrafficCounter(TrafficCounter newTrafficCounter) {
trafficCounter = newTrafficCounter;
}
/**
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) {
init(executor, writeLimit, readLimit, checkInterval);
}
/**
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) {
init(executor, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
}
/**
* Change the underlying limitations and check interval.
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
this.configure(newWriteLimit, newReadLimit);
this.configure(newCheckInterval);
}
/**
* Change the underlying limitations.
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
readLimit = newReadLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
}
}
/**
* Change the check interval.
*/
public void configure(long newCheckInterval) {
checkInterval = newCheckInterval;
if (trafficCounter != null) {
trafficCounter.configure(checkInterval);
}
}
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
*
* @param counter
* the TrafficCounter that computes its performance
*/
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
/**
* Class to implement setReadable at fix time
*/
private class ReopenRead implements Runnable {
/**
* Associated ChannelHandlerContext
*/
private ChannelHandlerContext ctx;
/**
* Time to wait before clearing the channel
*/
private long timeToWait;
/**
* @param ctx
* the associated channelHandlerContext
* @param timeToWait
*/
protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
this.ctx = ctx;
this.timeToWait = timeToWait;
}
/**
* Truly run the waken up of the channel
*/
@Override
public void run() {
try {
if (release.get()) {
return;
}
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
return;
}
// logger.info("WAKEUP!");
if (ctx != null && ctx.channel() != null &&
ctx.channel().isConnected()) {
//logger.info(" setReadable TRUE: "+timeToWait);
// readSuspended = false;
ctx.setAttachment(null);
ctx.channel().setReadable(true);
}
}
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can
* be negative time
*/
private long getTimeToWait(long limit, long bytes, long lastTime,
long curtime) {
long interval = curtime - lastTime;
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
return bytes * 1000 / limit - interval;
}
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
try {
long curtime = System.currentTimeMillis();
long size = ((ChannelBuffer) arg1.getMessage()).readableBytes();
if (trafficCounter != null) {
trafficCounter.bytesRecvFlowControl(arg0, size);
if (readLimit == 0) {
// no action
return;
}
// compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit, trafficCounter
.getCurrentReadBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
Channel channel = arg0.channel();
// try to limit the traffic
if (channel != null && channel.isConnected()) {
// Channel version
if (executor == null) {
// Sleep since no executor
//logger.info("Read sleep since no executor for "+wait+" ms for "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
return;
}
if (arg0.getAttachment() == null) {
// readSuspended = true;
arg0.setAttachment(Boolean.TRUE);
channel.setReadable(false);
//logger.info("Read will wakeup after "+wait+" ms "+this);
executor.execute(new ReopenRead(arg0, wait));
} else {
// should be waiting: but can occurs sometime so as a FIX
//logger.info("Read sleep ok but should not be here: "+wait+" "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
}
} else {
// Not connected or no channel
//logger.info("Read sleep "+wait+" ms for "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
}
}
}
} finally {
// The message is then just passed to the next handler
super.messageReceived(arg0, arg1);
}
}
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
try {
long curtime = System.currentTimeMillis();
long size = ((ChannelBuffer) arg1.getMessage()).readableBytes();
if (trafficCounter != null) {
trafficCounter.bytesWriteFlowControl(size);
if (writeLimit == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
long wait = getTimeToWait(writeLimit, trafficCounter
.getCurrentWrittenBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) {
// Global or Channel
if (release.get()) {
return;
}
Thread.sleep(wait);
}
}
} finally {
// The message is then just passed to the next handler
super.writeRequested(arg0, arg1);
}
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if this handler has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
}
super.handleDownstream(ctx, e);
}
/**
*
* @return the current TrafficCounter (if
* channel is still connected)
*/
public TrafficCounter getTrafficCounter() {
return trafficCounter;
}
@Override
public void releaseExternalResources() {
if (trafficCounter != null) {
trafficCounter.stop();
}
release.set(true);
ExecutorUtil.terminate(executor);
}
@Override
public String toString() {
return "TrafficShaping with Write Limit: " + writeLimit +
" Read Limit: " + readLimit + " and Counter: " +
(trafficCounter != null? trafficCounter.toString() : "none");
}
}

View File

@ -1,113 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.ChannelStateEvent;
import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.<br><br>
*
* The general use should be as follow:<br>
* <ul>
* <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(executor);</tt><br>
* executor could be created using <tt>Executors.newCachedThreadPool();</tt><br>
* <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
*
* <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
* for each new channel as the counter cannot be shared among all channels.</b> For instance, if you have a
* {@link ChannelPipelineFactory}, you should create a new ChannelTrafficShapingHandler in this
* {@link ChannelPipelineFactory} each time getPipeline() method is called.<br><br>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br>
* </li>
* <li>When you shutdown your application, release all the external resources like the executor
* by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br>
* </li>
* </ul><br>
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
/**
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) {
super(executor, writeLimit, readLimit, checkInterval);
}
/**
* @param executor
* @param writeLimit
* @param readLimit
*/
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) {
super(executor, writeLimit, readLimit);
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (trafficCounter != null) {
trafficCounter.stop();
trafficCounter = null;
}
super.channelClosed(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ctx.channel().setReadable(false);
if (trafficCounter == null) {
// create a new counter now
trafficCounter = new TrafficCounter(this, executor, "ChannelTC" +
ctx.channel().getId(), checkInterval);
}
if (trafficCounter != null) {
trafficCounter.start();
}
super.channelConnected(ctx, e);
// readSuspended = false;
ctx.setAttachment(null);
ctx.channel().setReadable(true);
}
}

View File

@ -1,94 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels.<br><br>
*
* The general use should be as follow:<br>
* <ul>
* <li>Create your unique GlobalTrafficShapingHandler like:<br><br>
* <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt><br><br>
* executor could be created using <tt>Executors.newCachedThreadPool();</tt><br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
*
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b><br><br>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br>
* </li>
* <li>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* </li>
* <li>When you shutdown your application, release all the external resources like the executor
* by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br>
* </li>
* </ul><br>
*/
@Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
/**
* Create the global TrafficCounter
*/
void createGlobalTrafficCounter() {
TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC",
checkInterval);
setTrafficCounter(tc);
tc.start();
}
/**
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) {
super(executor, writeLimit, readLimit, checkInterval);
createGlobalTrafficCounter();
}
/**
* @param executor
* @param writeLimit
* @param readLimit
*/
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) {
super(executor, writeLimit, readLimit);
createGlobalTrafficCounter();
}
}

View File

@ -1,401 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.ChannelHandlerContext;
/**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
* <br>
* A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
* globally or per channel. It compute statistics on read and written bytes at the specified
* interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
* specified interval. If this interval is set to 0, therefore no accounting will be done and only
* statistics will be computed at each receive or write operations.
*/
public class TrafficCounter {
/**
* Current written bytes
*/
private final AtomicLong currentWrittenBytes = new AtomicLong();
/**
* Current read bytes
*/
private final AtomicLong currentReadBytes = new AtomicLong();
/**
* Long life written bytes
*/
private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
/**
* Long life read bytes
*/
private final AtomicLong cumulativeReadBytes = new AtomicLong();
/**
* Last Time where cumulative bytes where reset to zero
*/
private long lastCumulativeTime;
/**
* Last writing bandwidth
*/
private long lastWriteThroughput;
/**
* Last reading bandwidth
*/
private long lastReadThroughput;
/**
* Last Time Check taken
*/
private final AtomicLong lastTime = new AtomicLong();
/**
* Last written bytes number during last check interval
*/
private long lastWrittenBytes;
/**
* Last read bytes number during last check interval
*/
private long lastReadBytes;
/**
* Delay between two captures
*/
AtomicLong checkInterval = new AtomicLong(
AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
// default 1 s
/**
* Name of this Monitor
*/
final String name;
/**
* The associated TrafficShapingHandler
*/
private AbstractTrafficShapingHandler trafficShapingHandler;
/**
* Default Executor
*/
private Executor executor;
/**
* Is Monitor active
*/
AtomicBoolean monitorActive = new AtomicBoolean();
/**
* Monitor
*/
private TrafficMonitoring trafficMonitoring;
/**
* Class to implement monitoring at fix delay
*/
private class TrafficMonitoring implements Runnable {
/**
* The associated TrafficShapingHandler
*/
private final AbstractTrafficShapingHandler trafficShapingHandler1;
/**
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param trafficShapingHandler
* @param counter
*/
protected TrafficMonitoring(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
/**
* Default run
*/
@Override
public void run() {
try {
Thread.currentThread().setName(name);
for (; monitorActive.get();) {
long check = counter.checkInterval.get();
if (check > 0) {
Thread.sleep(check);
} else {
// Delay goes to 0, so exit
return;
}
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
}
/**
* Start the monitoring process
*/
public void start() {
synchronized (lastTime) {
if (monitorActive.get()) {
return;
}
lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) {
monitorActive.set(true);
trafficMonitoring = new TrafficMonitoring(
trafficShapingHandler, this);
executor.execute(trafficMonitoring);
}
}
}
/**
* Stop the monitoring process
*/
public void stop() {
synchronized (lastTime) {
if (!monitorActive.get()) {
return;
}
monitorActive.set(false);
resetAccounting(System.currentTimeMillis());
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
}
}
/**
* Reset the accounting on Read and Write
*
* @param newLastTime
*/
void resetAccounting(long newLastTime) {
synchronized (lastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
}
}
/**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its
* name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param executor
* Should be a CachedThreadPool for efficiency
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
Executor executor, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler;
this.executor = executor;
this.name = name;
lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval);
}
/**
* Change checkInterval between
* two computations in millisecond
*
* @param newcheckInterval
*/
public void configure(long newcheckInterval) {
if (checkInterval.get() != newcheckInterval) {
checkInterval.set(newcheckInterval);
if (newcheckInterval <= 0) {
stop();
// No more active monitoring
lastTime.set(System.currentTimeMillis());
} else {
// Start if necessary
start();
}
}
}
/**
* Computes counters for Read.
*
* @param ctx
* the associated channelHandlerContext
* @param recv
* the size in bytes to read
*/
void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv) {
currentReadBytes.addAndGet(recv);
cumulativeReadBytes.addAndGet(recv);
}
/**
* Computes counters for Write.
*
* @param write
* the size in bytes to write
*/
void bytesWriteFlowControl(long write) {
currentWrittenBytes.addAndGet(write);
cumulativeWrittenBytes.addAndGet(write);
}
/**
*
* @return the current checkInterval between two computations of traffic counter
* in millisecond
*/
public long getCheckInterval() {
return checkInterval.get();
}
/**
*
* @return the Read Throughput in bytes/s computes in the last check interval
*/
public long getLastReadThroughput() {
return lastReadThroughput;
}
/**
*
* @return the Write Throughput in bytes/s computes in the last check interval
*/
public long getLastWriteThroughput() {
return lastWriteThroughput;
}
/**
*
* @return the number of bytes read during the last check Interval
*/
public long getLastReadBytes() {
return lastReadBytes;
}
/**
*
* @return the number of bytes written during the last check Interval
*/
public long getLastWrittenBytes() {
return lastWrittenBytes;
}
/**
*
* @return the current number of bytes read since the last checkInterval
*/
public long getCurrentReadBytes() {
return currentReadBytes.get();
}
/**
*
* @return the current number of bytes written since the last check Interval
*/
public long getCurrentWrittenBytes() {
return currentWrittenBytes.get();
}
/**
* @return the Time in millisecond of the last check as of System.currentTimeMillis()
*/
public long getLastTime() {
return lastTime.get();
}
/**
* @return the cumulativeWrittenBytes
*/
public long getCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
/**
* @return the cumulativeReadBytes
*/
public long getCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* when the cumulative counters were reset to 0.
*/
public long getLastCumulativeTime() {
return lastCumulativeTime;
}
/**
* Reset both read and written cumulative bytes counters and the associated time.
*/
public void resetCumulativeTime() {
lastCumulativeTime = System.currentTimeMillis();
cumulativeReadBytes.set(0);
cumulativeWrittenBytes.set(0);
}
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* String information
*/
@Override
public String toString() {
return "Monitor " + name + " Current Speed Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadBytes.get() >> 10) + " KB Current Write: " +
(currentWrittenBytes.get() >> 10) + " KB";
}
}

View File

@ -1,91 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br>
* <br><br>
* <P>The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics).</P>
*
* <P>Two classes implement this behavior:<br>
* <ul>
* <li> <tt>{@link io.netty.handler.traffic.TrafficCounter}</tt>: this class implements the counters needed by the handlers.
* It can be accessed to get some extra information like the read or write bytes since last check, the read and write
* bandwidth from last check...</li><br><br>
*
* <li> <tt>{@link io.netty.handler.traffic.AbstractTrafficShapingHandler}</tt>: this abstract class implements the kernel
* of the traffic shaping. It could be extended to fit your needs. Two classes are proposed as default
* implementations: see {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} and see {@link io.netty.handler.traffic.GlobalTrafficShapingHandler}
* respectively for Channel traffic shaping and Global traffic shaping.</li><br><br>
*
* The insertion in the pipeline of one of those handlers can be wherever you want, but
* <b>it must be placed before any <tt>{@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt>
* in your pipeline</b>.<br>
* <b><i>It is really recommended to have such a</i> <tt>{@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt>
* <i>(either non ordered or </i> <tt>{@link io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor}</tt>
* <i>) in your pipeline</i></b>
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.<br>
* Instead, if you don't, you can have the following situation: if there are more clients
* connected and doing data transfer (either in read or write) than NioWorker, your global performance can be under
* your specifications or even sometimes it will block for a while which can turn to "timeout" operations.
* For instance, let says that you've got 2 NioWorkers, and 10 clients wants to send data to your server.
* If you set a bandwidth limitation of 100KB/s for each channel (client), you could have a final limitation of about
* 60KB/s for each channel since NioWorkers are stopping by this handler.<br>
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the
* NioWorkers.<br><br>
* An {@link io.netty.util.ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link io.netty.util.DefaultObjectSizeEstimator} implementation.<br><br>
* </ul></P>
*
* <P>Standard use could be as follow:</P>
*
* <P><ul>
* <li>To activate or deactivate the traffic shaping, change the value corresponding to your desire as
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
* A value of <tt>0</tt>
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method <tt>configure</tt> in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}.<br>
* <br>
*
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms
* for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem)
* or even using <tt>0</tt> which means no computation will be done.</li><br>
* If you want to do anything with this statistics, just override the <tt>doAccounting</tt> method.<br>
* This interval can be changed either from the method <tt>configure</tt> in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}
* or directly using the method <tt>configure</tt> of {@link io.netty.handler.traffic.TrafficCounter}.<br><br>
*
* </ul></P><br><br>
*
* <P>So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.</P>
* <tt>XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(executor);</tt><br><br>
* where executor could be created using <tt>Executors.newCachedThreadPool();</tt> and XXXXX could be either
* Global or Channel<br>
* <tt>pipeline.addLast("XXXXX_TRAFFIC_SHAPING", myHandler);</tt><br>
* <tt>...</tt><br>
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br>
* <P>Note that a new {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} must be created for each new channel,
* but only one {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} must be created for all channels.</P>
*
* <P>Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of
* channels (for instance either from business point of view or from bind address point of view).</P>
*
* @apiviz.exclude ^java\.lang\.
*/
package io.netty.handler.traffic;

View File

@ -1,48 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.iostream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;
/**
* A {@link java.net.SocketAddress} implementation holding an
* {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used
* as "remote" address to connect to with a {@link IoStreamChannel}.
*/
public class IoStreamAddress extends SocketAddress {
private static final long serialVersionUID = -4382415449059935960L;
private final InputStream inputStream;
private final OutputStream outputStream;
public IoStreamAddress(final InputStream inputStream, final OutputStream outputStream) {
this.inputStream = inputStream;
this.outputStream = outputStream;
}
public InputStream getInputStream() {
return inputStream;
}
public OutputStream getOutputStream() {
return outputStream;
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.iostream;
import java.net.SocketAddress;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
/**
* A channel to an {@link java.io.InputStream} and an
* {@link java.io.OutputStream}.
*/
public class IoStreamChannel extends AbstractChannel {
IoStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
super(null, factory, pipeline, sink);
}
@Override
public ChannelConfig getConfig() {
return ((IoStreamChannelSink) getPipeline().getSink()).getConfig();
}
@Override
public boolean isBound() {
return ((IoStreamChannelSink) getPipeline().getSink()).isBound();
}
@Override
public boolean isConnected() {
return ((IoStreamChannelSink) getPipeline().getSink()).isConnected();
}
@Override
public SocketAddress getLocalAddress() {
return null;
}
@Override
public SocketAddress getRemoteAddress() {
return ((IoStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
}
@Override
public ChannelFuture bind(final SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture unbind() {
throw new UnsupportedOperationException();
}
void doSetClosed() {
setClosed();
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.iostream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.internal.ExecutorUtil;
import java.util.concurrent.ExecutorService;
/**
* A {@link io.netty.channel.ChannelFactory} for creating {@link IoStreamChannel} instances.
*/
public class IoStreamChannelFactory implements ChannelFactory {
private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup");
private final ExecutorService executorService;
public IoStreamChannelFactory(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public Channel newChannel(final ChannelPipeline pipeline) {
IoStreamChannelSink sink = new IoStreamChannelSink(executorService);
IoStreamChannel channel = new IoStreamChannel(this, pipeline, sink);
sink.setChannel(channel);
channels.add(channel);
return channel;
}
@Override
public void releaseExternalResources() {
ChannelGroupFuture close = channels.close();
close.awaitUninterruptibly();
ExecutorUtil.terminate(executorService);
}
}

View File

@ -1,180 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.iostream;
import static io.netty.channel.Channels.*;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.util.concurrent.ExecutorService;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageEvent;
/**
* A {@link io.netty.channel.ChannelSink} implementation which reads from
* an {@link java.io.InputStream} and writes to an {@link java.io.OutputStream}.
*/
public class IoStreamChannelSink extends AbstractChannelSink {
private static class ReadRunnable implements Runnable {
private final IoStreamChannelSink channelSink;
public ReadRunnable(final IoStreamChannelSink channelSink) {
this.channelSink = channelSink;
}
@Override
public void run() {
PushbackInputStream in = channelSink.inputStream;
while (channelSink.channel.isOpen()) {
byte[] buf;
int readBytes;
try {
int bytesToRead = in.available();
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else {
// peek into the stream if it was closed (value=-1)
int b = in.read();
if (b < 0) {
break;
}
// push back the byte which was read too much
in.unread(b);
continue;
}
} catch (Throwable t) {
if (!channelSink.channel.getCloseFuture().isDone()) {
fireExceptionCaught(channelSink.channel, t);
}
break;
}
fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
}
// Clean up.
close(channelSink.channel);
}
}
private final ExecutorService executorService;
private IoStreamChannel channel;
public IoStreamChannelSink(final ExecutorService executorService) {
this.executorService = executorService;
}
public boolean isConnected() {
return inputStream != null && outputStream != null;
}
public IoStreamAddress getRemoteAddress() {
return remoteAddress;
}
public boolean isBound() {
return false;
}
public ChannelConfig getConfig() {
return config;
}
public void setChannel(final IoStreamChannel channel) {
this.channel = channel;
}
private IoStreamAddress remoteAddress;
private OutputStream outputStream;
private PushbackInputStream inputStream;
private final ChannelConfig config = new DefaultChannelConfig();
@Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
final ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
outputStream = null;
inputStream = null;
((IoStreamChannel) e.getChannel()).doSetClosed();
}
break;
case BOUND:
throw new UnsupportedOperationException();
case CONNECTED:
if (value != null) {
remoteAddress = (IoStreamAddress) value;
outputStream = remoteAddress.getOutputStream();
inputStream = new PushbackInputStream(remoteAddress.getInputStream());
executorService.execute(new ReadRunnable(this));
future.setSuccess();
}
break;
case INTEREST_OPS:
// TODO implement
throw new UnsupportedOperationException();
}
} else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e;
if (event.getMessage() instanceof ChannelBuffer) {
final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
buffer.readBytes(outputStream, buffer.readableBytes());
outputStream.flush();
future.setSuccess();
} else {
throw new IllegalArgumentException("Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! " + "Please check if the encoder pipeline is configured correctly.");
}
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* A blocking transport which uses an existing {@link java.io.InputStream} and
* {@link java.io.OutputStream}.
*
* @apiviz.exclude ^java\.lang\.
* @apiviz.exclude Channel$
*/
package io.netty.channel.iostream;