Convert DOS line ending to UNIX line ending

This commit is contained in:
Trustin Lee 2012-05-15 17:14:02 +09:00
parent dd2e36e5d9
commit 894ececbb7
25 changed files with 3629 additions and 3629 deletions

View File

@ -1,134 +1,134 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class RedisCodecTest {
private DecoderEmbedder<ChannelBuffer> embedder;
@Before
public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
}
@Test
public void decodeReplies() throws IOException {
{
Object receive = decode("+OK\r\n".getBytes());
assertTrue(receive instanceof StatusReply);
assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode("-ERROR\r\n".getBytes());
assertTrue(receive instanceof ErrorReply);
assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode(":123\r\n".getBytes());
assertTrue(receive instanceof IntegerReply);
assertEquals(123, ((IntegerReply) receive).integer);
}
{
Object receive = decode("$5\r\nnetty\r\n".getBytes());
assertTrue(receive instanceof BulkReply);
assertEquals("netty", new String(((BulkReply) receive).bytes));
}
{
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
private Object decode(byte[] bytes) {
embedder.offer(wrappedBuffer(bytes));
return embedder.poll();
}
@Test
public void encodeCommands() throws IOException {
String setCommand = "*3\r\n" +
"$3\r\n" +
"SET\r\n" +
"$5\r\n" +
"mykey\r\n" +
"$7\r\n" +
"myvalue\r\n";
Command command = new Command("SET", "mykey", "myvalue");
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
command.write(cb);
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
}
@Test
public void testReplayDecoding() {
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class RedisCodecTest {
private DecoderEmbedder<ChannelBuffer> embedder;
@Before
public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
}
@Test
public void decodeReplies() throws IOException {
{
Object receive = decode("+OK\r\n".getBytes());
assertTrue(receive instanceof StatusReply);
assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode("-ERROR\r\n".getBytes());
assertTrue(receive instanceof ErrorReply);
assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
}
{
Object receive = decode(":123\r\n".getBytes());
assertTrue(receive instanceof IntegerReply);
assertEquals(123, ((IntegerReply) receive).integer);
}
{
Object receive = decode("$5\r\nnetty\r\n".getBytes());
assertTrue(receive instanceof BulkReply);
assertEquals("netty", new String(((BulkReply) receive).bytes));
}
{
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
private Object decode(byte[] bytes) {
embedder.offer(wrappedBuffer(bytes));
return embedder.poll();
}
@Test
public void encodeCommands() throws IOException {
String setCommand = "*3\r\n" +
"$3\r\n" +
"SET\r\n" +
"$5\r\n" +
"mykey\r\n" +
"$7\r\n" +
"myvalue\r\n";
Command command = new Command("SET", "mykey", "myvalue");
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
command.write(cb);
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
}
@Test
public void testReplayDecoding() {
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
}

View File

@ -1,77 +1,77 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
/**
* Utility that detects various properties specific to the current runtime
* environment, such as Java version and the availability of the
* {@code sun.misc.Unsafe} object.
*/
public final class DetectionUtil {
private static final int JAVA_VERSION = javaVersion0();
private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader());
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
public static int javaVersion() {
return JAVA_VERSION;
}
private static boolean hasUnsafe(ClassLoader loader) {
try {
Class<?> unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader);
return hasUnsafeField(unsafeClazz);
} catch (Exception e) {
// Ignore
}
return false;
}
private static boolean hasUnsafeField(final Class<?> unsafeClass) throws PrivilegedActionException {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
unsafeClass.getDeclaredField("theUnsafe");
return true;
}
});
}
private static int javaVersion0() {
try {
Deflater.class.getDeclaredField("SYNC_FLUSH");
return 7;
} catch (Exception e) {
// Ignore
}
return 6;
}
private DetectionUtil() {
// only static method supported
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
/**
* Utility that detects various properties specific to the current runtime
* environment, such as Java version and the availability of the
* {@code sun.misc.Unsafe} object.
*/
public final class DetectionUtil {
private static final int JAVA_VERSION = javaVersion0();
private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader());
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
public static int javaVersion() {
return JAVA_VERSION;
}
private static boolean hasUnsafe(ClassLoader loader) {
try {
Class<?> unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader);
return hasUnsafeField(unsafeClazz);
} catch (Exception e) {
// Ignore
}
return false;
}
private static boolean hasUnsafeField(final Class<?> unsafeClass) throws PrivilegedActionException {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
unsafeClass.getDeclaredField("theUnsafe");
return true;
}
});
}
private static int javaVersion0() {
try {
Deflater.class.getDeclaredField("SYNC_FLUSH");
return 7;
} catch (Exception e) {
// Ignore
}
return 6;
}
private DetectionUtil() {
// only static method supported
}
}

View File

@ -1,62 +1,62 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
/**
* This factory should be used to create the "optimal" {@link BlockingQueue}
* instance for the running JVM.
*/
public final class QueueFactory {
private static final boolean useUnsafe = DetectionUtil.hasUnsafe();
private QueueFactory() {
// only use static methods!
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>();
} else {
return new LegacyLinkedTransferQueue<T>();
}
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param collection the collection which should get copied to the newly created {@link BlockingQueue}
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Collection<? extends T> collection, Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>(collection);
} else {
return new LegacyLinkedTransferQueue<T>(collection);
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
/**
* This factory should be used to create the "optimal" {@link BlockingQueue}
* instance for the running JVM.
*/
public final class QueueFactory {
private static final boolean useUnsafe = DetectionUtil.hasUnsafe();
private QueueFactory() {
// only use static methods!
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>();
} else {
return new LegacyLinkedTransferQueue<T>();
}
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param collection the collection which should get copied to the newly created {@link BlockingQueue}
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Collection<? extends T> collection, Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>(collection);
} else {
return new LegacyLinkedTransferQueue<T>(collection);
}
}
}

View File

@ -1,85 +1,85 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
@Override
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
@Override
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}

View File

@ -1,28 +1,28 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,56 +1,56 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.EstimatableObjectWrapper;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
@Override
public Object unwrap() {
return e;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.EstimatableObjectWrapper;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
@Override
public Object unwrap() {
return e;
}
}

