Removing Http2StreamRemovalPolicy

Motivation:

Due to a recent flurry of cleanup and fixes, we no longer need the stream removal policy to protect against recently removed streams. We should get rid of it.

Modifications:

Removed Http2StreamRemovalPolicy and everywhere it's used.

Result:

Fixes #3448
This commit is contained in:
nmittler 2015-04-10 09:34:39 -07:00
parent cc7ee002dd
commit c388f3f085
4 changed files with 20 additions and 225 deletions

View File

@ -19,7 +19,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
@ -35,7 +34,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.PrimitiveCollections;
@ -71,36 +69,16 @@ public class DefaultHttp2Connection implements Http2Connection {
final ActiveStreams activeStreams;
/**
* Creates a connection with an immediate stream removal policy.
* Creates a new connection with the given settings.
*
* @param server
* whether or not this end-point is the server-side of the HTTP/2 connection.
*/
public DefaultHttp2Connection(boolean server) {
this(server, immediateRemovalPolicy());
}
/**
* Creates a new connection with the given settings.
*
* @param server
* whether or not this end-point is the server-side of the HTTP/2 connection.
* @param removalPolicy
* the policy to be used for removal of closed stream.
*/
public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) {
activeStreams = new ActiveStreams(listeners, checkNotNull(removalPolicy, "removalPolicy"));
activeStreams = new ActiveStreams(listeners);
localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server);
remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server);
// Tell the removal policy how to remove a stream from this connection.
removalPolicy.setAction(new Action() {
@Override
public void removeStream(Http2Stream stream) {
DefaultHttp2Connection.this.removeStream((DefaultStream) stream);
}
});
// Add the connection stream to the map.
streamMap.put(connectionStream.id(), connectionStream);
}
@ -997,31 +975,32 @@ public class DefaultHttp2Connection implements Http2Connection {
}
/**
* Default implementation of the {@link ActiveStreams} class.
* Allows events which would modify the collection of active streams to be queued while iterating via {@link
* #forEachActiveStream(Http2StreamVisitor)}.
*/
private static final class ActiveStreams {
interface Event {
/**
* Allows events which would modify {@link #streams} to be queued while iterating over {@link #streams}.
* Trigger the original intention of this event. Expect to modify the active streams list.
* <p/>
* If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
interface Event {
/**
* Trigger the original intention of this event. Expect to modify {@link #streams}.
* <p>
* If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void process();
}
void process();
}
/**
* Manages the list of currently active streams. Queues any {@link Event}s that would modify the list of
* active streams in order to prevent modification while iterating.
*/
private final class ActiveStreams {
private final List<Listener> listeners;
private final Http2StreamRemovalPolicy removalPolicy;
private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
private int pendingIterations;
public ActiveStreams(List<Listener> listeners, Http2StreamRemovalPolicy removalPolicy) {
public ActiveStreams(List<Listener> listeners) {
this.listeners = listeners;
this.removalPolicy = removalPolicy;
}
public int size() {
@ -1110,8 +1089,7 @@ public class DefaultHttp2Connection implements Http2Connection {
}
}
} finally {
// Mark this stream for removal.
removalPolicy.markForRemoval(stream);
removeStream(stream);
}
}
}

View File

@ -1,106 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
/**
* A {@link Http2StreamRemovalPolicy} that periodically runs garbage collection on streams that have
* been marked for removal.
*/
public class DefaultHttp2StreamRemovalPolicy extends ChannelHandlerAdapter implements
Http2StreamRemovalPolicy, Runnable {
/**
* The interval (in ns) at which the removed priority garbage collector runs.
*/
private static final long GARBAGE_COLLECTION_INTERVAL = SECONDS.toNanos(5);
private final Queue<Garbage> garbage = new ArrayDeque<Garbage>();
private ScheduledFuture<?> timerFuture;
private Action action;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Schedule the periodic timer for performing the policy check.
timerFuture = ctx.channel().eventLoop().scheduleWithFixedDelay(this,
GARBAGE_COLLECTION_INTERVAL,
GARBAGE_COLLECTION_INTERVAL,
NANOSECONDS);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Cancel the periodic timer.
if (timerFuture != null) {
timerFuture.cancel(false);
timerFuture = null;
}
}
@Override
public void setAction(Action action) {
this.action = action;
}
@Override
public void markForRemoval(Http2Stream stream) {
garbage.add(new Garbage(stream));
}
/**
* Runs garbage collection of any streams marked for removal >
* {@link #GARBAGE_COLLECTION_INTERVAL} nanoseconds ago.
*/
@Override
public void run() {
if (garbage.isEmpty() || action == null) {
return;
}
long time = System.nanoTime();
for (;;) {
Garbage next = garbage.peek();
if (next == null) {
break;
}
if (time - next.removalTime > GARBAGE_COLLECTION_INTERVAL) {
garbage.remove();
action.removeStream(next.stream);
} else {
break;
}
}
}
/**
* Wrapper around a stream and its removal time.
*/
private static final class Garbage {
private final long removalTime = System.nanoTime();
private final Http2Stream stream;
Garbage(Http2Stream stream) {
this.stream = stream;
}
}
}

View File

@ -16,14 +16,13 @@
package io.netty.handler.codec.http2;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.concurrent.EventExecutor;
/**
@ -113,30 +112,6 @@ public final class Http2CodecUtil {
return Unpooled.wrappedBuffer(EMPTY_PING);
}
/**
* Returns a simple {@link Http2StreamRemovalPolicy} that immediately calls back the
* {@link Action} when a stream is marked for removal.
*/
public static Http2StreamRemovalPolicy immediateRemovalPolicy() {
return new Http2StreamRemovalPolicy() {
private Action action;
@Override
public void setAction(Action action) {
this.action = checkNotNull(action, "action");
}
@Override
public void markForRemoval(Http2Stream stream) {
if (action == null) {
throw new IllegalStateException(
"Action must be called before removing streams.");
}
action.removeStream(stream);
}
};
}
/**
* Iteratively looks through the causaility chain for the given exception and returns the first
* {@link Http2Exception} or {@code null} if none.
@ -321,15 +296,5 @@ public final class Http2CodecUtil {
}
}
/**
* Fails the given promise with the cause and then re-throws the cause.
*/
public static <T extends Throwable> T failAndThrow(ChannelPromise promise, T cause) throws T {
if (!promise.isDone()) {
promise.setFailure(cause);
}
throw cause;
}
private Http2CodecUtil() { }
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* A policy for determining when it is appropriate to remove streams from an HTTP/2 stream registry.
*/
public interface Http2StreamRemovalPolicy {
/**
* Performs the action of removing the stream.
*/
interface Action {
/**
* Removes the stream from the registry.
*/
void removeStream(Http2Stream stream);
}
/**
* Sets the removal action.
*/
void setAction(Action action);
/**
* Marks the given stream for removal. When this policy has determined that the given stream
* should be removed, it will call back the {@link Action}.
*/
void markForRemoval(Http2Stream stream);
}