From eb5457809228bba153cf60324b32a40eb2f0ff3c Mon Sep 17 00:00:00 2001 From: "Mr.Huang" <1123033157@qq.com> Date: Tue, 28 Nov 2017 11:25:57 +0800 Subject: [PATCH] update --- build.gradle | 2 +- .../java/com/dream/socket/DreamTCPSocket.java | 20 ++++++---- .../java/com/dream/socket/DreamUDPSocket.java | 10 ++++- .../java/com/dream/socket/codec/Message.java | 10 ++++- .../com/dream/socket/codec/MessageDecode.java | 3 +- .../dream/socket/runnable/HandleRunnable.java | 33 ++++++++++------ .../dream/socket/runnable/SendRunnable.java | 39 ++++++++++++------- 7 files changed, 76 insertions(+), 41 deletions(-) diff --git a/build.gradle b/build.gradle index 85a70fd..fd1d953 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ apply plugin: 'java' apply plugin: 'maven' -apply from: 'gradle-mvn-push.gradle' +//apply from: 'gradle-mvn-push.gradle' repositories { mavenCentral() diff --git a/src/main/java/com/dream/socket/DreamTCPSocket.java b/src/main/java/com/dream/socket/DreamTCPSocket.java index d14c391..554b77d 100644 --- a/src/main/java/com/dream/socket/DreamTCPSocket.java +++ b/src/main/java/com/dream/socket/DreamTCPSocket.java @@ -84,7 +84,7 @@ protected boolean onStart() { public boolean send(T data) { if (mSocketSendRunnable != null) { - data.mAddress = mAddress; + data.setRemoteAddress(mAddress); return mSocketSendRunnable.send(data); } return false; @@ -92,14 +92,18 @@ public boolean send(T data) { @Override protected boolean onStop() { + if (mHandleRunnable != null) { + mHandleRunnable.stop(); + mHandleRunnable.status(Status.STATUS_DISCONNECT); + } + if (mSocketSendRunnable != null) { + mSocketSendRunnable.stop(); + } if (mSocket != null) { shutdownInput(mSocket); shutdownOutput(mSocket); close(mSocket); mSocket = null; - if (mHandleRunnable != null) { - mHandleRunnable.status(Status.STATUS_DISCONNECT); - } } mSocketSendRunnable = null; mHandleRunnable = null; @@ -120,7 +124,7 @@ private static void shutdownInput(Socket socket) { LoggerFactory.getLogger().info("关闭Socket输入..."); socket.shutdownInput(); } catch (IOException e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("关闭Socket输入异常", e); } } } @@ -131,18 +135,18 @@ private static void shutdownOutput(Socket socket) { LoggerFactory.getLogger().info("关闭Socket输出..."); socket.shutdownOutput(); } catch (IOException e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("关闭Socket输出异常", e); } } } private static void close(Socket socket) { - if (socket != null && !socket.isClosed()) { + if (socket != null) { try { LoggerFactory.getLogger().info("关闭Socket..."); socket.close(); } catch (IOException e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("关闭Socket异常", e); } } } diff --git a/src/main/java/com/dream/socket/DreamUDPSocket.java b/src/main/java/com/dream/socket/DreamUDPSocket.java index 503e42d..a3c299e 100644 --- a/src/main/java/com/dream/socket/DreamUDPSocket.java +++ b/src/main/java/com/dream/socket/DreamUDPSocket.java @@ -10,7 +10,6 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.net.InetSocketAddress; import java.net.SocketAddress; public class DreamUDPSocket extends DreamSocket { @@ -69,6 +68,13 @@ public boolean onStart() { @Override public boolean onStop() { + if (mHandleRunnable != null) { + mHandleRunnable.stop(); + mHandleRunnable.status(Status.STATUS_DISCONNECT); + } + if (mSocketSendRunnable != null) { + mSocketSendRunnable.stop(); + } if (mSocket != null) { LoggerFactory.getLogger().info("关闭UDP管道"); mSocket.close(); @@ -92,7 +98,7 @@ public boolean isConnected() { public boolean send(SocketAddress address, Message data) { if (mSocketSendRunnable != null) { - data.mAddress = address; + data.setRemoteAddress(address); mSocketSendRunnable.send(data); return true; } diff --git a/src/main/java/com/dream/socket/codec/Message.java b/src/main/java/com/dream/socket/codec/Message.java index 1a350ca..3fc5bc1 100644 --- a/src/main/java/com/dream/socket/codec/Message.java +++ b/src/main/java/com/dream/socket/codec/Message.java @@ -3,5 +3,13 @@ import java.net.SocketAddress; public abstract class Message { - public SocketAddress mAddress; + private SocketAddress mRemoteAddress; + + public void setRemoteAddress(SocketAddress remoteAddress) { + this.mRemoteAddress = remoteAddress; + } + + public SocketAddress getRemoteAddress() { + return mRemoteAddress; + } } diff --git a/src/main/java/com/dream/socket/codec/MessageDecode.java b/src/main/java/com/dream/socket/codec/MessageDecode.java index b417d57..91063dd 100644 --- a/src/main/java/com/dream/socket/codec/MessageDecode.java +++ b/src/main/java/com/dream/socket/codec/MessageDecode.java @@ -28,7 +28,6 @@ public final void decode(SocketAddress address, byte[] array, int offset, int le } LoggerFactory.getLogger().info(String.format("%s 上次未解码 position=%d limit=%d", address.toString(), buffer.position(), buffer.limit())); if (buffer.limit() + length > buffer.capacity()) { - //TODO 缓存区已满,丢弃读取的数据 LoggerFactory.getLogger().warn(address.toString() + " -> decode缓存区已满,读取的数据被丢弃!!!!!"); return; } @@ -41,7 +40,7 @@ public final void decode(SocketAddress address, byte[] array, int offset, int le LoggerFactory.getLogger().info(address.toString() + " 开始解码数据"); while (buffer.hasRemaining() && ((data = decode(address, buffer)) != null)) { LoggerFactory.getLogger().info(address.toString() + " 成功解码一条数据"); - data.mAddress = address; + data.setRemoteAddress(address); mHandleRunnable.put(data); buffer.compact(); buffer.flip(); diff --git a/src/main/java/com/dream/socket/runnable/HandleRunnable.java b/src/main/java/com/dream/socket/runnable/HandleRunnable.java index 78fae4f..91e5b29 100644 --- a/src/main/java/com/dream/socket/runnable/HandleRunnable.java +++ b/src/main/java/com/dream/socket/runnable/HandleRunnable.java @@ -11,6 +11,7 @@ public class HandleRunnable implements Runnable { private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); private MessageHandle handle; + private boolean isHandle = false; public HandleRunnable(MessageHandle handle) { this.handle = handle; @@ -18,34 +19,42 @@ public HandleRunnable(MessageHandle handle) { @Override public void run() { - synchronized (this) { - queue.clear(); - LoggerFactory.getLogger().info("开启 -> 接收线程"); + LoggerFactory.getLogger().info("开启 -> 接收线程"); + handle.onStatus(Status.STATUS_CONNECTED); + isHandle = true; + while (isHandle) { try { - handle.onStatus(Status.STATUS_CONNECTED); - while (true) { - T data = queue.take(); - if (handle != null) { - handle.onMessage(data); - } - } + handing(); } catch (Exception e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("异常 -> 接收线程异常退出", e); } } LoggerFactory.getLogger().info("结束 -> 接收线程"); } + private void handing() throws Exception { + while (true) { + T data = queue.take(); + if (handle != null) { + handle.onMessage(data); + } + } + } + public boolean put(T d) { try { this.queue.put(d); return true; } catch (Exception e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("异常 -> 接收线程 queue.put() 异常", e); } return false; } + public void stop(){ + isHandle = false; + } + public void status(int status) { if (handle != null) { handle.onStatus(status); diff --git a/src/main/java/com/dream/socket/runnable/SendRunnable.java b/src/main/java/com/dream/socket/runnable/SendRunnable.java index 6726e6e..2de40d7 100644 --- a/src/main/java/com/dream/socket/runnable/SendRunnable.java +++ b/src/main/java/com/dream/socket/runnable/SendRunnable.java @@ -13,6 +13,7 @@ public abstract class SendRunnable implements Runnable { private MessageEncode encode; private ByteBuffer buffer = ByteBuffer.allocate(102400); private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + private boolean isSend = false; public SendRunnable(MessageEncode encode) { this.encode = encode; @@ -20,35 +21,43 @@ public SendRunnable(MessageEncode encode) { @Override public void run() { - synchronized (this) { - queue.clear(); - LoggerFactory.getLogger().info("开启 -> 发送线程"); + LoggerFactory.getLogger().info("开启 -> 发送线程"); + isSend = true; + while (isSend) { try { - while (true) { - T data = queue.take(); - buffer.clear(); - encode.encode(data, buffer); - buffer.flip(); - if (!doSend(data.mAddress, buffer.array(), 0, buffer.limit())) { - LoggerFactory.getLogger().error("数据没有被发送出去!"); - } - } + sending(); } catch (Exception e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("异常 -> 发送线程异常退出", e); } } LoggerFactory.getLogger().info("结束 -> 发送线程"); } + private void sending() throws Exception { + while (true) { + T data = queue.take(); + buffer.clear(); + encode.encode(data, buffer); + buffer.flip(); + if (!doSend(data.getRemoteAddress(), buffer.array(), 0, buffer.limit())) { + LoggerFactory.getLogger().error("数据没有被发送出去!"); + } + } + } + public boolean send(T data) { try { this.queue.put(data); return true; } catch (Exception e) { - e.printStackTrace(); + LoggerFactory.getLogger().error("异常 -> 发送线程 LinkedBlockingQueue.put() 异常", e); } return false; } - protected abstract boolean doSend(SocketAddress address, byte[] buffer, int offset, int length); + public void stop(){ + isSend = false; + } + + protected abstract boolean doSend(SocketAddress remoteAddress, byte[] buffer, int offset, int length); }