View File

@ -1,27 +1,27 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}

View File

@ -1,26 +1,26 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,67 +1,67 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetSocketAddress;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
/**
* The listener interface for receiving ipFilter events.
*
* @see IpFilteringHandler
*/
public interface IpFilterListener {
/**
* Called when the channel has the CONNECTED status and the channel was allowed by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* can wait uninterruptibly before continuing.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
ChannelFuture allowed(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress);
/**
* Called when the channel has the CONNECTED status and the channel was refused by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* will wait uninterruptibly before closing the channel.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
ChannelFuture refused(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress);
/**
* Called in handleUpstream, if this channel was previously blocked,
* to check if whatever the event, it should be passed to the next entry in the pipeline.<br>
* If one wants to not block events, just overridden this method by returning always true.<br><br>
* <b>Note that OPENED and BOUND events are still passed to the next entry in the pipeline since
* those events come out before the CONNECTED event and so the possibility to filter the connection.</b>
*
* @return True if the event should continue, False if the event should not continue
* since this channel was blocked by this filter
*/
boolean continues(ChannelHandlerContext ctx, ChannelEvent e);
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetSocketAddress;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
/**
* The listener interface for receiving ipFilter events.
*
* @see IpFilteringHandler
*/
public interface IpFilterListener {
/**
* Called when the channel has the CONNECTED status and the channel was allowed by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* can wait uninterruptibly before continuing.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
ChannelFuture allowed(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress);
/**
* Called when the channel has the CONNECTED status and the channel was refused by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* will wait uninterruptibly before closing the channel.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
ChannelFuture refused(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress);
/**
* Called in handleUpstream, if this channel was previously blocked,
* to check if whatever the event, it should be passed to the next entry in the pipeline.<br>
* If one wants to not block events, just overridden this method by returning always true.<br><br>
* <b>Note that OPENED and BOUND events are still passed to the next entry in the pipeline since
* those events come out before the CONNECTED event and so the possibility to filter the connection.</b>
*
* @return True if the event should continue, False if the event should not continue
* since this channel was blocked by this filter
*/
boolean continues(ChannelHandlerContext ctx, ChannelEvent e);
}

View File

@ -1,95 +1,95 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.UnknownHostException;
import java.util.ArrayList;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
* The Class IpFilterRuleList is a helper class to generate a List of Rules from a string.
* In case of parse errors no exceptions are thrown. The error is logged.
* <br>
* Rule List Syntax:
* <br>
* <pre>
* RuleList ::= Rule[,Rule]*
* Rule ::= AllowRule | BlockRule
* AllowRule ::= +Filter
* BlockRule ::= -Filter
* Filter ::= PatternFilter | CIDRFilter
* PatternFilter ::= @see PatternRule
* CIDRFilter ::= c:CIDRFilter
* CIDRFilter ::= @see CIDR.newCIDR(String)
* </pre>
* <br>
* Example: allow only localhost:
* <br>
* new IPFilterRuleHandler().addAll(new IpFilterRuleList("+n:localhost, -n:*"));
* <br>
*/
public class IpFilterRuleList extends ArrayList<IpFilterRule> {
private static final long serialVersionUID = -6164162941749588780L;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IpFilterRuleList.class);
/**
* Instantiates a new ip filter rule list.
*
* @param rules the rules
*/
public IpFilterRuleList(String rules) {
parseRules(rules);
}
private void parseRules(String rules) {
String[] ruless = rules.split(",");
for (String rule : ruless) {
parseRule(rule.trim());
}
}
private void parseRule(String rule) {
if (rule == null || rule.length() == 0) {
return;
}
if (!(rule.startsWith("+") || rule.startsWith("-"))) {
if (logger.isErrorEnabled()) {
logger.error("syntax error in ip filter rule:" + rule);
}
return;
}
boolean allow = rule.startsWith("+");
if (rule.charAt(1) == 'n' || rule.charAt(1) == 'i') {
this.add(new PatternRule(allow, rule.substring(1)));
} else if (rule.charAt(1) == 'c') {
try {
this.add(new IpSubnetFilterRule(allow, rule.substring(3)));
} catch (UnknownHostException e) {
if (logger.isErrorEnabled()) {
logger.error("error parsing ip filter " + rule, e);
}
}
} else {
if (logger.isErrorEnabled()) {
logger.error("syntax error in ip filter rule:" + rule);
}
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.UnknownHostException;
import java.util.ArrayList;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
* The Class IpFilterRuleList is a helper class to generate a List of Rules from a string.
* In case of parse errors no exceptions are thrown. The error is logged.
* <br>
* Rule List Syntax:
* <br>
* <pre>
* RuleList ::= Rule[,Rule]*
* Rule ::= AllowRule | BlockRule
* AllowRule ::= +Filter
* BlockRule ::= -Filter
* Filter ::= PatternFilter | CIDRFilter
* PatternFilter ::= @see PatternRule
* CIDRFilter ::= c:CIDRFilter
* CIDRFilter ::= @see CIDR.newCIDR(String)
* </pre>
* <br>
* Example: allow only localhost:
* <br>
* new IPFilterRuleHandler().addAll(new IpFilterRuleList("+n:localhost, -n:*"));
* <br>
*/
public class IpFilterRuleList extends ArrayList<IpFilterRule> {
private static final long serialVersionUID = -6164162941749588780L;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IpFilterRuleList.class);
/**
* Instantiates a new ip filter rule list.
*
* @param rules the rules
*/
public IpFilterRuleList(String rules) {
parseRules(rules);
}
private void parseRules(String rules) {
String[] ruless = rules.split(",");
for (String rule : ruless) {
parseRule(rule.trim());
}
}
private void parseRule(String rule) {
if (rule == null || rule.length() == 0) {
return;
}
if (!(rule.startsWith("+") || rule.startsWith("-"))) {
if (logger.isErrorEnabled()) {
logger.error("syntax error in ip filter rule:" + rule);
}
return;
}
boolean allow = rule.startsWith("+");
if (rule.charAt(1) == 'n' || rule.charAt(1) == 'i') {
this.add(new PatternRule(allow, rule.substring(1)));
} else if (rule.charAt(1) == 'c') {
try {
this.add(new IpSubnetFilterRule(allow, rule.substring(3)));
} catch (UnknownHostException e) {
if (logger.isErrorEnabled()) {
logger.error("error parsing ip filter " + rule, e);
}
}
} else {
if (logger.isErrorEnabled()) {
logger.error("syntax error in ip filter rule:" + rule);
}
}
}
}

View File

@ -1,164 +1,164 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetSocketAddress;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
// TODO: Auto-generated Javadoc
/** General class that handle Ip Filtering. */
public abstract class IpFilteringHandlerImpl implements ChannelUpstreamHandler, IpFilteringHandler {
private IpFilterListener listener;
/**
* Called when the channel is connected. It returns True if the corresponding connection
* is to be allowed. Else it returns False.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return True if the corresponding connection is allowed, else False.
*/
protected abstract boolean accept(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress)
throws Exception;
/**
* Called when the channel has the CONNECTED status and the channel was refused by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* will wait uninterruptibly before closing the channel.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
protected ChannelFuture handleRefusedChannel(ChannelHandlerContext ctx, ChannelEvent e,
InetSocketAddress inetSocketAddress) throws Exception {
if (listener == null) {
return null;
}
return listener.refused(ctx, e, inetSocketAddress);
}
protected ChannelFuture handleAllowedChannel(ChannelHandlerContext ctx, ChannelEvent e,
InetSocketAddress inetSocketAddress) throws Exception {
if (listener == null) {
return null;
}
return listener.allowed(ctx, e, inetSocketAddress);
}
/**
* Internal method to test if the current channel is blocked. Should not be overridden.
*
* @return True if the current channel is blocked, else False
*/
protected boolean isBlocked(ChannelHandlerContext ctx) {
return ctx.getAttachment() != null;
}
/**
* Called in handleUpstream, if this channel was previously blocked,
* to check if whatever the event, it should be passed to the next entry in the pipeline.<br>
* If one wants to not block events, just overridden this method by returning always true.<br><br>
* <b>Note that OPENED and BOUND events are still passed to the next entry in the pipeline since
* those events come out before the CONNECTED event and so the possibility to filter the connection.</b>
*
* @return True if the event should continue, False if the event should not continue
* since this channel was blocked by this filter
*/
protected boolean continues(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (listener != null) {
return listener.continues(ctx, e);
} else {
return false;
}
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
case BOUND:
// Special case: OPEND and BOUND events are before CONNECTED,
// but CLOSED and UNBOUND events are after DISCONNECTED: should those events be blocked too?
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
} else {
ctx.sendUpstream(e);
return;
}
case CONNECTED:
if (evt.getValue() != null) {
// CONNECTED
InetSocketAddress inetSocketAddress = (InetSocketAddress) e.getChannel().getRemoteAddress();
if (!accept(ctx, e, inetSocketAddress)) {
ctx.setAttachment(Boolean.TRUE);
ChannelFuture future = handleRefusedChannel(ctx, e, inetSocketAddress);
if (future != null) {
future.addListener(ChannelFutureListener.CLOSE);
} else {
Channels.close(e.getChannel());
}
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
}
} else {
handleAllowedChannel(ctx, e, inetSocketAddress);
}
// This channel is not blocked
ctx.setAttachment(null);
} else {
// DISCONNECTED
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
}
}
break;
}
}
if (isBlocked(ctx) && !continues(ctx, e)) {
// don't pass to next level since channel was blocked early
return;
}
// Whatever it is, if not blocked, goes to the next level
ctx.sendUpstream(e);
}
@Override
public void setIpFilterListener(IpFilterListener listener) {
this.listener = listener;
}
@Override
public void removeIpFilterListener() {
this.listener = null;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetSocketAddress;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
// TODO: Auto-generated Javadoc
/** General class that handle Ip Filtering. */
public abstract class IpFilteringHandlerImpl implements ChannelUpstreamHandler, IpFilteringHandler {
private IpFilterListener listener;
/**
* Called when the channel is connected. It returns True if the corresponding connection
* is to be allowed. Else it returns False.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return True if the corresponding connection is allowed, else False.
*/
protected abstract boolean accept(ChannelHandlerContext ctx, ChannelEvent e, InetSocketAddress inetSocketAddress)
throws Exception;
/**
* Called when the channel has the CONNECTED status and the channel was refused by a previous call to accept().
* This method enables your implementation to send a message back to the client before closing
* or whatever you need. This method returns a ChannelFuture on which the implementation
* will wait uninterruptibly before closing the channel.<br>
* For instance, If a message is sent back, the corresponding ChannelFuture has to be returned.
*
* @param inetSocketAddress the remote {@link InetSocketAddress} from client
* @return the associated ChannelFuture to be waited for before closing the channel. Null is allowed.
*/
protected ChannelFuture handleRefusedChannel(ChannelHandlerContext ctx, ChannelEvent e,
InetSocketAddress inetSocketAddress) throws Exception {
if (listener == null) {
return null;
}
return listener.refused(ctx, e, inetSocketAddress);
}
protected ChannelFuture handleAllowedChannel(ChannelHandlerContext ctx, ChannelEvent e,
InetSocketAddress inetSocketAddress) throws Exception {
if (listener == null) {
return null;
}
return listener.allowed(ctx, e, inetSocketAddress);
}
/**
* Internal method to test if the current channel is blocked. Should not be overridden.
*
* @return True if the current channel is blocked, else False
*/
protected boolean isBlocked(ChannelHandlerContext ctx) {
return ctx.getAttachment() != null;
}
/**
* Called in handleUpstream, if this channel was previously blocked,
* to check if whatever the event, it should be passed to the next entry in the pipeline.<br>
* If one wants to not block events, just overridden this method by returning always true.<br><br>
* <b>Note that OPENED and BOUND events are still passed to the next entry in the pipeline since
* those events come out before the CONNECTED event and so the possibility to filter the connection.</b>
*
* @return True if the event should continue, False if the event should not continue
* since this channel was blocked by this filter
*/
protected boolean continues(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (listener != null) {
return listener.continues(ctx, e);
} else {
return false;
}
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
case BOUND:
// Special case: OPEND and BOUND events are before CONNECTED,
// but CLOSED and UNBOUND events are after DISCONNECTED: should those events be blocked too?
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
} else {
ctx.sendUpstream(e);
return;
}
case CONNECTED:
if (evt.getValue() != null) {
// CONNECTED
InetSocketAddress inetSocketAddress = (InetSocketAddress) e.getChannel().getRemoteAddress();
if (!accept(ctx, e, inetSocketAddress)) {
ctx.setAttachment(Boolean.TRUE);
ChannelFuture future = handleRefusedChannel(ctx, e, inetSocketAddress);
if (future != null) {
future.addListener(ChannelFutureListener.CLOSE);
} else {
Channels.close(e.getChannel());
}
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
}
} else {
handleAllowedChannel(ctx, e, inetSocketAddress);
}
// This channel is not blocked
ctx.setAttachment(null);
} else {
// DISCONNECTED
if (isBlocked(ctx) && !continues(ctx, evt)) {
// don't pass to next level since channel was blocked early
return;
}
}
break;
}
}
if (isBlocked(ctx) && !continues(ctx, e)) {
// don't pass to next level since channel was blocked early
return;
}
// Whatever it is, if not blocked, goes to the next level
ctx.sendUpstream(e);
}
@Override
public void setIpFilterListener(IpFilterListener listener) {
this.listener = listener;
}
@Override
public void removeIpFilterListener() {
this.listener = null;
}
}

