Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Sopage committed Nov 28, 2017
1 parent 35fe9aa commit eb54578
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 41 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apply plugin: 'java'
apply plugin: 'maven'
apply from: 'gradle-mvn-push.gradle'
//apply from: 'gradle-mvn-push.gradle'

repositories {
mavenCentral()
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/com/dream/socket/DreamTCPSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,26 @@ protected boolean onStart() {

public <T extends Message> boolean send(T data) {
if (mSocketSendRunnable != null) {
data.mAddress = mAddress;
data.setRemoteAddress(mAddress);
return mSocketSendRunnable.send(data);
}
return false;
}

@Override
protected boolean onStop() {
if (mHandleRunnable != null) {
mHandleRunnable.stop();
mHandleRunnable.status(Status.STATUS_DISCONNECT);
}
if (mSocketSendRunnable != null) {
mSocketSendRunnable.stop();
}
if (mSocket != null) {
shutdownInput(mSocket);
shutdownOutput(mSocket);
close(mSocket);
mSocket = null;
if (mHandleRunnable != null) {
mHandleRunnable.status(Status.STATUS_DISCONNECT);
}
}
mSocketSendRunnable = null;
mHandleRunnable = null;
Expand All @@ -120,7 +124,7 @@ private static void shutdownInput(Socket socket) {
LoggerFactory.getLogger().info("关闭Socket输入...");
socket.shutdownInput();
} catch (IOException e) {
e.printStackTrace();
LoggerFactory.getLogger().error("关闭Socket输入异常", e);
}
}
}
Expand All @@ -131,18 +135,18 @@ private static void shutdownOutput(Socket socket) {
LoggerFactory.getLogger().info("关闭Socket输出...");
socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
LoggerFactory.getLogger().error("关闭Socket输出异常", e);
}
}
}

private static void close(Socket socket) {
if (socket != null && !socket.isClosed()) {
if (socket != null) {
try {
LoggerFactory.getLogger().info("关闭Socket...");
socket.close();
} catch (IOException e) {
e.printStackTrace();
LoggerFactory.getLogger().error("关闭Socket异常", e);
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/dream/socket/DreamUDPSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class DreamUDPSocket extends DreamSocket {
Expand Down Expand Up @@ -69,6 +68,13 @@ public boolean onStart() {

@Override
public boolean onStop() {
if (mHandleRunnable != null) {
mHandleRunnable.stop();
mHandleRunnable.status(Status.STATUS_DISCONNECT);
}
if (mSocketSendRunnable != null) {
mSocketSendRunnable.stop();
}
if (mSocket != null) {
LoggerFactory.getLogger().info("关闭UDP管道");
mSocket.close();
Expand All @@ -92,7 +98,7 @@ public boolean isConnected() {

public boolean send(SocketAddress address, Message data) {
if (mSocketSendRunnable != null) {
data.mAddress = address;
data.setRemoteAddress(address);
mSocketSendRunnable.send(data);
return true;
}
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/dream/socket/codec/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,13 @@
import java.net.SocketAddress;

public abstract class Message {
public SocketAddress mAddress;
private SocketAddress mRemoteAddress;

public void setRemoteAddress(SocketAddress remoteAddress) {
this.mRemoteAddress = remoteAddress;
}

public SocketAddress getRemoteAddress() {
return mRemoteAddress;
}
}
3 changes: 1 addition & 2 deletions src/main/java/com/dream/socket/codec/MessageDecode.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public final void decode(SocketAddress address, byte[] array, int offset, int le
}
LoggerFactory.getLogger().info(String.format("%s 上次未解码 position=%d limit=%d", address.toString(), buffer.position(), buffer.limit()));
if (buffer.limit() + length > buffer.capacity()) {
//TODO 缓存区已满,丢弃读取的数据
LoggerFactory.getLogger().warn(address.toString() + " -> decode缓存区已满,读取的数据被丢弃!!!!!");
return;
}
Expand All @@ -41,7 +40,7 @@ public final void decode(SocketAddress address, byte[] array, int offset, int le
LoggerFactory.getLogger().info(address.toString() + " 开始解码数据");
while (buffer.hasRemaining() && ((data = decode(address, buffer)) != null)) {
LoggerFactory.getLogger().info(address.toString() + " 成功解码一条数据");
data.mAddress = address;
data.setRemoteAddress(address);
mHandleRunnable.put(data);
buffer.compact();
buffer.flip();
Expand Down
33 changes: 21 additions & 12 deletions src/main/java/com/dream/socket/runnable/HandleRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,50 @@ public class HandleRunnable<T extends Message> implements Runnable {

private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
private MessageHandle<T> handle;
private boolean isHandle = false;

public HandleRunnable(MessageHandle<T> handle) {
this.handle = handle;
}

@Override
public void run() {
synchronized (this) {
queue.clear();
LoggerFactory.getLogger().info("开启 -> 接收线程");
LoggerFactory.getLogger().info("开启 -> 接收线程");
handle.onStatus(Status.STATUS_CONNECTED);
isHandle = true;
while (isHandle) {
try {
handle.onStatus(Status.STATUS_CONNECTED);
while (true) {
T data = queue.take();
if (handle != null) {
handle.onMessage(data);
}
}
handing();
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("异常 -> 接收线程异常退出", e);
}
}
LoggerFactory.getLogger().info("结束 -> 接收线程");
}

private void handing() throws Exception {
while (true) {
T data = queue.take();
if (handle != null) {
handle.onMessage(data);
}
}
}

public boolean put(T d) {
try {
this.queue.put(d);
return true;
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("异常 -> 接收线程 queue.put() 异常", e);
}
return false;
}

public void stop(){
isHandle = false;
}

public void status(int status) {
if (handle != null) {
handle.onStatus(status);
Expand Down
39 changes: 24 additions & 15 deletions src/main/java/com/dream/socket/runnable/SendRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,51 @@ public abstract class SendRunnable<T extends Message> implements Runnable {
private MessageEncode<T> encode;
private ByteBuffer buffer = ByteBuffer.allocate(102400);
private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
private boolean isSend = false;

public SendRunnable(MessageEncode<T> encode) {
this.encode = encode;
}

@Override
public void run() {
synchronized (this) {
queue.clear();
LoggerFactory.getLogger().info("开启 -> 发送线程");
LoggerFactory.getLogger().info("开启 -> 发送线程");
isSend = true;
while (isSend) {
try {
while (true) {
T data = queue.take();
buffer.clear();
encode.encode(data, buffer);
buffer.flip();
if (!doSend(data.mAddress, buffer.array(), 0, buffer.limit())) {
LoggerFactory.getLogger().error("数据没有被发送出去!");
}
}
sending();
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("异常 -> 发送线程异常退出", e);
}
}
LoggerFactory.getLogger().info("结束 -> 发送线程");
}

private void sending() throws Exception {
while (true) {
T data = queue.take();
buffer.clear();
encode.encode(data, buffer);
buffer.flip();
if (!doSend(data.getRemoteAddress(), buffer.array(), 0, buffer.limit())) {
LoggerFactory.getLogger().error("数据没有被发送出去!");
}
}
}

public boolean send(T data) {
try {
this.queue.put(data);
return true;
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("异常 -> 发送线程 LinkedBlockingQueue.put() 异常", e);
}
return false;
}

protected abstract boolean doSend(SocketAddress address, byte[] buffer, int offset, int length);
public void stop(){
isSend = false;
}

protected abstract boolean doSend(SocketAddress remoteAddress, byte[] buffer, int offset, int length);
}

0 comments on commit eb54578

Please sign in to comment.