diff --git a/build.gradle b/build.gradle index 41986ba..85682af 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ apply plugin: 'java' apply plugin: 'maven' -apply from: 'gradle-mvn-push.gradle' +//apply from: 'gradle-mvn-push.gradle' repositories { mavenCentral() diff --git a/src/main/java/com/dream/socket/DreamSocket.java b/src/main/java/com/dream/socket/DreamSocket.java index 8e45a17..95127f8 100644 --- a/src/main/java/com/dream/socket/DreamSocket.java +++ b/src/main/java/com/dream/socket/DreamSocket.java @@ -36,9 +36,9 @@ public final boolean stop() { return true; } - public abstract boolean onStart(); + protected abstract boolean onStart(); - public abstract boolean onStop(); + protected abstract boolean onStop(); public abstract boolean isConnected(); diff --git a/src/main/java/com/dream/socket/DreamTCPSocket.java b/src/main/java/com/dream/socket/DreamTCPSocket.java index 374feac..3be23d4 100644 --- a/src/main/java/com/dream/socket/DreamTCPSocket.java +++ b/src/main/java/com/dream/socket/DreamTCPSocket.java @@ -1,24 +1,26 @@ package com.dream.socket; +import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageDecode; import com.dream.socket.codec.MessageEncode; import com.dream.socket.codec.MessageHandle; import com.dream.socket.runnable.HandleRunnable; -import com.dream.socket.runnable.SocketSendRunnable; +import com.dream.socket.runnable.TCPSocketSendRunnable; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; public class DreamTCPSocket extends DreamSocket { private static final int SOCKET_CONNECT_TIMEOUT = 10000; - private InetSocketAddress mAddress; + private SocketAddress mAddress; private String mHost; private int mPort; private Socket mSocket; - private SocketSendRunnable mSocketSendRunnable; + private TCPSocketSendRunnable mSocketSendRunnable; private MessageDecode mMessageDecode; private HandleRunnable mHandleRunnable; @@ -27,15 +29,15 @@ public DreamTCPSocket(String host, int port) { this.mPort = port; } - public void codec(MessageDecode messageDecode, MessageHandle messageHandle, MessageEncode messageEncode) { - this.mHandleRunnable = new HandleRunnable(messageHandle); + public void codec(MessageDecode messageDecode, MessageHandle messageHandle, MessageEncode messageEncode) { + this.mHandleRunnable = new HandleRunnable<>(messageHandle); messageDecode.setHandleRunnable(mHandleRunnable); this.mMessageDecode = messageDecode; - this.mSocketSendRunnable = new SocketSendRunnable(messageEncode); + this.mSocketSendRunnable = new TCPSocketSendRunnable<>(messageEncode); } @Override - public boolean onStart() { + protected boolean onStart() { synchronized (this) { while (isRunning()) { try { @@ -53,7 +55,7 @@ public boolean onStart() { InputStream in = mSocket.getInputStream(); int length; while ((length = in.read(bytes)) > 0) { - this.mMessageDecode.put(bytes, 0, length); + this.mMessageDecode.put(mAddress, bytes, 0, length); } } } catch (Exception e) { @@ -76,14 +78,16 @@ public boolean onStart() { return false; } - public void send(Object data) { + public boolean send(T data) { if (mSocketSendRunnable != null) { - mSocketSendRunnable.send(data); + data.mAddress = mAddress; + return mSocketSendRunnable.send(data); } + return false; } @Override - public boolean onStop() { + protected boolean onStop() { if (mSocketSendRunnable != null) { mSocketSendRunnable.stop(); } diff --git a/src/main/java/com/dream/socket/DreamUDPSocket.java b/src/main/java/com/dream/socket/DreamUDPSocket.java new file mode 100644 index 0000000..45dfe9c --- /dev/null +++ b/src/main/java/com/dream/socket/DreamUDPSocket.java @@ -0,0 +1,96 @@ +package com.dream.socket; + +import com.dream.socket.codec.DataProtocol; +import com.dream.socket.codec.MessageDecode; +import com.dream.socket.codec.MessageEncode; +import com.dream.socket.codec.MessageHandle; +import com.dream.socket.runnable.HandleRunnable; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; + +public class DreamUDPSocket extends DreamSocket { + + private int mPort; + private DatagramSocket mSocket; + private UDPSocketSendRunnable mSocketSendRunnable; + private DatagramPacket packet; + private MessageDecode mMessageDecode; + private HandleRunnable mHandleRunnable; + + public DreamUDPSocket(int port) { + this.mPort = port; + packet = new DatagramPacket(new byte[500], 500); + } + + public void codec(MessageDecode messageDecode, MessageHandle messageHandle, MessageEncode messageEncode) { + this.mHandleRunnable = new HandleRunnable<>(messageHandle); + messageDecode.setHandleRunnable(mHandleRunnable); + this.mMessageDecode = messageDecode; + this.mSocketSendRunnable = new UDPSocketSendRunnable<>(messageEncode); + } + + @Override + public boolean onStart() { + synchronized (this) { + try { + mSocket = new DatagramSocket(mPort); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + mSocketSendRunnable.setDatagramSocket(mSocket); + executeRunnable(mSocketSendRunnable); + executeRunnable(mHandleRunnable); + while (isConnected()) { + try { + mSocket.receive(packet); + mMessageDecode.put(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + mSocket = null; + if (isRunning()) { + this.mHandleRunnable.status(Status.STATUS_FAIL); + System.out.println("6秒后尝试重连"); + this.wait(6000); + System.out.println("开始重连"); + } + } catch (Exception ie) { + System.out.println("重连发生异常"); + } + } + } + } + return false; + } + + @Override + public boolean onStop() { + if (mSocketSendRunnable != null) { + mSocketSendRunnable.stop(); + } + if (this.mHandleRunnable != null) { + this.mHandleRunnable.stop(); + } + if (mSocket != null) { + mSocket.close(); + mSocket = null; + mHandleRunnable.status(Status.STATUS_DISCONNECT); + } + return true; + } + + @Override + public boolean isConnected() { + if (mSocket != null && !mSocket.isClosed() && mSocket.isConnected() && isRunning()) { + return true; + } + return false; + } + + public boolean send(String host, int port, Object data) { + return false; + } +} diff --git a/src/main/java/com/dream/socket/UDPSocketSendRunnable.java b/src/main/java/com/dream/socket/UDPSocketSendRunnable.java new file mode 100644 index 0000000..675bef3 --- /dev/null +++ b/src/main/java/com/dream/socket/UDPSocketSendRunnable.java @@ -0,0 +1,36 @@ +package com.dream.socket; + +import com.dream.socket.codec.DataProtocol; +import com.dream.socket.codec.MessageEncode; +import com.dream.socket.runnable.SendRunnable; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; + +public class UDPSocketSendRunnable extends SendRunnable { + + private DatagramSocket mSocket; + + public UDPSocketSendRunnable(MessageEncode encode) { + super(encode); + } + + public void setDatagramSocket(DatagramSocket socket) { + this.mSocket = socket; + } + + @Override + protected boolean doSend(SocketAddress address, byte[] buffer, int offset, int length) { + if (mSocket != null) { + try { + mSocket.send(new DatagramPacket(buffer, offset, length, address)); + return true; + } catch (IOException e) { + e.printStackTrace(); + } + } + return false; + } +} diff --git a/src/main/java/com/dream/socket/codec/DataProtocol.java b/src/main/java/com/dream/socket/codec/DataProtocol.java new file mode 100644 index 0000000..813d767 --- /dev/null +++ b/src/main/java/com/dream/socket/codec/DataProtocol.java @@ -0,0 +1,7 @@ +package com.dream.socket.codec; + +import java.net.SocketAddress; + +public abstract class DataProtocol { + public SocketAddress mAddress; +} diff --git a/src/main/java/com/dream/socket/codec/MessageDecode.java b/src/main/java/com/dream/socket/codec/MessageDecode.java index a9e2708..2d2fef7 100644 --- a/src/main/java/com/dream/socket/codec/MessageDecode.java +++ b/src/main/java/com/dream/socket/codec/MessageDecode.java @@ -2,43 +2,49 @@ import com.dream.socket.runnable.HandleRunnable; +import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; -public abstract class MessageDecode { +public abstract class MessageDecode { private static int CACHE_BUFFER_LENGTH = 102400; - private final ByteBuffer mBuffer = ByteBuffer.allocate(CACHE_BUFFER_LENGTH); private HandleRunnable mHandleRunnable; - - public MessageDecode() { - mBuffer.flip(); - } + private Map mAddressByteBufferMap = new HashMap<>(); public void setHandleRunnable(HandleRunnable handleRunnable) { this.mHandleRunnable = handleRunnable; } - public synchronized void put(byte[] array, int offset, int length) { - if (mBuffer.limit() + length > mBuffer.capacity()) { + public synchronized void put(SocketAddress address, byte[] array, int offset, int length) { + ByteBuffer buffer = mAddressByteBufferMap.get(address); + if (buffer == null) { + buffer = ByteBuffer.allocate(CACHE_BUFFER_LENGTH); + mAddressByteBufferMap.put(address, buffer); + buffer.flip(); + } + if (buffer.limit() + length > buffer.capacity()) { //TODO 缓存区已满,丢弃读取的数据 return; } - mBuffer.compact(); - mBuffer.put(array, offset, length); - mBuffer.flip(); - mBuffer.mark(); + buffer.compact(); + buffer.put(array, offset, length); + buffer.flip(); + buffer.mark(); T data; - while (mBuffer.hasRemaining() && ((data = decode(mBuffer)) != null)) { + while (buffer.hasRemaining() && ((data = decode(address, buffer)) != null)) { + data.mAddress = address; mHandleRunnable.put(data); - mBuffer.compact(); - mBuffer.flip(); - mBuffer.mark(); + buffer.compact(); + buffer.flip(); + buffer.mark(); } - mBuffer.reset(); + buffer.reset(); } - protected abstract T decode(ByteBuffer buffer); + protected abstract T decode(SocketAddress address, ByteBuffer buffer); } diff --git a/src/main/java/com/dream/socket/codec/MessageEncode.java b/src/main/java/com/dream/socket/codec/MessageEncode.java index 5a7dcb9..f9c1a02 100644 --- a/src/main/java/com/dream/socket/codec/MessageEncode.java +++ b/src/main/java/com/dream/socket/codec/MessageEncode.java @@ -2,7 +2,7 @@ import java.nio.ByteBuffer; -public abstract class MessageEncode { +public abstract class MessageEncode { /** diff --git a/src/main/java/com/dream/socket/codec/MessageHandle.java b/src/main/java/com/dream/socket/codec/MessageHandle.java index e91f3da..d61153c 100644 --- a/src/main/java/com/dream/socket/codec/MessageHandle.java +++ b/src/main/java/com/dream/socket/codec/MessageHandle.java @@ -1,6 +1,6 @@ package com.dream.socket.codec; -public abstract class MessageHandle { +public abstract class MessageHandle { /** * 连接状态回调 diff --git a/src/main/java/com/dream/socket/runnable/HandleRunnable.java b/src/main/java/com/dream/socket/runnable/HandleRunnable.java index 76bbe4f..40d2d84 100644 --- a/src/main/java/com/dream/socket/runnable/HandleRunnable.java +++ b/src/main/java/com/dream/socket/runnable/HandleRunnable.java @@ -1,10 +1,11 @@ package com.dream.socket.runnable; +import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageHandle; import java.util.concurrent.LinkedBlockingQueue; -public class HandleRunnable implements Runnable { +public class HandleRunnable implements Runnable { private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); private MessageHandle handle; diff --git a/src/main/java/com/dream/socket/runnable/SendRunnable.java b/src/main/java/com/dream/socket/runnable/SendRunnable.java index 13150f6..34e38a8 100644 --- a/src/main/java/com/dream/socket/runnable/SendRunnable.java +++ b/src/main/java/com/dream/socket/runnable/SendRunnable.java @@ -1,11 +1,13 @@ package com.dream.socket.runnable; +import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageEncode; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.LinkedBlockingQueue; -public abstract class SendRunnable implements Runnable { +public abstract class SendRunnable implements Runnable { private boolean sending; private MessageEncode encode; @@ -31,7 +33,7 @@ public void run() { buffer.clear(); encode.encode(data, buffer); buffer.flip(); - if (!doSend(buffer.array(), 0, buffer.limit())) { + if (!doSend(data.mAddress, buffer.array(), 0, buffer.limit())) { System.out.println("数据没有被真正发送出去!"); } } @@ -48,13 +50,15 @@ public void stop() { public boolean send(T data) { try { - this.queue.put(data); - return true; + if (sending) { + this.queue.put(data); + return true; + } } catch (Exception e) { e.printStackTrace(); } return false; } - protected abstract boolean doSend(byte[] buffer, int offset, int length); + protected abstract boolean doSend(SocketAddress address, byte[] buffer, int offset, int length); } diff --git a/src/main/java/com/dream/socket/runnable/SocketSendRunnable.java b/src/main/java/com/dream/socket/runnable/TCPSocketSendRunnable.java similarity index 69% rename from src/main/java/com/dream/socket/runnable/SocketSendRunnable.java rename to src/main/java/com/dream/socket/runnable/TCPSocketSendRunnable.java index 2e00aed..124e8e1 100644 --- a/src/main/java/com/dream/socket/runnable/SocketSendRunnable.java +++ b/src/main/java/com/dream/socket/runnable/TCPSocketSendRunnable.java @@ -1,14 +1,16 @@ package com.dream.socket.runnable; +import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageEncode; import java.io.OutputStream; +import java.net.SocketAddress; -public final class SocketSendRunnable extends SendRunnable { +public final class TCPSocketSendRunnable extends SendRunnable { private OutputStream out; - public SocketSendRunnable(MessageEncode encode) { + public TCPSocketSendRunnable(MessageEncode encode) { super(encode); } @@ -17,7 +19,7 @@ public void setOutputStream(OutputStream out) { } @Override - protected boolean doSend(byte[] buffer, int offset, int length) { + protected boolean doSend(SocketAddress address, byte[] buffer, int offset, int length) { if (out != null) { try { out.write(buffer, offset, length); diff --git a/src/test/java/com/dream/socket/Client.java b/src/test/java/com/dream/socket/Client.java index 4817170..9267105 100644 --- a/src/test/java/com/dream/socket/Client.java +++ b/src/test/java/com/dream/socket/Client.java @@ -1,42 +1,57 @@ package com.dream.socket; +import com.dream.socket.codec.DataProtocol; import com.dream.socket.codec.MessageDecode; import com.dream.socket.codec.MessageEncode; import com.dream.socket.codec.MessageHandle; +import java.net.SocketAddress; import java.nio.ByteBuffer; public class Client { private DreamTCPSocket socket; + public static final class StringProtocol extends DataProtocol{ + + String string; + + public StringProtocol(byte[] array) { + string = new String(array); + } + + public String getString() { + return string; + } + } + public static void main(String[] args) { - DreamTCPSocket socket = new DreamTCPSocket("localhost", 6969); - socket.codec(new MessageDecode() { + DreamUDPSocket socket = new DreamUDPSocket(6969); + socket.codec(new MessageDecode() { @Override - protected String decode(ByteBuffer buffer) { + protected StringProtocol decode(SocketAddress address, ByteBuffer buffer) { int len = buffer.getInt(); if(len > buffer.remaining()){ return null; } byte[] array = new byte[len]; buffer.get(array); - return new String(array); + return new StringProtocol(array); } - }, new MessageHandle() { + }, new MessageHandle() { @Override public void onStatus(int status) { } @Override - public void onMessage(String data) { - System.out.println(data); + public void onMessage(StringProtocol data) { + System.out.println(data.getString()); } - }, new MessageEncode() { + }, new MessageEncode() { @Override - public void encode(String data, ByteBuffer buffer) { - buffer.put(data.getBytes()); + public void encode(StringProtocol data, ByteBuffer buffer) { + buffer.put(data.getString().getBytes()); } }); socket.start(); diff --git a/src/test/java/com/dream/socket/TestAddress.java b/src/test/java/com/dream/socket/TestAddress.java new file mode 100644 index 0000000..ce1ed54 --- /dev/null +++ b/src/test/java/com/dream/socket/TestAddress.java @@ -0,0 +1,15 @@ +package com.dream.socket; + +import java.net.InetSocketAddress; + +public class TestAddress { + + public static void main(String[] args) { + InetSocketAddress address1 = new InetSocketAddress("127.0.0.1", 6969); + InetSocketAddress address2 = new InetSocketAddress("127.0.0.1", 6969); + System.out.println(address1.equals(address2)); + System.out.println(address1.hashCode()); + System.out.println(address2.hashCode()); + } + +}