View File

@ -1,202 +1,202 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Pattern;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
* The Class PatternRule represents an IP filter rule using string patterns.
* <br>
* Rule Syntax:
* <br>
* <pre>
* Rule ::= [n|i]:address n stands for computer name, i for ip address
* address ::= &lt;regex&gt; | localhost
* regex is a regular expression with '*' as multi character and '?' as single character wild card
* </pre>
* <br>
* Example: allow localhost:
* <br>
* new PatternRule(true, "n:localhost")
* <br>
* Example: allow local lan:
* <br>
* new PatternRule(true, "i:192.168.0.*")
* <br>
* Example: block all
* <br>
* new PatternRule(false, "n:*")
* <br>
*/
public class PatternRule implements IpFilterRule, Comparable<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PatternRule.class);
private Pattern ipPattern;
private Pattern namePattern;
private boolean isAllowRule = true;
private boolean localhost;
private String pattern;
/**
* Instantiates a new pattern rule.
*
* @param allow indicates if this is an allow or block rule
* @param pattern the filter pattern
*/
public PatternRule(boolean allow, String pattern) {
this.isAllowRule = allow;
this.pattern = pattern;
parse(pattern);
}
/**
* returns the pattern.
*
* @return the pattern
*/
public String getPattern() {
return this.pattern;
}
@Override
public boolean isAllowRule() {
return isAllowRule;
}
@Override
public boolean isDenyRule() {
return !isAllowRule;
}
@Override
public boolean contains(InetAddress inetAddress) {
if (localhost) {
if (isLocalhost(inetAddress)) {
return true;
}
}
if (ipPattern != null) {
if (ipPattern.matcher(inetAddress.getHostAddress()).matches()) {
return true;
}
}
if (namePattern != null) {
if (namePattern.matcher(inetAddress.getHostName()).matches()) {
return true;
}
}
return false;
}
private void parse(String pattern) {
if (pattern == null) {
return;
}
String[] acls = pattern.split(",");
String ip = "";
String name = "";
for (String c : acls) {
c = c.trim();
if (c.equals("n:localhost")) {
this.localhost = true;
} else if (c.startsWith("n:")) {
name = addRule(name, c.substring(2));
} else if (c.startsWith("i:")) {
ip = addRule(ip, c.substring(2));
}
}
if (ip.length() != 0) {
ipPattern = Pattern.compile(ip);
}
if (name.length() != 0) {
namePattern = Pattern.compile(name);
}
}
private String addRule(String pattern, String rule) {
if (rule == null || rule.length() == 0) {
return pattern;
}
if (pattern.length() != 0) {
pattern += "|";
}
rule = rule.replaceAll("\\.", "\\\\.");
rule = rule.replaceAll("\\*", ".*");
rule = rule.replaceAll("\\?", ".");
pattern += "(" + rule + ")";
return pattern;
}
private boolean isLocalhost(InetAddress address) {
try {
if (address.equals(InetAddress.getLocalHost())) {
return true;
}
} catch (UnknownHostException e) {
if (logger.isInfoEnabled()) {
logger.info("error getting ip of localhost", e);
}
}
try {
InetAddress[] addrs = InetAddress.getAllByName("127.0.0.1");
for (InetAddress addr : addrs) {
if (addr.equals(address)) {
return true;
}
}
} catch (UnknownHostException e) {
if (logger.isInfoEnabled()) {
logger.info("error getting ip of localhost", e);
}
}
return false;
}
@Override
public int compareTo(Object o) {
if (o == null) {
return -1;
}
if (!(o instanceof PatternRule)) {
return -1;
}
PatternRule p = (PatternRule) o;
if (p.isAllowRule() && !this.isAllowRule) {
return -1;
}
if (this.pattern == null && p.pattern == null) {
return 0;
}
if (this.pattern != null) {
return this.pattern.compareTo(p.getPattern());
}
return -1;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Pattern;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
* The Class PatternRule represents an IP filter rule using string patterns.
* <br>
* Rule Syntax:
* <br>
* <pre>
* Rule ::= [n|i]:address n stands for computer name, i for ip address
* address ::= &lt;regex&gt; | localhost
* regex is a regular expression with '*' as multi character and '?' as single character wild card
* </pre>
* <br>
* Example: allow localhost:
* <br>
* new PatternRule(true, "n:localhost")
* <br>
* Example: allow local lan:
* <br>
* new PatternRule(true, "i:192.168.0.*")
* <br>
* Example: block all
* <br>
* new PatternRule(false, "n:*")
* <br>
*/
public class PatternRule implements IpFilterRule, Comparable<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PatternRule.class);
private Pattern ipPattern;
private Pattern namePattern;
private boolean isAllowRule = true;
private boolean localhost;
private String pattern;
/**
* Instantiates a new pattern rule.
*
* @param allow indicates if this is an allow or block rule
* @param pattern the filter pattern
*/
public PatternRule(boolean allow, String pattern) {
this.isAllowRule = allow;
this.pattern = pattern;
parse(pattern);
}
/**
* returns the pattern.
*
* @return the pattern
*/
public String getPattern() {
return this.pattern;
}
@Override
public boolean isAllowRule() {
return isAllowRule;
}
@Override
public boolean isDenyRule() {
return !isAllowRule;
}
@Override
public boolean contains(InetAddress inetAddress) {
if (localhost) {
if (isLocalhost(inetAddress)) {
return true;
}
}
if (ipPattern != null) {
if (ipPattern.matcher(inetAddress.getHostAddress()).matches()) {
return true;
}
}
if (namePattern != null) {
if (namePattern.matcher(inetAddress.getHostName()).matches()) {
return true;
}
}
return false;
}
private void parse(String pattern) {
if (pattern == null) {
return;
}
String[] acls = pattern.split(",");
String ip = "";
String name = "";
for (String c : acls) {
c = c.trim();
if (c.equals("n:localhost")) {
this.localhost = true;
} else if (c.startsWith("n:")) {
name = addRule(name, c.substring(2));
} else if (c.startsWith("i:")) {
ip = addRule(ip, c.substring(2));
}
}
if (ip.length() != 0) {
ipPattern = Pattern.compile(ip);
}
if (name.length() != 0) {
namePattern = Pattern.compile(name);
}
}
private String addRule(String pattern, String rule) {
if (rule == null || rule.length() == 0) {
return pattern;
}
if (pattern.length() != 0) {
pattern += "|";
}
rule = rule.replaceAll("\\.", "\\\\.");
rule = rule.replaceAll("\\*", ".*");
rule = rule.replaceAll("\\?", ".");
pattern += "(" + rule + ")";
return pattern;
}
private boolean isLocalhost(InetAddress address) {
try {
if (address.equals(InetAddress.getLocalHost())) {
return true;
}
} catch (UnknownHostException e) {
if (logger.isInfoEnabled()) {
logger.info("error getting ip of localhost", e);
}
}
try {
InetAddress[] addrs = InetAddress.getAllByName("127.0.0.1");
for (InetAddress addr : addrs) {
if (addr.equals(address)) {
return true;
}
}
} catch (UnknownHostException e) {
if (logger.isInfoEnabled()) {
logger.info("error getting ip of localhost", e);
}
}
return false;
}
@Override
public int compareTo(Object o) {
if (o == null) {
return -1;
}
if (!(o instanceof PatternRule)) {
return -1;
}
PatternRule p = (PatternRule) o;
if (p.isAllowRule() && !this.isAllowRule) {
return -1;
}
if (this.pattern == null && p.pattern == null) {
return 0;
}
if (this.pattern != null) {
return this.pattern.compareTo(p.getPattern());
}
return -1;
}
}

