Skip to content

Commit

Permalink
增加UDP支持
Browse files Browse the repository at this point in the history
  • Loading branch information
Sopage committed Nov 23, 2017
1 parent 31ff91f commit 84b7e47
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 53 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apply plugin: 'java'
apply plugin: 'maven'
apply from: 'gradle-mvn-push.gradle'
//apply from: 'gradle-mvn-push.gradle'

repositories {
mavenCentral()
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/dream/socket/DreamSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public final boolean stop() {
return true;
}

public abstract boolean onStart();
protected abstract boolean onStart();

public abstract boolean onStop();
protected abstract boolean onStop();

public abstract boolean isConnected();

Expand Down
26 changes: 15 additions & 11 deletions src/main/java/com/dream/socket/DreamTCPSocket.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package com.dream.socket;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageDecode;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.codec.MessageHandle;
import com.dream.socket.runnable.HandleRunnable;
import com.dream.socket.runnable.SocketSendRunnable;
import com.dream.socket.runnable.TCPSocketSendRunnable;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

public class DreamTCPSocket extends DreamSocket {

private static final int SOCKET_CONNECT_TIMEOUT = 10000;
private InetSocketAddress mAddress;
private SocketAddress mAddress;
private String mHost;
private int mPort;
private Socket mSocket;
private SocketSendRunnable mSocketSendRunnable;
private TCPSocketSendRunnable mSocketSendRunnable;
private MessageDecode mMessageDecode;
private HandleRunnable mHandleRunnable;

Expand All @@ -27,15 +29,15 @@ public DreamTCPSocket(String host, int port) {
this.mPort = port;
}

public <T> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<T>(messageHandle);
public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<>(messageHandle);
messageDecode.setHandleRunnable(mHandleRunnable);
this.mMessageDecode = messageDecode;
this.mSocketSendRunnable = new SocketSendRunnable<T>(messageEncode);
this.mSocketSendRunnable = new TCPSocketSendRunnable<>(messageEncode);
}

@Override
public boolean onStart() {
protected boolean onStart() {
synchronized (this) {
while (isRunning()) {
try {
Expand All @@ -53,7 +55,7 @@ public boolean onStart() {
InputStream in = mSocket.getInputStream();
int length;
while ((length = in.read(bytes)) > 0) {
this.mMessageDecode.put(bytes, 0, length);
this.mMessageDecode.put(mAddress, bytes, 0, length);
}
}
} catch (Exception e) {
Expand All @@ -76,14 +78,16 @@ public boolean onStart() {
return false;
}

public void send(Object data) {
public <T extends DataProtocol> boolean send(T data) {
if (mSocketSendRunnable != null) {
mSocketSendRunnable.send(data);
data.mAddress = mAddress;
return mSocketSendRunnable.send(data);
}
return false;
}

@Override
public boolean onStop() {
protected boolean onStop() {
if (mSocketSendRunnable != null) {
mSocketSendRunnable.stop();
}
Expand Down
96 changes: 96 additions & 0 deletions src/main/java/com/dream/socket/DreamUDPSocket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.dream.socket;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageDecode;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.codec.MessageHandle;
import com.dream.socket.runnable.HandleRunnable;

import java.net.DatagramPacket;
import java.net.DatagramSocket;

public class DreamUDPSocket extends DreamSocket {

private int mPort;
private DatagramSocket mSocket;
private UDPSocketSendRunnable mSocketSendRunnable;
private DatagramPacket packet;
private MessageDecode mMessageDecode;
private HandleRunnable mHandleRunnable;

public DreamUDPSocket(int port) {
this.mPort = port;
packet = new DatagramPacket(new byte[500], 500);
}

public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<>(messageHandle);
messageDecode.setHandleRunnable(mHandleRunnable);
this.mMessageDecode = messageDecode;
this.mSocketSendRunnable = new UDPSocketSendRunnable<>(messageEncode);
}

@Override
public boolean onStart() {
synchronized (this) {
try {
mSocket = new DatagramSocket(mPort);
} catch (Exception e) {
e.printStackTrace();
return false;
}
mSocketSendRunnable.setDatagramSocket(mSocket);
executeRunnable(mSocketSendRunnable);
executeRunnable(mHandleRunnable);
while (isConnected()) {
try {
mSocket.receive(packet);
mMessageDecode.put(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mSocket = null;
if (isRunning()) {
this.mHandleRunnable.status(Status.STATUS_FAIL);
System.out.println("6秒后尝试重连");
this.wait(6000);
System.out.println("开始重连");
}
} catch (Exception ie) {
System.out.println("重连发生异常");
}
}
}
}
return false;
}

@Override
public boolean onStop() {
if (mSocketSendRunnable != null) {
mSocketSendRunnable.stop();
}
if (this.mHandleRunnable != null) {
this.mHandleRunnable.stop();
}
if (mSocket != null) {
mSocket.close();
mSocket = null;
mHandleRunnable.status(Status.STATUS_DISCONNECT);
}
return true;
}

@Override
public boolean isConnected() {
if (mSocket != null && !mSocket.isClosed() && mSocket.isConnected() && isRunning()) {
return true;
}
return false;
}

public boolean send(String host, int port, Object data) {
return false;
}
}
36 changes: 36 additions & 0 deletions src/main/java/com/dream/socket/UDPSocketSendRunnable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.dream.socket;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.runnable.SendRunnable;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;

public class UDPSocketSendRunnable<T extends DataProtocol> extends SendRunnable<T> {

private DatagramSocket mSocket;

public UDPSocketSendRunnable(MessageEncode<T> encode) {
super(encode);
}

public void setDatagramSocket(DatagramSocket socket) {
this.mSocket = socket;
}

@Override
protected boolean doSend(SocketAddress address, byte[] buffer, int offset, int length) {
if (mSocket != null) {
try {
mSocket.send(new DatagramPacket(buffer, offset, length, address));
return true;
} catch (IOException e) {
e.printStackTrace();
}
}
return false;
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/dream/socket/codec/DataProtocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.dream.socket.codec;

import java.net.SocketAddress;

public abstract class DataProtocol {
public SocketAddress mAddress;
}
42 changes: 24 additions & 18 deletions src/main/java/com/dream/socket/codec/MessageDecode.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,49 @@

import com.dream.socket.runnable.HandleRunnable;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public abstract class MessageDecode<T> {
public abstract class MessageDecode<T extends DataProtocol> {

private static int CACHE_BUFFER_LENGTH = 102400;
private final ByteBuffer mBuffer = ByteBuffer.allocate(CACHE_BUFFER_LENGTH);
private HandleRunnable<T> mHandleRunnable;

public MessageDecode() {
mBuffer.flip();
}
private Map<SocketAddress, ByteBuffer> mAddressByteBufferMap = new HashMap<>();

public void setHandleRunnable(HandleRunnable<T> handleRunnable) {
this.mHandleRunnable = handleRunnable;
}

public synchronized void put(byte[] array, int offset, int length) {
if (mBuffer.limit() + length > mBuffer.capacity()) {
public synchronized void put(SocketAddress address, byte[] array, int offset, int length) {
ByteBuffer buffer = mAddressByteBufferMap.get(address);
if (buffer == null) {
buffer = ByteBuffer.allocate(CACHE_BUFFER_LENGTH);
mAddressByteBufferMap.put(address, buffer);
buffer.flip();
}
if (buffer.limit() + length > buffer.capacity()) {
//TODO 缓存区已满,丢弃读取的数据
return;
}

mBuffer.compact();
mBuffer.put(array, offset, length);
mBuffer.flip();
mBuffer.mark();
buffer.compact();
buffer.put(array, offset, length);
buffer.flip();
buffer.mark();

T data;
while (mBuffer.hasRemaining() && ((data = decode(mBuffer)) != null)) {
while (buffer.hasRemaining() && ((data = decode(address, buffer)) != null)) {
data.mAddress = address;
mHandleRunnable.put(data);
mBuffer.compact();
mBuffer.flip();
mBuffer.mark();
buffer.compact();
buffer.flip();
buffer.mark();
}
mBuffer.reset();
buffer.reset();
}

protected abstract T decode(ByteBuffer buffer);
protected abstract T decode(SocketAddress address, ByteBuffer buffer);

}
2 changes: 1 addition & 1 deletion src/main/java/com/dream/socket/codec/MessageEncode.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.nio.ByteBuffer;

public abstract class MessageEncode<T> {
public abstract class MessageEncode<T extends DataProtocol> {


/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/dream/socket/codec/MessageHandle.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.dream.socket.codec;

public abstract class MessageHandle<T> {
public abstract class MessageHandle<T extends DataProtocol> {

/**
* 连接状态回调
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/dream/socket/runnable/HandleRunnable.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.dream.socket.runnable;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageHandle;

import java.util.concurrent.LinkedBlockingQueue;

public class HandleRunnable<T> implements Runnable {
public class HandleRunnable<T extends DataProtocol> implements Runnable {

private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
private MessageHandle<T> handle;
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/com/dream/socket/runnable/SendRunnable.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.dream.socket.runnable;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.MessageEncode;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

public abstract class SendRunnable<T> implements Runnable {
public abstract class SendRunnable<T extends DataProtocol> implements Runnable {

private boolean sending;
private MessageEncode<T> encode;
Expand All @@ -31,7 +33,7 @@ public void run() {
buffer.clear();
encode.encode(data, buffer);
buffer.flip();
if (!doSend(buffer.array(), 0, buffer.limit())) {
if (!doSend(data.mAddress, buffer.array(), 0, buffer.limit())) {
System.out.println("数据没有被真正发送出去!");
}
}
Expand All @@ -48,13 +50,15 @@ public void stop() {

public boolean send(T data) {
try {
this.queue.put(data);
return true;
if (sending) {
this.queue.put(data);
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

protected abstract boolean doSend(byte[] buffer, int offset, int length);
protected abstract boolean doSend(SocketAddress address, byte[] buffer, int offset, int length);
}
Loading

0 comments on commit 84b7e47

Please sign in to comment.