619d82b56f
Motivation: Warnings in IDE, unclean code, negligible performance impact. Modification: Deletion of unused imports Result: No more warnings in IDE, cleaner code, negligible performance improvement.
358 lines
11 KiB
Java
358 lines
11 KiB
Java
/*
|
|
* Copyright 2012 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.handler.codec.spdy;
|
|
|
|
import io.netty.channel.ChannelPromise;
|
|
import io.netty.util.internal.PlatformDependent;
|
|
|
|
import java.util.Comparator;
|
|
import java.util.Map;
|
|
import java.util.Queue;
|
|
import java.util.TreeMap;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
|
|
|
|
final class SpdySession {
|
|
|
|
private final AtomicInteger activeLocalStreams = new AtomicInteger();
|
|
private final AtomicInteger activeRemoteStreams = new AtomicInteger();
|
|
private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
|
|
private final StreamComparator streamComparator = new StreamComparator();
|
|
private final AtomicInteger sendWindowSize;
|
|
private final AtomicInteger receiveWindowSize;
|
|
|
|
SpdySession(int sendWindowSize, int receiveWindowSize) {
|
|
this.sendWindowSize = new AtomicInteger(sendWindowSize);
|
|
this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
|
|
}
|
|
|
|
int numActiveStreams(boolean remote) {
|
|
if (remote) {
|
|
return activeRemoteStreams.get();
|
|
} else {
|
|
return activeLocalStreams.get();
|
|
}
|
|
}
|
|
|
|
boolean noActiveStreams() {
|
|
return activeStreams.isEmpty();
|
|
}
|
|
|
|
boolean isActiveStream(int streamId) {
|
|
return activeStreams.containsKey(streamId);
|
|
}
|
|
|
|
// Stream-IDs should be iterated in priority order
|
|
Map<Integer, StreamState> activeStreams() {
|
|
Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
|
|
streams.putAll(activeStreams);
|
|
return streams;
|
|
}
|
|
|
|
void acceptStream(
|
|
int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
|
|
int sendWindowSize, int receiveWindowSize, boolean remote) {
|
|
if (!remoteSideClosed || !localSideClosed) {
|
|
StreamState state = activeStreams.put(streamId, new StreamState(
|
|
priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
|
|
if (state == null) {
|
|
if (remote) {
|
|
activeRemoteStreams.incrementAndGet();
|
|
} else {
|
|
activeLocalStreams.incrementAndGet();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private StreamState removeActiveStream(int streamId, boolean remote) {
|
|
StreamState state = activeStreams.remove(streamId);
|
|
if (state != null) {
|
|
if (remote) {
|
|
activeRemoteStreams.decrementAndGet();
|
|
} else {
|
|
activeLocalStreams.decrementAndGet();
|
|
}
|
|
}
|
|
return state;
|
|
}
|
|
|
|
void removeStream(int streamId, Throwable cause, boolean remote) {
|
|
StreamState state = removeActiveStream(streamId, remote);
|
|
if (state != null) {
|
|
state.clearPendingWrites(cause);
|
|
}
|
|
}
|
|
|
|
boolean isRemoteSideClosed(int streamId) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state == null || state.isRemoteSideClosed();
|
|
}
|
|
|
|
void closeRemoteSide(int streamId, boolean remote) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
if (state != null) {
|
|
state.closeRemoteSide();
|
|
if (state.isLocalSideClosed()) {
|
|
removeActiveStream(streamId, remote);
|
|
}
|
|
}
|
|
}
|
|
|
|
boolean isLocalSideClosed(int streamId) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state == null || state.isLocalSideClosed();
|
|
}
|
|
|
|
void closeLocalSide(int streamId, boolean remote) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
if (state != null) {
|
|
state.closeLocalSide();
|
|
if (state.isRemoteSideClosed()) {
|
|
removeActiveStream(streamId, remote);
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* hasReceivedReply and receivedReply are only called from channelRead()
|
|
* no need to synchronize access to the StreamState
|
|
*/
|
|
boolean hasReceivedReply(int streamId) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null && state.hasReceivedReply();
|
|
}
|
|
|
|
void receivedReply(int streamId) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
if (state != null) {
|
|
state.receivedReply();
|
|
}
|
|
}
|
|
|
|
int getSendWindowSize(int streamId) {
|
|
if (streamId == SPDY_SESSION_STREAM_ID) {
|
|
return sendWindowSize.get();
|
|
}
|
|
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null ? state.getSendWindowSize() : -1;
|
|
}
|
|
|
|
int updateSendWindowSize(int streamId, int deltaWindowSize) {
|
|
if (streamId == SPDY_SESSION_STREAM_ID) {
|
|
return sendWindowSize.addAndGet(deltaWindowSize);
|
|
}
|
|
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
|
|
}
|
|
|
|
int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
|
|
if (streamId == SPDY_SESSION_STREAM_ID) {
|
|
return receiveWindowSize.addAndGet(deltaWindowSize);
|
|
}
|
|
|
|
StreamState state = activeStreams.get(streamId);
|
|
if (state == null) {
|
|
return -1;
|
|
}
|
|
if (deltaWindowSize > 0) {
|
|
state.setReceiveWindowSizeLowerBound(0);
|
|
}
|
|
return state.updateReceiveWindowSize(deltaWindowSize);
|
|
}
|
|
|
|
int getReceiveWindowSizeLowerBound(int streamId) {
|
|
if (streamId == SPDY_SESSION_STREAM_ID) {
|
|
return 0;
|
|
}
|
|
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
|
|
}
|
|
|
|
void updateAllSendWindowSizes(int deltaWindowSize) {
|
|
for (StreamState state: activeStreams.values()) {
|
|
state.updateSendWindowSize(deltaWindowSize);
|
|
}
|
|
}
|
|
|
|
void updateAllReceiveWindowSizes(int deltaWindowSize) {
|
|
for (StreamState state: activeStreams.values()) {
|
|
state.updateReceiveWindowSize(deltaWindowSize);
|
|
if (deltaWindowSize < 0) {
|
|
state.setReceiveWindowSizeLowerBound(deltaWindowSize);
|
|
}
|
|
}
|
|
}
|
|
|
|
boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null && state.putPendingWrite(pendingWrite);
|
|
}
|
|
|
|
PendingWrite getPendingWrite(int streamId) {
|
|
if (streamId == SPDY_SESSION_STREAM_ID) {
|
|
for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
|
|
StreamState state = e.getValue();
|
|
if (state.getSendWindowSize() > 0) {
|
|
PendingWrite pendingWrite = state.getPendingWrite();
|
|
if (pendingWrite != null) {
|
|
return pendingWrite;
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null ? state.getPendingWrite() : null;
|
|
}
|
|
|
|
PendingWrite removePendingWrite(int streamId) {
|
|
StreamState state = activeStreams.get(streamId);
|
|
return state != null ? state.removePendingWrite() : null;
|
|
}
|
|
|
|
private static final class StreamState {
|
|
|
|
private final byte priority;
|
|
private boolean remoteSideClosed;
|
|
private boolean localSideClosed;
|
|
private boolean receivedReply;
|
|
private final AtomicInteger sendWindowSize;
|
|
private final AtomicInteger receiveWindowSize;
|
|
private int receiveWindowSizeLowerBound;
|
|
private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
|
|
|
|
StreamState(
|
|
byte priority, boolean remoteSideClosed, boolean localSideClosed,
|
|
int sendWindowSize, int receiveWindowSize) {
|
|
this.priority = priority;
|
|
this.remoteSideClosed = remoteSideClosed;
|
|
this.localSideClosed = localSideClosed;
|
|
this.sendWindowSize = new AtomicInteger(sendWindowSize);
|
|
this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
|
|
}
|
|
|
|
byte getPriority() {
|
|
return priority;
|
|
}
|
|
|
|
boolean isRemoteSideClosed() {
|
|
return remoteSideClosed;
|
|
}
|
|
|
|
void closeRemoteSide() {
|
|
remoteSideClosed = true;
|
|
}
|
|
|
|
boolean isLocalSideClosed() {
|
|
return localSideClosed;
|
|
}
|
|
|
|
void closeLocalSide() {
|
|
localSideClosed = true;
|
|
}
|
|
|
|
boolean hasReceivedReply() {
|
|
return receivedReply;
|
|
}
|
|
|
|
void receivedReply() {
|
|
receivedReply = true;
|
|
}
|
|
|
|
int getSendWindowSize() {
|
|
return sendWindowSize.get();
|
|
}
|
|
|
|
int updateSendWindowSize(int deltaWindowSize) {
|
|
return sendWindowSize.addAndGet(deltaWindowSize);
|
|
}
|
|
|
|
int updateReceiveWindowSize(int deltaWindowSize) {
|
|
return receiveWindowSize.addAndGet(deltaWindowSize);
|
|
}
|
|
|
|
int getReceiveWindowSizeLowerBound() {
|
|
return receiveWindowSizeLowerBound;
|
|
}
|
|
|
|
void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
|
|
this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
|
|
}
|
|
|
|
boolean putPendingWrite(PendingWrite msg) {
|
|
return pendingWriteQueue.offer(msg);
|
|
}
|
|
|
|
PendingWrite getPendingWrite() {
|
|
return pendingWriteQueue.peek();
|
|
}
|
|
|
|
PendingWrite removePendingWrite() {
|
|
return pendingWriteQueue.poll();
|
|
}
|
|
|
|
void clearPendingWrites(Throwable cause) {
|
|
for (;;) {
|
|
PendingWrite pendingWrite = pendingWriteQueue.poll();
|
|
if (pendingWrite == null) {
|
|
break;
|
|
}
|
|
pendingWrite.fail(cause);
|
|
}
|
|
}
|
|
}
|
|
|
|
private final class StreamComparator implements Comparator<Integer> {
|
|
|
|
StreamComparator() { }
|
|
|
|
@Override
|
|
public int compare(Integer id1, Integer id2) {
|
|
StreamState state1 = activeStreams.get(id1);
|
|
StreamState state2 = activeStreams.get(id2);
|
|
|
|
int result = state1.getPriority() - state2.getPriority();
|
|
if (result != 0) {
|
|
return result;
|
|
}
|
|
|
|
return id1 - id2;
|
|
}
|
|
}
|
|
|
|
public static final class PendingWrite {
|
|
final SpdyDataFrame spdyDataFrame;
|
|
final ChannelPromise promise;
|
|
|
|
PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
|
|
this.spdyDataFrame = spdyDataFrame;
|
|
this.promise = promise;
|
|
}
|
|
|
|
void fail(Throwable cause) {
|
|
spdyDataFrame.release();
|
|
promise.setFailure(cause);
|
|
}
|
|
}
|
|
}
|