View File

@ -1,79 +1,79 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureAggregator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
/**
* {@link WritableByteChannel} implementation which will take care to wrap the {@link ByteBuffer} to a {@link ChannelBuffer} and forward it to the next {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} on every {@link #write(ByteBuffer)}
* operation.
*/
public class ChannelWritableByteChannel implements WritableByteChannel {
private boolean closed;
private final ChannelHandlerContext context;
private final ChannelFutureAggregator aggregator;
private final SocketAddress remote;
public ChannelWritableByteChannel(ChannelHandlerContext context, MessageEvent event) {
this(context, new ChannelFutureAggregator(event.getFuture()), event.getRemoteAddress());
}
public ChannelWritableByteChannel(ChannelHandlerContext context, ChannelFutureAggregator aggregator, SocketAddress remote) {
this.context = context;
this.aggregator = aggregator;
this.remote = remote;
}
@Override
public boolean isOpen() {
return !closed && context.channel().isOpen();
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public int write(ByteBuffer src) throws IOException {
int written = src.remaining();
// create a new ChannelFuture and add it to the aggregator
ChannelFuture future = Channels.future(context.channel(), true);
aggregator.addFuture(future);
Channels.write(context, future, ChannelBuffers.wrappedBuffer(src), remote);
return written;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureAggregator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
/**
* {@link WritableByteChannel} implementation which will take care to wrap the {@link ByteBuffer} to a {@link ChannelBuffer} and forward it to the next {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} on every {@link #write(ByteBuffer)}
* operation.
*/
public class ChannelWritableByteChannel implements WritableByteChannel {
private boolean closed;
private final ChannelHandlerContext context;
private final ChannelFutureAggregator aggregator;
private final SocketAddress remote;
public ChannelWritableByteChannel(ChannelHandlerContext context, MessageEvent event) {
this(context, new ChannelFutureAggregator(event.getFuture()), event.getRemoteAddress());
}
public ChannelWritableByteChannel(ChannelHandlerContext context, ChannelFutureAggregator aggregator, SocketAddress remote) {
this.context = context;
this.aggregator = aggregator;
this.remote = remote;
}
@Override
public boolean isOpen() {
return !closed && context.channel().isOpen();
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public int write(ByteBuffer src) throws IOException {
int written = src.remaining();
// create a new ChannelFuture and add it to the aggregator
ChannelFuture future = Channels.future(context.channel(), true);
aggregator.addFuture(future);
Channels.write(context, future, ChannelBuffers.wrappedBuffer(src), remote);
return written;
}
}

View File

@ -1,75 +1,75 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageEvent;
/**
* {@link ChannelDownstreamHandler} implementation which encodes a {@link FileRegion} to {@link ChannelBuffer}'s if one of the given {@link ChannelHandler} was found in the {@link ChannelPipeline}.
*
* This {@link ChannelDownstreamHandler} should be used if you plan to write {@link FileRegion} objects and also have some {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} which needs to transform
* the to be written {@link ChannelBuffer} in any case. This could be for example {@link ChannelDownstreamHandler}'s which needs to encrypt or compress messages.
*
* Users of this {@link FileRegionEncoder} should add / remove this {@link ChannelDownstreamHandler} on the fly to get the best performance out of their system.
*
*
*/
@ChannelHandler.Sharable
public class FileRegionEncoder implements ChannelDownstreamHandler {
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
if (originalMessage instanceof FileRegion) {
FileRegion fr = (FileRegion) originalMessage;
WritableByteChannel bchannel = new ChannelWritableByteChannel(ctx, e);
int length = 0;
long i = 0;
while ((i = fr.transferTo(bchannel, length)) > 0) {
length += i;
if (length >= fr.getCount()) {
break;
}
}
} else {
// no converting is needed so just sent the event downstream
ctx.sendDownstream(evt);
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.region;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageEvent;
/**
* {@link ChannelDownstreamHandler} implementation which encodes a {@link FileRegion} to {@link ChannelBuffer}'s if one of the given {@link ChannelHandler} was found in the {@link ChannelPipeline}.
*
* This {@link ChannelDownstreamHandler} should be used if you plan to write {@link FileRegion} objects and also have some {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} which needs to transform
* the to be written {@link ChannelBuffer} in any case. This could be for example {@link ChannelDownstreamHandler}'s which needs to encrypt or compress messages.
*
* Users of this {@link FileRegionEncoder} should add / remove this {@link ChannelDownstreamHandler} on the fly to get the best performance out of their system.
*
*
*/
@ChannelHandler.Sharable
public class FileRegionEncoder implements ChannelDownstreamHandler {
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
if (originalMessage instanceof FileRegion) {
FileRegion fr = (FileRegion) originalMessage;
WritableByteChannel bchannel = new ChannelWritableByteChannel(ctx, e);
int length = 0;
long i = 0;
while ((i = fr.transferTo(bchannel, length)) > 0) {
length += i;
if (length >= fr.getCount()) {
break;
}
}
} else {
// no converting is needed so just sent the event downstream
ctx.sendDownstream(evt);
}
}
}

View File

@ -1,347 +1,347 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import junit.framework.TestCase;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.UpstreamMessageEvent;
import org.junit.Test;
public class IpFilterRuleTest extends TestCase
{
public static boolean accept(IpFilterRuleHandler h, InetSocketAddress addr) throws Exception
{
return h.accept(new ChannelHandlerContext()
{
@Override
public boolean canHandleDownstream()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean canHandleUpstream()
{
// TODO Auto-generated method stub
return false;
}
@Override
public Object getAttachment()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Channel getChannel()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelHandler getHandler()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getName()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelPipeline getPipeline()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void sendDownstream(ChannelEvent e)
{
// TODO Auto-generated method stub
}
@Override
public void sendUpstream(ChannelEvent e)
{
// TODO Auto-generated method stub
}
@Override
public void setAttachment(Object attachment)
{
// TODO Auto-generated method stub
}
},
new UpstreamMessageEvent(new Channel()
{
@Override
public ChannelFuture bind(SocketAddress localAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture close()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture disconnect()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture getCloseFuture()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelConfig getConfig()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFactory getFactory()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Integer getId()
{
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterestOps()
{
// TODO Auto-generated method stub
return 0;
}
@Override
public SocketAddress getLocalAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Channel getParent()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelPipeline getPipeline()
{
// TODO Auto-generated method stub
return null;
}
@Override
public SocketAddress getRemoteAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isBound()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isConnected()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isOpen()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isReadable()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isWritable()
{
// TODO Auto-generated method stub
return false;
}
@Override
public ChannelFuture setInterestOps(int interestOps)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture setReadable(boolean readable)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture unbind()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture write(Object message)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public int compareTo(Channel o)
{
// TODO Auto-generated method stub
return 0;
}
@Override
public Object getAttachment() {
return null;
}
@Override
public void setAttachment(Object attachment) {
}
}, h, addr),
addr);
}
@Test
public void testIpFilterRule() throws Exception
{
IpFilterRuleHandler h = new IpFilterRuleHandler();
h.addAll(new IpFilterRuleList("+n:localhost, -n:*"));
InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList("+n:*"+InetAddress.getLocalHost().getHostName().substring(1)+", -n:*"));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList("+c:"+InetAddress.getLocalHost().getHostAddress()+"/32, -n:*"));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList(""));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.ipfilter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import junit.framework.TestCase;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.UpstreamMessageEvent;
import org.junit.Test;
public class IpFilterRuleTest extends TestCase
{
public static boolean accept(IpFilterRuleHandler h, InetSocketAddress addr) throws Exception
{
return h.accept(new ChannelHandlerContext()
{
@Override
public boolean canHandleDownstream()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean canHandleUpstream()
{
// TODO Auto-generated method stub
return false;
}
@Override
public Object getAttachment()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Channel getChannel()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelHandler getHandler()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getName()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelPipeline getPipeline()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void sendDownstream(ChannelEvent e)
{
// TODO Auto-generated method stub
}
@Override
public void sendUpstream(ChannelEvent e)
{
// TODO Auto-generated method stub
}
@Override
public void setAttachment(Object attachment)
{
// TODO Auto-generated method stub
}
},
new UpstreamMessageEvent(new Channel()
{
@Override
public ChannelFuture bind(SocketAddress localAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture close()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture disconnect()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture getCloseFuture()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelConfig getConfig()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFactory getFactory()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Integer getId()
{
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterestOps()
{
// TODO Auto-generated method stub
return 0;
}
@Override
public SocketAddress getLocalAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public Channel getParent()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelPipeline getPipeline()
{
// TODO Auto-generated method stub
return null;
}
@Override
public SocketAddress getRemoteAddress()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isBound()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isConnected()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isOpen()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isReadable()
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isWritable()
{
// TODO Auto-generated method stub
return false;
}
@Override
public ChannelFuture setInterestOps(int interestOps)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture setReadable(boolean readable)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture unbind()
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture write(Object message)
{
// TODO Auto-generated method stub
return null;
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress)
{
// TODO Auto-generated method stub
return null;
}
@Override
public int compareTo(Channel o)
{
// TODO Auto-generated method stub
return 0;
}
@Override
public Object getAttachment() {
return null;
}
@Override
public void setAttachment(Object attachment) {
}
}, h, addr),
addr);
}
@Test
public void testIpFilterRule() throws Exception
{
IpFilterRuleHandler h = new IpFilterRuleHandler();
h.addAll(new IpFilterRuleList("+n:localhost, -n:*"));
InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList("+n:*"+InetAddress.getLocalHost().getHostName().substring(1)+", -n:*"));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList("+c:"+InetAddress.getLocalHost().getHostAddress()+"/32, -n:*"));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertFalse(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
h.addAll(new IpFilterRuleList(""));
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
h.clear();
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
assertTrue(accept(h, addr));
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
assertTrue(accept(h, addr));
}
}

View File

@ -1,36 +1,36 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import io.netty.buffer.ChannelBuffer;
/**
* Interface from the server message switch and channel sink to an
* accepted channel. Exists primarily for mock testing purposes.
*
*/
interface HttpTunnelAcceptedChannelReceiver {
void updateInterestOps(SaturationStateChange transition);
void dataReceived(ChannelBuffer data);
void clientClosed();
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import io.netty.buffer.ChannelBuffer;
/**
* Interface from the server message switch and channel sink to an
* accepted channel. Exists primarily for mock testing purposes.
*
*/
interface HttpTunnelAcceptedChannelReceiver {
void updateInterestOps(SaturationStateChange transition);
void dataReceived(ChannelBuffer data);
void clientClosed();
}

View File

@ -1,69 +1,69 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import static io.netty.channel.socket.http.SaturationStateChange.DESATURATED;
import static io.netty.channel.socket.http.SaturationStateChange.NO_CHANGE;
import static io.netty.channel.socket.http.SaturationStateChange.SATURATED;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class is used to monitor the amount of data that has yet to be pushed to
* the underlying socket, in order to implement the "high/low water mark" facility
* that controls Channel.isWritable() and the interest ops of http tunnels.
*
*/
class SaturationManager {
private final AtomicLong desaturationPoint;
private final AtomicLong saturationPoint;
private final AtomicLong queueSize;
private final AtomicBoolean saturated;
public SaturationManager(long desaturationPoint, long saturationPoint) {
this.desaturationPoint = new AtomicLong(desaturationPoint);
this.saturationPoint = new AtomicLong(saturationPoint);
queueSize = new AtomicLong(0);
saturated = new AtomicBoolean(false);
}
public SaturationStateChange queueSizeChanged(long sizeDelta) {
long newQueueSize = queueSize.addAndGet(sizeDelta);
if (newQueueSize <= desaturationPoint.get()) {
if (saturated.compareAndSet(true, false)) {
return DESATURATED;
}
} else if (newQueueSize > saturationPoint.get()) {
if (saturated.compareAndSet(false, true)) {
return SATURATED;
}
}
return NO_CHANGE;
}
public void updateThresholds(long desaturationPoint, long saturationPoint) {
this.desaturationPoint.set(desaturationPoint);
this.saturationPoint.set(saturationPoint);
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import static io.netty.channel.socket.http.SaturationStateChange.DESATURATED;
import static io.netty.channel.socket.http.SaturationStateChange.NO_CHANGE;
import static io.netty.channel.socket.http.SaturationStateChange.SATURATED;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class is used to monitor the amount of data that has yet to be pushed to
* the underlying socket, in order to implement the "high/low water mark" facility
* that controls Channel.isWritable() and the interest ops of http tunnels.
*
*/
class SaturationManager {
private final AtomicLong desaturationPoint;
private final AtomicLong saturationPoint;
private final AtomicLong queueSize;
private final AtomicBoolean saturated;
public SaturationManager(long desaturationPoint, long saturationPoint) {
this.desaturationPoint = new AtomicLong(desaturationPoint);
this.saturationPoint = new AtomicLong(saturationPoint);
queueSize = new AtomicLong(0);
saturated = new AtomicBoolean(false);
}
public SaturationStateChange queueSizeChanged(long sizeDelta) {
long newQueueSize = queueSize.addAndGet(sizeDelta);
if (newQueueSize <= desaturationPoint.get()) {
if (saturated.compareAndSet(true, false)) {
return DESATURATED;
}
} else if (newQueueSize > saturationPoint.get()) {
if (saturated.compareAndSet(false, true)) {
return SATURATED;
}
}
return NO_CHANGE;
}
public void updateThresholds(long desaturationPoint, long saturationPoint) {
this.desaturationPoint.set(desaturationPoint);
this.saturationPoint.set(saturationPoint);
}
}

View File

@ -1,29 +1,29 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
/**
* Represents the state change of a chanel in response in the amount of pending data to be
* sent - either no change occurs, the channel becomes desaturated (indicating that writing
* can safely commence) or it becomes saturated (indicating that writing should cease).
*
*/
enum SaturationStateChange {
NO_CHANGE, DESATURATED, SATURATED
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
/**
* Represents the state change of a chanel in response in the amount of pending data to be
* sent - either no change occurs, the channel becomes desaturated (indicating that writing
* can safely commence) or it becomes saturated (indicating that writing should cease).
*
*/
enum SaturationStateChange {
NO_CHANGE, DESATURATED, SATURATED
}

View File

@ -1,52 +1,52 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import static org.junit.Assert.*;
import static io.netty.channel.socket.http.SaturationStateChange.*;
import org.junit.Before;
import org.junit.Test;
/**
* Tests saturation managers
*
*/
public class SaturationManagerTest {
private SaturationManager manager;
@Before
public void setUp() {
manager = new SaturationManager(100L, 200L);
}
@Test
public void testQueueSizeChanged() {
assertEquals(NO_CHANGE, manager.queueSizeChanged(100L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(99L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(1L));
assertEquals(SATURATED, manager.queueSizeChanged(1L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(10L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-10L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-1L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-1L));
assertEquals(DESATURATED, manager.queueSizeChanged(-99L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-100L));
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import static org.junit.Assert.*;
import static io.netty.channel.socket.http.SaturationStateChange.*;
import org.junit.Before;
import org.junit.Test;
/**
* Tests saturation managers
*
*/
public class SaturationManagerTest {
private SaturationManager manager;
@Before
public void setUp() {
manager = new SaturationManager(100L, 200L);
}
@Test
public void testQueueSizeChanged() {
assertEquals(NO_CHANGE, manager.queueSizeChanged(100L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(99L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(1L));
assertEquals(SATURATED, manager.queueSizeChanged(1L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(10L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-10L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-1L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-1L));
assertEquals(DESATURATED, manager.queueSizeChanged(-99L));
assertEquals(NO_CHANGE, manager.queueSizeChanged(-100L));
}
}

View File

@ -1,109 +1,109 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.sun.nio.sctp.SctpChannel;
import io.netty.channel.socket.nio.AbstractJdkChannel;
public class SctpJdkChannel extends AbstractJdkChannel {
SctpJdkChannel(SctpChannel channel) {
super(channel);
}
@Override
protected SctpChannel getChannel() {
return (SctpChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
try {
for (SocketAddress address : getChannel().getRemoteAddresses()) {
return (InetSocketAddress) address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public SocketAddress getLocalSocketAddress() {
try {
for (SocketAddress address : getChannel().getAllLocalAddresses()) {
return (InetSocketAddress) address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public boolean isConnected() {
return getChannel().isOpen();
}
@Override
public boolean isSocketBound() {
try {
return !getChannel().getAllLocalAddresses().isEmpty();
} catch (IOException e) {
return false;
}
}
@Override
public void disconnectSocket() throws IOException {
closeSocket();
}
@Override
public void closeSocket() throws IOException {
for (SocketAddress address: getChannel().getAllLocalAddresses()) {
getChannel().unbindAddress(((InetSocketAddress) address).getAddress());
}
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public int write(ByteBuffer src) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean finishConnect() throws IOException {
return getChannel().finishConnect();
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.sun.nio.sctp.SctpChannel;
import io.netty.channel.socket.nio.AbstractJdkChannel;
public class SctpJdkChannel extends AbstractJdkChannel {
SctpJdkChannel(SctpChannel channel) {
super(channel);
}
@Override
protected SctpChannel getChannel() {
return (SctpChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
try {
for (SocketAddress address : getChannel().getRemoteAddresses()) {
return (InetSocketAddress) address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public SocketAddress getLocalSocketAddress() {
try {
for (SocketAddress address : getChannel().getAllLocalAddresses()) {
return (InetSocketAddress) address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public boolean isConnected() {
return getChannel().isOpen();
}
@Override
public boolean isSocketBound() {
try {
return !getChannel().getAllLocalAddresses().isEmpty();
} catch (IOException e) {
return false;
}
}
@Override
public void disconnectSocket() throws IOException {
closeSocket();
}
@Override
public void closeSocket() throws IOException {
for (SocketAddress address: getChannel().getAllLocalAddresses()) {
getChannel().unbindAddress(((InetSocketAddress) address).getAddress());
}
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public int write(ByteBuffer src) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean finishConnect() throws IOException {
return getChannel().finishConnect();
}
}

View File

@ -1,33 +1,33 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import java.util.concurrent.Executor;
import io.netty.channel.socket.nio.AbstractNioWorkerPool;
public class SctpWorkerPool extends AbstractNioWorkerPool<SctpWorker> {
public SctpWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected SctpWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new SctpWorker(executor, allowShutdownOnIdle);
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import java.util.concurrent.Executor;
import io.netty.channel.socket.nio.AbstractNioWorkerPool;
public class SctpWorkerPool extends AbstractNioWorkerPool<SctpWorker> {
public SctpWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected SctpWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new SctpWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -1,118 +1,118 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object();
AbstractOioChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
@Override
public boolean isBound() {
return isOpen() && isSocketBound();
}
@Override
public boolean isConnected() {
return isOpen() && isSocketConnected();
}
@Override
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress =
(InetSocketAddress) getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
(InetSocketAddress) getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
abstract boolean isSocketBound();
abstract boolean isSocketConnected();
abstract boolean isSocketClosed();
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
abstract void closeSocket() throws IOException;
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object();
AbstractOioChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
@Override
public boolean isBound() {
return isOpen() && isSocketBound();
}
@Override
public boolean isConnected() {
return isOpen() && isSocketConnected();
}
@Override
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress =
(InetSocketAddress) getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
(InetSocketAddress) getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
abstract boolean isSocketBound();
abstract boolean isSocketConnected();
abstract boolean isSocketClosed();
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
abstract void closeSocket() throws IOException;
}

View File

@ -1,57 +1,57 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.channel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.channel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
}
return fireLater;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.channel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.channel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
}
return fireLater;
}
}

View File

@ -1,226 +1,226 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels;
import io.netty.channel.socket.Worker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException;
import java.util.Queue;
/**
* Abstract base class for Oio-Worker implementations
*
* @param <C> {@link AbstractOioChannel}
*/
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
public AbstractOioWorker(C channel) {
this.channel = channel;
channel.worker = this;
}
@Override
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
try {
boolean cont = process();
processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel), true);
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
@Override
public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) {
task.run();
} else {
boolean added = eventQueue.offer(task);
if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
}
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors.
*
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
* @throws IOException
*/
abstract boolean process() throws IOException;
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThread(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
close(channel, future, isIoThread(channel));
}
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels;
import io.netty.channel.socket.Worker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException;
import java.util.Queue;
/**
* Abstract base class for Oio-Worker implementations
*
* @param <C> {@link AbstractOioChannel}
*/
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
public AbstractOioWorker(C channel) {
this.channel = channel;
channel.worker = this;
}
@Override
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
try {
boolean cont = process();
processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel), true);
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
@Override
public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) {
task.run();
} else {
boolean added = eventQueue.offer(task);
if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
}
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors.
*
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
* @throws IOException
*/
abstract boolean process() throws IOException;
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThread(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
close(channel, future, isIoThread(channel));
}
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}