* Made sure ChannelFutures are notified when no lock is acquired in HttpTunnelingChannelHandler

* Reduced the visibility of some methods in HttpTunnelingChannelHandler
This commit is contained in:
Trustin Lee 2009-03-12 07:01:20 +00:00
parent 937ee06d9d
commit 5f1dd20d74

View File

@ -56,7 +56,7 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
private final Condition reconnectCondition = reconnectLock.newCondition();
private final long reconnectTimeout;
private final long reconnectTimeoutMillis;
private volatile boolean connected = false;
@ -68,10 +68,11 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
private final HttpSession session;
public HttpTunnelingChannelHandler(boolean stream, HttpSession session, long reconnectTimeout) {
public HttpTunnelingChannelHandler(
boolean stream, HttpSession session, long reconnectTimeoutMillis) {
this.stream = stream;
this.session = session;
this.reconnectTimeout = reconnectTimeout;
this.reconnectTimeoutMillis = reconnectTimeoutMillis;
}
@Override
@ -79,6 +80,8 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
if (stream) {
boolean success = false;
Throwable cause = null;
byte[] b = null;
reconnectLock.lock();
try {
@ -90,30 +93,35 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
buffer.readBytes(b);
outputStream.write(b);
outputStream.flush();
e.getFuture().setSuccess();
}
catch (IOException e1) {
connected = false;
reconnectCondition.await(reconnectTimeout, TimeUnit.MILLISECONDS);
if (connected) {
outputStream.write(b);
outputStream.flush();
e.getFuture().setSuccess();
}
else {
e.getFuture().setFailure(e1);
success = true;
} catch (Throwable t) {
success = false;
cause = t;
if (awaitReconnect()) {
try {
outputStream.write(b);
outputStream.flush();
success = true;
} catch (Throwable t2) {
success = false;
cause = t2;
}
} else {
if (invalidated.compareAndSet(false, true)) {
session.invalidate();
}
e.getChannel().close();
}
}
finally {
} finally {
reconnectLock.unlock();
if (success) {
e.getFuture().setSuccess();
} else {
assert cause != null;
e.getFuture().setFailure(cause);
}
}
}
else {
} else {
awaitingEvents.add(e);
}
@ -134,14 +142,14 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
}
}
public synchronized List<MessageEvent> getAwaitingEvents() {
synchronized List<MessageEvent> getAwaitingEvents() {
List<MessageEvent> list = new ArrayList<MessageEvent>();
list.addAll(awaitingEvents);
awaitingEvents.clear();
return list;
}
public void setOutputStream(ServletOutputStream outputStream) {
void setOutputStream(ServletOutputStream outputStream) {
reconnectLock.lock();
try {
this.outputStream = outputStream;
@ -166,19 +174,15 @@ class HttpTunnelingChannelHandler extends SimpleChannelHandler {
}
}
public boolean isStreaming() {
boolean isStreaming() {
return stream;
}
public ServletOutputStream getOutputStream() {
return outputStream;
}
public boolean awaitReconnect() {
boolean awaitReconnect() {
reconnectLock.lock();
try {
connected = false;
reconnectCondition.await(reconnectTimeout, TimeUnit.MILLISECONDS);
reconnectCondition.await(reconnectTimeoutMillis, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return connected;