Fixed a bug where Channel.write(emptyBuffer) does not complete when SslHandler is in the pipeline
This commit is contained in:
parent
bfaa647bba
commit
43d2fb47bd
@ -343,8 +343,13 @@ public class SslHandler extends FrameDecoder implements ChannelDownstreamHandler
|
||||
|
||||
// Otherwise, all messages are encrypted.
|
||||
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
|
||||
PendingWrite pendingWrite =
|
||||
new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
|
||||
PendingWrite pendingWrite;
|
||||
|
||||
if (msg.readable()) {
|
||||
pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
|
||||
} else {
|
||||
pendingWrite = new PendingWrite(evt.getFuture(), null);
|
||||
}
|
||||
synchronized (pendingUnencryptedWrites) {
|
||||
boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
|
||||
assert offered;
|
||||
@ -472,69 +477,74 @@ public class SslHandler extends FrameDecoder implements ChannelDownstreamHandler
|
||||
}
|
||||
|
||||
ByteBuffer outAppBuf = pendingWrite.outAppBuf;
|
||||
SSLEngineResult result = null;
|
||||
try {
|
||||
synchronized (handshakeLock) {
|
||||
result = engine.wrap(outAppBuf, outNetBuf);
|
||||
}
|
||||
} finally {
|
||||
if (!outAppBuf.hasRemaining()) {
|
||||
pendingUnencryptedWrites.remove();
|
||||
}
|
||||
}
|
||||
|
||||
if (result.bytesProduced() > 0) {
|
||||
outNetBuf.flip();
|
||||
msg = ChannelBuffers.buffer(outNetBuf.remaining());
|
||||
msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
|
||||
outNetBuf.clear();
|
||||
|
||||
if (pendingWrite.outAppBuf.hasRemaining()) {
|
||||
// pendingWrite's future shouldn't be notified if
|
||||
// only partial data is written.
|
||||
future = succeededFuture(channel);
|
||||
} else {
|
||||
future = pendingWrite.future;
|
||||
}
|
||||
|
||||
MessageEvent encryptedWrite = new DownstreamMessageEvent(
|
||||
channel, future, msg, channel.getRemoteAddress());
|
||||
if (Thread.holdsLock(pendingEncryptedWrites)) {
|
||||
offered = pendingEncryptedWrites.offer(encryptedWrite);
|
||||
} else {
|
||||
synchronized (pendingEncryptedWrites) {
|
||||
offered = pendingEncryptedWrites.offer(encryptedWrite);
|
||||
}
|
||||
}
|
||||
assert offered;
|
||||
if (outAppBuf == null) {
|
||||
// A write request with an empty buffer
|
||||
pendingUnencryptedWrites.remove();
|
||||
offerEncryptedWriteRequest(
|
||||
new DownstreamMessageEvent(
|
||||
channel, pendingWrite.future,
|
||||
ChannelBuffers.EMPTY_BUFFER,
|
||||
channel.getRemoteAddress()));
|
||||
offered = true;
|
||||
} else {
|
||||
HandshakeStatus handshakeStatus = result.getHandshakeStatus();
|
||||
switch (handshakeStatus) {
|
||||
case NEED_WRAP:
|
||||
if (outAppBuf.hasRemaining()) {
|
||||
break;
|
||||
SSLEngineResult result = null;
|
||||
try {
|
||||
synchronized (handshakeLock) {
|
||||
result = engine.wrap(outAppBuf, outNetBuf);
|
||||
}
|
||||
} finally {
|
||||
if (!outAppBuf.hasRemaining()) {
|
||||
pendingUnencryptedWrites.remove();
|
||||
}
|
||||
}
|
||||
|
||||
if (result.bytesProduced() > 0) {
|
||||
outNetBuf.flip();
|
||||
msg = ChannelBuffers.buffer(outNetBuf.remaining());
|
||||
msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
|
||||
outNetBuf.clear();
|
||||
|
||||
if (pendingWrite.outAppBuf.hasRemaining()) {
|
||||
// pendingWrite's future shouldn't be notified if
|
||||
// only partial data is written.
|
||||
future = succeededFuture(channel);
|
||||
} else {
|
||||
future = pendingWrite.future;
|
||||
}
|
||||
|
||||
MessageEvent encryptedWrite = new DownstreamMessageEvent(
|
||||
channel, future, msg, channel.getRemoteAddress());
|
||||
offerEncryptedWriteRequest(encryptedWrite);
|
||||
offered = true;
|
||||
} else {
|
||||
HandshakeStatus handshakeStatus = result.getHandshakeStatus();
|
||||
switch (handshakeStatus) {
|
||||
case NEED_WRAP:
|
||||
if (outAppBuf.hasRemaining()) {
|
||||
break;
|
||||
} else {
|
||||
break loop;
|
||||
}
|
||||
case NEED_UNWRAP:
|
||||
needsUnwrap = true;
|
||||
break loop;
|
||||
case NEED_TASK:
|
||||
runDelegatedTasks();
|
||||
break;
|
||||
case FINISHED:
|
||||
case NOT_HANDSHAKING:
|
||||
if (handshakeStatus == HandshakeStatus.FINISHED) {
|
||||
setHandshakeSuccess(channel);
|
||||
}
|
||||
if (result.getStatus() == Status.CLOSED) {
|
||||
success = false;
|
||||
}
|
||||
break loop;
|
||||
default:
|
||||
throw new IllegalStateException(
|
||||
"Unknown handshake status: " +
|
||||
result.getHandshakeStatus());
|
||||
}
|
||||
case NEED_UNWRAP:
|
||||
needsUnwrap = true;
|
||||
break loop;
|
||||
case NEED_TASK:
|
||||
runDelegatedTasks();
|
||||
break;
|
||||
case FINISHED:
|
||||
case NOT_HANDSHAKING:
|
||||
if (handshakeStatus == HandshakeStatus.FINISHED) {
|
||||
setHandshakeSuccess(channel);
|
||||
}
|
||||
if (result.getStatus() == Status.CLOSED) {
|
||||
success = false;
|
||||
}
|
||||
break loop;
|
||||
default:
|
||||
throw new IllegalStateException(
|
||||
"Unknown handshake status: " +
|
||||
result.getHandshakeStatus());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -581,6 +591,18 @@ public class SslHandler extends FrameDecoder implements ChannelDownstreamHandler
|
||||
return future;
|
||||
}
|
||||
|
||||
private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
|
||||
boolean offered;
|
||||
if (Thread.holdsLock(pendingEncryptedWrites)) {
|
||||
offered = pendingEncryptedWrites.offer(encryptedWrite);
|
||||
} else {
|
||||
synchronized (pendingEncryptedWrites) {
|
||||
offered = pendingEncryptedWrites.offer(encryptedWrite);
|
||||
}
|
||||
}
|
||||
assert offered;
|
||||
}
|
||||
|
||||
private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
|
||||
// Avoid possible dead lock and data integrity issue
|
||||
// which is caused by cross communication between more than one channel
|
||||
|
Loading…
Reference in New Issue
Block a user