Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Sopage committed Nov 23, 2017
1 parent 07bd6bc commit 47f5189
Show file tree
Hide file tree
Showing 24 changed files with 271 additions and 2,779 deletions.
18 changes: 15 additions & 3 deletions src/main/java/com/dream/socket/DreamSocket.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.dream.socket;

import com.dream.socket.logger.Logger;
import com.dream.socket.logger.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -10,28 +13,33 @@ public abstract class DreamSocket {

public final boolean start() {
if (running) {
return false;
LoggerFactory.getLogger().info("Socket已连接!");
return true;
}
if (pool != null) {
if (!pool.isShutdown()) {
LoggerFactory.getLogger().warn("停止线程池");
pool.shutdownNow();
}
pool = null;
}
pool = Executors.newFixedThreadPool(3);
LoggerFactory.getLogger().info("创建线程池");
pool.execute(new StartRunnable());
running = true;
return true;
}

public final boolean stop() {
if (!running) {
LoggerFactory.getLogger().info("Socket已关闭!");
return true;
}
running = false;
onStop();
if (pool != null && !pool.isShutdown()) {
pool.shutdownNow();
LoggerFactory.getLogger().warn("停止线程池");
}
return true;
}
Expand All @@ -54,11 +62,15 @@ public boolean isRunning() {
return this.running;
}

protected boolean executeRunnable(Runnable runnable){
if(pool != null && !pool.isShutdown()){
protected boolean executeRunnable(Runnable runnable) {
if (pool != null && !pool.isShutdown()) {
pool.execute(runnable);
return true;
}
return false;
}

public void setLogger(Logger logger) {
LoggerFactory.setLogger(logger);
}
}
27 changes: 18 additions & 9 deletions src/main/java/com/dream/socket/DreamTCPSocket.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.dream.socket;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.Message;
import com.dream.socket.codec.MessageDecode;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.codec.MessageHandle;
import com.dream.socket.logger.LoggerFactory;
import com.dream.socket.runnable.HandleRunnable;
import com.dream.socket.runnable.TCPSocketSendRunnable;

Expand All @@ -29,7 +30,7 @@ public DreamTCPSocket(String host, int port) {
this.mPort = port;
}

public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
public <T extends Message> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<>(messageHandle);
messageDecode.setHandleRunnable(mHandleRunnable);
this.mMessageDecode = messageDecode;
Expand All @@ -45,40 +46,43 @@ protected boolean onStart() {
mAddress = new InetSocketAddress(mHost, mPort);
}
mSocket = new Socket();
LoggerFactory.getLogger().info("开始连接 -> " + mAddress.toString());
mSocket.connect(mAddress, SOCKET_CONNECT_TIMEOUT);
if (mSocket.isConnected()) {
System.out.println("连接成功");
LoggerFactory.getLogger().info("连接成功");
mSocketSendRunnable.setOutputStream(mSocket.getOutputStream());
executeRunnable(mSocketSendRunnable);
executeRunnable(mHandleRunnable);
byte[] bytes = new byte[102400];
InputStream in = mSocket.getInputStream();
int length;
while ((length = in.read(bytes)) > 0) {
this.mMessageDecode.put(mAddress, bytes, 0, length);
this.mMessageDecode.decode(mAddress, bytes, 0, length);
}
} else {
LoggerFactory.getLogger().error("连接失败!");
}
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("重连发生异常", e);
} finally {
try {
mSocket = null;
if (isRunning()) {
this.mHandleRunnable.status(Status.STATUS_FAIL);
System.out.println("6秒后尝试重连");
LoggerFactory.getLogger().info("6秒后尝试重连");
this.wait(6000);
System.out.println("开始重连");
LoggerFactory.getLogger().info("开始重连 -> " + mAddress.toString());
}
} catch (Exception ie) {
System.out.println("重连发生异常");
LoggerFactory.getLogger().error("重连发生异常", ie);
}
}
}
}
return false;
}

