Fixed web service client. Added more package documentation.

This commit is contained in:
Veebs 2011-10-17 16:49:19 +11:00
parent c60b1e28cb
commit 12d93cca29
3 changed files with 30 additions and 73 deletions

View File

@ -16,31 +16,19 @@
package org.jboss.netty.example.http.websocketx.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.example.http.websocketx.server.WebSocketServerPipelineFactory;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketSpecificationVersion;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* A HTTP client demo app
@ -51,41 +39,14 @@ import org.jboss.netty.logging.InternalLoggerFactory;
*/
public class App {
private static final ChannelGroup allChannels = new DefaultChannelGroup("App");
private static ChannelFactory channelFactory = null;
public static void main(String[] args) throws Exception {
ConsoleHandler ch = new ConsoleHandler();
ch.setLevel(Level.FINE);
Logger.getLogger("").addHandler(ch);
Logger.getLogger("").setLevel(Level.FINE);
startServer();
runClient();
stopServer();
}
/**
* Starts our web socket server
*/
public static void startServer() {
// Configure the server.
channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory());
// Bind and start to accept incoming connections.
Channel channel = bootstrap.bind(new InetSocketAddress(8080));
allChannels.add(channel);
System.out
.println("Web Socket Server started on 8080. Open your browser and navigate to http://localhost:8080/");
System.exit(0);
}
/**
@ -104,44 +65,35 @@ public class App {
WebSocketSpecificationVersion.V10, callbackHandler);
// Connect
client.connect().awaitUninterruptibly();
System.out.println("WebSocket Client connecting");
client.connect().awaitUninterruptibly();
Thread.sleep(200);
// Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message");
for (int i = 0; i < 10; i++) {
client.send(new TextWebSocketFrame("Message #" + i));
}
Thread.sleep(1000);
// Close - this throws ClosedChannelException. Not sure why. Just as
// easy to just disconnect.
// Ping
System.out.println("WebSocket Client sending ping");
client.send(new PingWebSocketFrame(ChannelBuffers.copiedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 })));
Thread.sleep(1000);
// Close
System.out.println("WebSocket Client sending close");
client.send(new CloseWebSocketFrame());
Thread.sleep(200);
Thread.sleep(1000);
// Disconnect
client.disconnect();
}
/**
* Stops the server
*/
public static void stopServer() {
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
channelFactory.releaseExternalResources();
channelFactory = null;
}
/**
* Our web socket callback handler for this app
*/
public static class MyCallbackHandler implements WebSocketCallback {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MyCallbackHandler.class);
public boolean connected = false;
public ArrayList<String> messagesReceived = new ArrayList<String>();
@ -151,13 +103,13 @@ public class App {
@Override
public void onConnect(WebSocketClient client) {
logger.debug("WebSocket Client connected!");
System.out.println("WebSocket Client connected!");
connected = true;
}
@Override
public void onDisconnect(WebSocketClient client) {
logger.debug("WebSocket Client disconnected!");
System.out.println("WebSocket Client disconnected!");
connected = false;
}
@ -165,18 +117,18 @@ public class App {
public void onMessage(WebSocketClient client, WebSocketFrame frame) {
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
logger.debug("WebSocket Client Received Message:" + textFrame.getText());
System.out.println("WebSocket Client received message:" + textFrame.getText());
messagesReceived.add(textFrame.getText());
} else if (frame instanceof PongWebSocketFrame) {
logger.debug("WebSocket Client ping/pong");
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
logger.debug("WebSocket Client closing");
System.out.println("WebSocket Client received closing");
}
}
@Override
public void onError(Throwable t) {
logger.error("WebSocket Client error", t);
System.out.println("WebSocket Client error " + t.toString());
}
}

View File

@ -206,10 +206,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
framePayloadLength = framePayloadLen1;
}
logger.debug("Frame length =" + framePayloadLength);
//logger.debug("Frame length=" + framePayloadLength);
checkpoint(State.MASKING_KEY);
case MASKING_KEY:
maskingKey = buffer.readBytes(4);
if (this.maskedPayload){
maskingKey = buffer.readBytes(4);
}
checkpoint(State.PAYLOAD);
case PAYLOAD:
// Some times, the payload may not be delivered in 1 nice packet
@ -218,6 +220,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
ChannelBuffer payloadBuffer = null;
int willHaveReadByteCount = framePayloadBytesRead + rbytes;
//logger.debug("Frame rbytes=" + rbytes + " willHaveReadByteCount=" + willHaveReadByteCount + " framePayloadLength=" + framePayloadLength);
if (willHaveReadByteCount == framePayloadLength) {
// We have all our content so proceed to process
payloadBuffer = buffer.readBytes(rbytes);
@ -230,7 +233,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
framePayload.writeBytes(payloadBuffer);
framePayloadBytesRead = framePayloadBytesRead + rbytes;
// Return null to wait for more bytes to arrive
return null;
} else if (willHaveReadByteCount > framePayloadLength) {
@ -241,7 +244,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Now we have all the data, the next checkpoint must be the next
// frame
checkpoint(State.FRAME_START);
checkpoint(State.FRAME_START);
// Take the data that we have in this packet
if (framePayload == null) {
@ -249,7 +252,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else {
framePayload.writeBytes(payloadBuffer);
}
// Unmask data if needed
if (this.maskedPayload) {
unmask(framePayload);

View File

@ -93,6 +93,9 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
ChannelBuffer data = frame.getBinaryData();
if (data == null) {
data = ChannelBuffers.EMPTY_BUFFER;
}
byte opcode;
if (frame instanceof TextWebSocketFrame) {
@ -149,7 +152,7 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
header.writeLong(length);
}
// Write payload
// Write payload
if (this.maskPayload) {
Integer random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
@ -164,8 +167,7 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
} else {
body = data;
}
return ChannelBuffers.wrappedBuffer(header, body);
return ChannelBuffers.wrappedBuffer(header, body);
}
// If not websocket, then just return the message