diff --git a/src/main/java/com/dream/socket/DreamSocket.java b/src/main/java/com/dream/socket/DreamSocket.java index 95127f8..8d2c6d6 100644 --- a/src/main/java/com/dream/socket/DreamSocket.java +++ b/src/main/java/com/dream/socket/DreamSocket.java @@ -26,13 +26,13 @@ public final boolean start() { public final boolean stop() { if (!running) { - return false; + return true; } + running = false; onStop(); if (pool != null && !pool.isShutdown()) { pool.shutdownNow(); } - running = false; return true; } diff --git a/src/main/java/com/dream/socket/DreamUDPSocket.java b/src/main/java/com/dream/socket/DreamUDPSocket.java index 45dfe9c..a825568 100644 --- a/src/main/java/com/dream/socket/DreamUDPSocket.java +++ b/src/main/java/com/dream/socket/DreamUDPSocket.java @@ -5,24 +5,20 @@ import com.dream.socket.codec.MessageEncode; import com.dream.socket.codec.MessageHandle; import com.dream.socket.runnable.HandleRunnable; +import com.dream.socket.runnable.UDPSocketSendRunnable; import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; public class DreamUDPSocket extends DreamSocket { - private int mPort; private DatagramSocket mSocket; private UDPSocketSendRunnable mSocketSendRunnable; - private DatagramPacket packet; private MessageDecode mMessageDecode; private HandleRunnable mHandleRunnable; - public DreamUDPSocket(int port) { - this.mPort = port; - packet = new DatagramPacket(new byte[500], 500); - } - public void codec(MessageDecode messageDecode, MessageHandle messageHandle, MessageEncode messageEncode) { this.mHandleRunnable = new HandleRunnable<>(messageHandle); messageDecode.setHandleRunnable(mHandleRunnable); @@ -34,7 +30,7 @@ public void codec(MessageDecode messageDecode, Messa public boolean onStart() { synchronized (this) { try { - mSocket = new DatagramSocket(mPort); + mSocket = new DatagramSocket(); } catch (Exception e) { e.printStackTrace(); return false; @@ -42,26 +38,30 @@ public boolean onStart() { mSocketSendRunnable.setDatagramSocket(mSocket); executeRunnable(mSocketSendRunnable); executeRunnable(mHandleRunnable); - while (isConnected()) { - try { + byte[] buffer = new byte[500]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + while (isRunning()) { + packet.setData(buffer, 0, buffer.length); mSocket.receive(packet); mMessageDecode.put(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength()); - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - mSocket = null; - if (isRunning()) { - this.mHandleRunnable.status(Status.STATUS_FAIL); - System.out.println("6秒后尝试重连"); - this.wait(6000); - System.out.println("开始重连"); - } - } catch (Exception ie) { - System.out.println("重连发生异常"); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + mSocket = null; + if (isRunning()) { + this.mHandleRunnable.status(Status.STATUS_FAIL); + System.out.println("6秒后尝试重连"); + this.wait(6000); + System.out.println("开始重连"); } + } catch (Exception ie) { + System.out.println("重连发生异常"); } } + } return false; } @@ -90,7 +90,13 @@ public boolean isConnected() { return false; } - public boolean send(String host, int port, Object data) { + public boolean send(String host, int port, DataProtocol data) { + if (mSocketSendRunnable != null) { + SocketAddress address = new InetSocketAddress(host, port); + data.mAddress = address; + mSocketSendRunnable.send(data); + return true; + } return false; } } diff --git a/src/main/java/com/dream/socket/UDPSocketSendRunnable.java b/src/main/java/com/dream/socket/runnable/UDPSocketSendRunnable.java similarity index 71% rename from src/main/java/com/dream/socket/UDPSocketSendRunnable.java rename to src/main/java/com/dream/socket/runnable/UDPSocketSendRunnable.java index 675bef3..27f0796 100644 --- a/src/main/java/com/dream/socket/UDPSocketSendRunnable.java +++ b/src/main/java/com/dream/socket/runnable/UDPSocketSendRunnable.java @@ -1,8 +1,7 @@ -package com.dream.socket; +package com.dream.socket.runnable; import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageEncode; -import com.dream.socket.runnable.SendRunnable; import java.io.IOException; import java.net.DatagramPacket; @@ -12,7 +11,7 @@ public class UDPSocketSendRunnable extends SendRunnable { private DatagramSocket mSocket; - + private DatagramPacket packet; public UDPSocketSendRunnable(MessageEncode encode) { super(encode); } @@ -25,7 +24,12 @@ public void setDatagramSocket(DatagramSocket socket) { protected boolean doSend(SocketAddress address, byte[] buffer, int offset, int length) { if (mSocket != null) { try { - mSocket.send(new DatagramPacket(buffer, offset, length, address)); + if(packet == null){ + packet = new DatagramPacket(buffer, buffer.length); + } + packet.setData(buffer, offset, length); + packet.setSocketAddress(address); + mSocket.send(packet); return true; } catch (IOException e) { e.printStackTrace(); diff --git a/src/test/java/com/dream/socket/Client.java b/src/test/java/com/dream/socket/Client.java index 9267105..43819ca 100644 --- a/src/test/java/com/dream/socket/Client.java +++ b/src/test/java/com/dream/socket/Client.java @@ -20,21 +20,22 @@ public StringProtocol(byte[] array) { string = new String(array); } + public StringProtocol(byte[] array, int i, int limit) { + string = new String(array, i, limit); + } + public String getString() { return string; } } public static void main(String[] args) { - DreamUDPSocket socket = new DreamUDPSocket(6969); + DreamUDPSocket socket = new DreamUDPSocket(); socket.codec(new MessageDecode() { @Override protected StringProtocol decode(SocketAddress address, ByteBuffer buffer) { - int len = buffer.getInt(); - if(len > buffer.remaining()){ - return null; - } - byte[] array = new byte[len]; + int limit = buffer.limit(); + byte[] array = new byte[limit]; buffer.get(array); return new StringProtocol(array); } @@ -55,11 +56,14 @@ public void encode(StringProtocol data, ByteBuffer buffer) { } }); socket.start(); -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + for(int i=1; i<=10; i++){ + socket.send("localhost", 6969, new StringProtocol(("message -> " + i).getBytes())); + } // StringBuilder writer = new StringBuilder(); // writer.append("GET / HTTP/1.1\r\n"); // writer.append("Host: www.oschina.net\r\n"); @@ -68,12 +72,12 @@ public void encode(StringProtocol data, ByteBuffer buffer) { // writer.append("\r\n"); // socket.send(writer.toString()); // -// try { -// Thread.sleep(5000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// socket.stop(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + socket.stop(); } // // public void setSocket(DreamTCPSocket socket) { diff --git a/src/test/java/com/dream/socket/UDPServer.java b/src/test/java/com/dream/socket/UDPServer.java new file mode 100644 index 0000000..e31c9bd --- /dev/null +++ b/src/test/java/com/dream/socket/UDPServer.java @@ -0,0 +1,18 @@ +package com.dream.socket; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; + +public class UDPServer { + + public static void main(String[] args) throws Exception { + DatagramSocket socket = new DatagramSocket(6969); + DatagramPacket packet = new DatagramPacket(new byte[500], 500); + while (true){ + socket.receive(packet); + System.out.println(new String(packet.getData(), 0, packet.getLength())); + socket.send(packet); + } + } + +}