public <T extends DataProtocol> boolean send(T data) {
public <T extends Message> boolean send(T data) {
if (mSocketSendRunnable != null) {
data.mAddress = mAddress;
return mSocketSendRunnable.send(data);
Expand All @@ -89,9 +93,11 @@ public <T extends DataProtocol> boolean send(T data) {
@Override
protected boolean onStop() {
if (mSocketSendRunnable != null) {
LoggerFactory.getLogger().info("开始结束发送线程...");
mSocketSendRunnable.stop();
}
if (this.mHandleRunnable != null) {
LoggerFactory.getLogger().info("开始结束接收线程...");
this.mHandleRunnable.stop();
}
if (mSocket != null) {
Expand All @@ -115,6 +121,7 @@ public boolean isConnected() {
private static void shutdownInput(Socket socket) {
if (socket != null && !socket.isInputShutdown()) {
try {
LoggerFactory.getLogger().info("关闭Socket输入...");
socket.shutdownInput();
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -125,6 +132,7 @@ private static void shutdownInput(Socket socket) {
private static void shutdownOutput(Socket socket) {
if (socket != null && !socket.isOutputShutdown()) {
try {
LoggerFactory.getLogger().info("关闭Socket输出...");
socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -135,6 +143,7 @@ private static void shutdownOutput(Socket socket) {
private static void close(Socket socket) {
if (socket != null && !socket.isClosed()) {
try {
LoggerFactory.getLogger().info("关闭Socket...");
socket.close();
} catch (IOException e) {
e.printStackTrace();
Expand Down
22 changes: 13 additions & 9 deletions src/main/java/com/dream/socket/DreamUDPSocket.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.dream.socket;

import com.dream.socket.codec.DataProtocol;
import com.dream.socket.codec.Message;
import com.dream.socket.codec.MessageDecode;
import com.dream.socket.codec.MessageEncode;
import com.dream.socket.codec.MessageHandle;
import com.dream.socket.logger.LoggerFactory;
import com.dream.socket.runnable.HandleRunnable;
import com.dream.socket.runnable.UDPSocketSendRunnable;

Expand All @@ -19,7 +20,7 @@ public class DreamUDPSocket extends DreamSocket {
private MessageDecode mMessageDecode;
private HandleRunnable mHandleRunnable;

public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
public <T extends Message> void codec(MessageDecode<T> messageDecode, MessageHandle<T> messageHandle, MessageEncode<T> messageEncode) {
this.mHandleRunnable = new HandleRunnable<>(messageHandle);
messageDecode.setHandleRunnable(mHandleRunnable);
this.mMessageDecode = messageDecode;
Expand All @@ -30,9 +31,10 @@ public <T extends DataProtocol> void codec(MessageDecode<T> messageDecode, Messa
public boolean onStart() {
synchronized (this) {
try {
LoggerFactory.getLogger().info("开始创建UDP管道");
mSocket = new DatagramSocket();
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("UDP管道创建失败!", e);
return false;
}
mSocketSendRunnable.setDatagramSocket(mSocket);
Expand All @@ -44,21 +46,20 @@ public boolean onStart() {
while (isRunning()) {
packet.setData(buffer, 0, buffer.length);
mSocket.receive(packet);
mMessageDecode.put(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength());
mMessageDecode.decode(packet.getSocketAddress(), packet.getData(), packet.getOffset(), packet.getLength());
}
} catch (Exception e) {
e.printStackTrace();
LoggerFactory.getLogger().error("UDP receive执行异常!", e);
} finally {
try {
mSocket = null;
if (isRunning()) {
this.mHandleRunnable.status(Status.STATUS_FAIL);
System.out.println("6秒后尝试重连");
LoggerFactory.getLogger().info("6秒后重新创建UDP管道");
this.wait(6000);
System.out.println("开始重连");
}
} catch (Exception ie) {
System.out.println("重连发生异常");
LoggerFactory.getLogger().error("wait奔溃!", ie);
}
}

Expand All @@ -69,12 +70,15 @@ public boolean onStart() {
@Override
public boolean onStop() {
if (mSocketSendRunnable != null) {
LoggerFactory.getLogger().info("开始结束发送线程...");
mSocketSendRunnable.stop();
}
if (this.mHandleRunnable != null) {
LoggerFactory.getLogger().info("开始结束接收线程...");
this.mHandleRunnable.stop();
}
if (mSocket != null) {
LoggerFactory.getLogger().info("关闭UDP管道");
mSocket.close();
mSocket = null;
mHandleRunnable.status(Status.STATUS_DISCONNECT);
Expand All @@ -90,7 +94,7 @@ public boolean isConnected() {
return false;
}

public boolean send(String host, int port, DataProtocol data) {
public boolean send(String host, int port, Message data) {
if (mSocketSendRunnable != null) {
SocketAddress address = new InetSocketAddress(host, port);
data.mAddress = address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import java.net.SocketAddress;

public abstract class DataProtocol {
public abstract class Message {
public SocketAddress mAddress;
}
16 changes: 11 additions & 5 deletions src/main/java/com/dream/socket/codec/MessageDecode.java
Original file line number Diff line number Diff line change
@@ -1,47 +1,53 @@
package com.dream.socket.codec;

import com.dream.socket.logger.LoggerFactory;
import com.dream.socket.runnable.HandleRunnable;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public abstract class MessageDecode<T extends DataProtocol> {
public abstract class MessageDecode<T extends Message> {

private static int CACHE_BUFFER_LENGTH = 102400;
private static final int CACHE_BUFFER_LENGTH = 102400;
private HandleRunnable<T> mHandleRunnable;
private Map<SocketAddress, ByteBuffer> mAddressByteBufferMap = new HashMap<>();

public void setHandleRunnable(HandleRunnable<T> handleRunnable) {
this.mHandleRunnable = handleRunnable;
}

public synchronized void put(SocketAddress address, byte[] array, int offset, int length) {
public synchronized void decode(SocketAddress address, byte[] array, int offset, int length) {
ByteBuffer buffer = mAddressByteBufferMap.get(address);
if (buffer == null) {
buffer = ByteBuffer.allocate(CACHE_BUFFER_LENGTH);
mAddressByteBufferMap.put(address, buffer);
buffer.flip();
LoggerFactory.getLogger().info("创建 " + address.toString() + " 数据缓冲区ByteBuffer");
}
LoggerFactory.getLogger().info(String.format("%s 上次未解码 position=%d limit=%d", address.toString(), buffer.position(), buffer.limit()));
if (buffer.limit() + length > buffer.capacity()) {
//TODO 缓存区已满,丢弃读取的数据
LoggerFactory.getLogger().warn(address.toString() + " -> decode缓存区已满,读取的数据被丢弃!!!!!");
return;
}

buffer.compact();
buffer.put(array, offset, length);
buffer.flip();
buffer.mark();

LoggerFactory.getLogger().info(String.format("%s 合并未解码 position=%d limit=%d", address.toString(), buffer.position(), buffer.limit()));
T data;
LoggerFactory.getLogger().info(address.toString() + " 开始解码数据");
while (buffer.hasRemaining() && ((data = decode(address, buffer)) != null)) {
LoggerFactory.getLogger().info(address.toString() + " 成功解码一条数据");
data.mAddress = address;
mHandleRunnable.put(data);
buffer.compact();
buffer.flip();
buffer.mark();
}
LoggerFactory.getLogger().info(address.toString() + " 退出解码");
buffer.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/dream/socket/codec/MessageEncode.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.nio.ByteBuffer;

public abstract class MessageEncode<T extends DataProtocol> {
public abstract class MessageEncode<T extends Message> {


/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/dream/socket/codec/MessageHandle.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.dream.socket.codec;

public abstract class MessageHandle<T extends DataProtocol> {
public abstract class MessageHandle<T extends Message> {

/**
* 连接状态回调
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/com/dream/socket/logger/DefaultLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.dream.socket.logger;

public class DefaultLogger implements Logger {
@Override
public void debug(String log) {
System.out.println(log);
}

@Override
public void info(String log) {
System.out.println(log);
}

@Override
public void warn(String log) {
System.err.println(log);
}

@Override
public void error(String log) {
System.err.println(log);
}

@Override
public void error(String log, Throwable throwable) {
System.err.println(log);
throwable.printStackTrace();
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/dream/socket/logger/Logger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.dream.socket.logger;

public interface Logger {

void debug(String log);

void info(String log);

void warn(String log);

void error(String log);

void error(String log, Throwable throwable);

}
Loading

0 comments on commit 47f5189

Please sign in to comment.