Skip to content

Commit

Permalink
增加UDP支持
Browse files Browse the repository at this point in the history
  • Loading branch information
Sopage committed Nov 23, 2017
1 parent 84b7e47 commit 07bd6bc
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/dream/socket/DreamSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public final boolean start() {

public final boolean stop() {
if (!running) {
return false;
return true;
}
running = false;
onStop();
if (pool != null && !pool.isShutdown()) {
pool.shutdownNow();
}
running = false;
return true;
}

Expand Down
54 changes: 30 additions & 24 deletions src/main/java/com/dream/socket/DreamUDPSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.codec.MessageHandle;
import com.dream.socket.runnable.HandleRunnable;
import com.dream.socket.runnable.UDPSocketSendRunnable;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class DreamUDPSocket extends DreamSocket {

private int mPort;
private DatagramSocket mSocket;
private UDPSocketSendRunnable mSocketSendRunnable;
private DatagramPacket packet;
private MessageDecode mMessageDecode;
private HandleRunnable mHandleRunnable;

public DreamUDPSocket(int port) {
this.mPort = port;
packet = new DatagramPacket(new byte[500], 500);
}

public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<>(messageHandle);
messageDecode.setHandleRunnable(mHandleRunnable);
Expand All @@ -34,34 +30,38 @@ public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, Messa
public boolean onStart() {
synchronized (this) {
try {
mSocket = new DatagramSocket(mPort);
mSocket = new DatagramSocket();
} catch (Exception e) {
e.printStackTrace();
return false;
}
mSocketSendRunnable.setDatagramSocket(mSocket);
executeRunnable(mSocketSendRunnable);
executeRunnable(mHandleRunnable);
while (isConnected()) {
try {
byte[] buffer = new byte[500];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
while (isRunning()) {
packet.setData(buffer, 0, buffer.length);
mSocket.receive(packet);
mMessageDecode.put(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mSocket = null;
if (isRunning()) {
this.mHandleRunnable.status(Status.STATUS_FAIL);
System.out.println("6秒后尝试重连");
this.wait(6000);
System.out.println("开始重连");
}
} catch (Exception ie) {
System.out.println("重连发生异常");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mSocket = null;
if (isRunning()) {
this.mHandleRunnable.status(Status.STATUS_FAIL);
System.out.println("6秒后尝试重连");
this.wait(6000);
System.out.println("开始重连");
}
} catch (Exception ie) {
System.out.println("重连发生异常");
}
}

}
return false;
}
Expand Down Expand Up @@ -90,7 +90,13 @@ public boolean isConnected() {
return false;
}

public boolean send(String host, int port, Object data) {
public boolean send(String host, int port, DataProtocol data) {
if (mSocketSendRunnable != null) {
SocketAddress address = new InetSocketAddress(host, port);
data.mAddress = address;
mSocketSendRunnable.send(data);
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.dream.socket;
package com.dream.socket.runnable;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.runnable.SendRunnable;

import java.io.IOException;
import java.net.DatagramPacket;
Expand All @@ -12,7 +11,7 @@
public class UDPSocketSendRunnable<T extends DataProtocol> extends SendRunnable<T> {

private DatagramSocket mSocket;

private DatagramPacket packet;
public UDPSocketSendRunnable(MessageEncode<T> encode) {
super(encode);
}
Expand All @@ -25,7 +24,12 @@ public void setDatagramSocket(DatagramSocket socket) {
protected boolean doSend(SocketAddress address, byte[] buffer, int offset, int length) {
if (mSocket != null) {
try {
mSocket.send(new DatagramPacket(buffer, offset, length, address));
if(packet == null){
packet = new DatagramPacket(buffer, buffer.length);
}
packet.setData(buffer, offset, length);
packet.setSocketAddress(address);
mSocket.send(packet);
return true;
} catch (IOException e) {
e.printStackTrace();
Expand Down
38 changes: 21 additions & 17 deletions src/test/java/com/dream/socket/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@ public StringProtocol(byte[] array) {
string = new String(array);
}

public StringProtocol(byte[] array, int i, int limit) {
string = new String(array, i, limit);
}

public String getString() {
return string;
}
}

public static void main(String[] args) {
DreamUDPSocket socket = new DreamUDPSocket(6969);
DreamUDPSocket socket = new DreamUDPSocket();
socket.codec(new MessageDecode<StringProtocol>() {
@Override
protected StringProtocol decode(SocketAddress address, ByteBuffer buffer) {
int len = buffer.getInt();
if(len > buffer.remaining()){
return null;
}
byte[] array = new byte[len];
int limit = buffer.limit();
byte[] array = new byte[limit];
buffer.get(array);
return new StringProtocol(array);
}
Expand All @@ -55,11 +56,14 @@ public void encode(StringProtocol data, ByteBuffer buffer) {
}
});
socket.start();
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i=1; i<=10; i++){
socket.send("localhost", 6969, new StringProtocol(("message -> " + i).getBytes()));
}
// StringBuilder writer = new StringBuilder();
// writer.append("GET / HTTP/1.1\r\n");
// writer.append("Host: www.oschina.net\r\n");
Expand All @@ -68,12 +72,12 @@ public void encode(StringProtocol data, ByteBuffer buffer) {
// writer.append("\r\n");
// socket.send(writer.toString());
//
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// socket.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
socket.stop();
}
//
// public void setSocket(DreamTCPSocket socket) {
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/com/dream/socket/UDPServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.dream.socket;

import java.net.DatagramPacket;
import java.net.DatagramSocket;

public class UDPServer {

public static void main(String[] args) throws Exception {
DatagramSocket socket = new DatagramSocket(6969);
DatagramPacket packet = new DatagramPacket(new byte[500], 500);
while (true){
socket.receive(packet);
System.out.println(new String(packet.getData(), 0, packet.getLength()));
socket.send(packet);
}
}

}

0 comments on commit 07bd6bc

Please sign in to comment.