Skip to content

Commit

Permalink
Add option to specify OkHttp dispatcher for socket connectivity. (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
mofirouz authored Nov 30, 2024
1 parent eb55222 commit 818349c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 13 deletions.
15 changes: 15 additions & 0 deletions src/main/java/com/heroiclabs/nakama/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ public interface Client {
*/
SocketClient createSocket(final String host, final int port, final boolean ssl, final int socketTimeoutMs, final int socketPingMs, ExecutorService listenerThreadExec);

/**
* Create a new socket from the client.
* @param host The host URL of the server.
* @param port The port number of the server. Default should be 7350.
* @param ssl Whether to use SSL to connect to the server.
* @param socketTimeoutMs Sets the connect, read and write timeout for new connections.
* @param socketPingMs The interval at which to send Ping frames to the server.
* @param listenerThreadExec The threading model to use when processing socket messages from the server.
* @param socketThreadExec The threading model to use when processing socket connectivity. See documentation here for more details:
* @see okhttp3.Dispatcher
* @param maxNumConcurrentRequests The maximum number of requests to execute concurrently. Above this requests queue in memory, waiting for the running calls to complete. The okHttpThreadExec must be able to run this number of concurrent requests at the same time.
* @return a new SocketClient instance.
*/
SocketClient createSocket(final String host, final int port, final boolean ssl, final int socketTimeoutMs, final int socketPingMs, ExecutorService listenerThreadExec, ExecutorService socketThreadExec, int maxNumConcurrentRequests);

/**
* Add one or more friends by id.
* @param session The session of the user.
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/heroiclabs/nakama/DefaultClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ public SocketClient createSocket(final String host, final int port, final boolea
return new WebSocketClient(host, port, ssl, socketTimeoutMs, socketPingMs, this.trace, listenerThreadExec);
}

@Override
public SocketClient createSocket(final String host, final int port, final boolean ssl, final int socketTimeoutMs, final int socketPingMs, ExecutorService listenerThreadExec, ExecutorService socketThreadExec, int maxNumConcurrentRequests) {
return new WebSocketClient(host, port, ssl, socketTimeoutMs, socketPingMs, this.trace, listenerThreadExec, socketThreadExec, maxNumConcurrentRequests);
}

@Override
public ListenableFuture<Empty> addFriends(@NonNull final Session session, @NonNull final String... ids) {
return autoRefreshSession(session, getStub(session).addFriends(AddFriendsRequest.newBuilder().addAllIds(Arrays.asList(ids)).build()));
Expand Down
51 changes: 38 additions & 13 deletions src/main/java/com/heroiclabs/nakama/WebSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,31 +76,46 @@ public byte[] deserialize(final JsonElement jsonElement, final Type type, final
private final boolean ssl;
private final boolean trace;
private final Map<String, SettableFuture<?>> collationIds;
private final boolean shouldShutdownThreadExecService;
private final OkHttpClient.Builder clientBuilder;
private final boolean shouldShutdownListenerThreadExecService;
private boolean shouldShutdownOkHttpThreadExecService;
private ExecutorService listenerThreadExec;
private WebSocket socket;

WebSocketClient(@NonNull final String host, final int port, final boolean ssl,
final int socketTimeoutMs, final int socketPingMs, final boolean trace, ExecutorService listenerThreadExec) {
final int socketTimeoutMs, final int socketPingMs, final boolean trace,
ExecutorService listenerThreadExec) {
this.host = host;
this.port = port;
this.ssl = ssl;
this.trace = trace;
this.collationIds = new ConcurrentHashMap<>();
if (listenerThreadExec != null) {
this.listenerThreadExec = listenerThreadExec;
this.shouldShutdownThreadExecService = false;
this.shouldShutdownListenerThreadExecService = false;
} else {
this.listenerThreadExec = Executors.newSingleThreadExecutor();
this.shouldShutdownThreadExecService = true;
this.shouldShutdownListenerThreadExecService = true;
}

clientBuilder = new OkHttpClient.Builder()
.connectTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)
.writeTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)
.pingInterval(socketPingMs, TimeUnit.SECONDS);

this.shouldShutdownOkHttpThreadExecService = true;
clientBuilder.setDispatcher$okhttp(new Dispatcher());
}

WebSocketClient(@NonNull final String host, final int port, final boolean ssl,
final int socketTimeoutMs, final int socketPingMs, final boolean trace,
ExecutorService listenerThreadExec, @NonNull ExecutorService okHttpThreadExec, final int maxNumConcurrentRequests) {
this(host, port, ssl, socketTimeoutMs, socketPingMs, trace, listenerThreadExec);
this.shouldShutdownOkHttpThreadExecService = false;
Dispatcher dispatcher = new Dispatcher(okHttpThreadExec);
dispatcher.setMaxRequests(maxNumConcurrentRequests);
clientBuilder.setDispatcher$okhttp(dispatcher);
}

@Override
Expand Down Expand Up @@ -137,12 +152,22 @@ public ListenableFuture<Session> connect(@NonNull final Session session, @NonNul
}

private ListenableFuture<Session> createWebsocket(@NonNull final Session session, @NonNull final SocketListener listener, @NonNull final Request request) {
if (this.listenerThreadExec == null || this.listenerThreadExec.isShutdown() || this.listenerThreadExec.isTerminated()) {
if (this.listenerThreadExec == null || this.listenerThreadExec.isShutdown()) {
// in case of previously closed/failed socket.
this.listenerThreadExec = Executors.newSingleThreadExecutor();
}

final SettableFuture<Session> connectFuture = SettableFuture.create();
if (clientBuilder.getDispatcher$okhttp().executorService().isShutdown()) {
if (this.shouldShutdownOkHttpThreadExecService) {
// in case we are managing the OkHttpExecService, and it was previously shutdown, then recreate it before creating the client.
clientBuilder.setDispatcher$okhttp(new Dispatcher());
} else {
connectFuture.setException(new Throwable("websocket dispatcher thread executing has been previously shutdown."));
return connectFuture;
}
}

final Object lock = this;
final OkHttpClient client = clientBuilder.build();
socket = client.newWebSocket(request, new WebSocketListener() {
Expand Down Expand Up @@ -315,15 +340,15 @@ public void onClosed(final WebSocket webSocket, final int code, final String rea
connectFuture.setException(new Throwable("Socket closed."));
}

if (shouldShutdownThreadExecService) {
if (shouldShutdownListenerThreadExecService) {
listenerThreadExec.shutdown();
}

// clean up OkHttp Websocket resources.
client.connectionPool().evictAll();
client.dispatcher().executorService().shutdown();
// setup new executor service in case of reconnections.
clientBuilder.setDispatcher$okhttp(new Dispatcher());
if (shouldShutdownOkHttpThreadExecService) {
client.dispatcher().executorService().shutdown();
}
}
}

Expand All @@ -341,15 +366,15 @@ public void onFailure(final WebSocket webSocket, final Throwable t, final Respon
connectFuture.setException(t);
}

if (shouldShutdownThreadExecService) {
if (shouldShutdownListenerThreadExecService) {
listenerThreadExec.shutdown();
}

// clean up OkHttp Websocket resources.
client.connectionPool().evictAll();
client.dispatcher().executorService().shutdown();
// setup new executor service in case of reconnections.
clientBuilder.setDispatcher$okhttp(new Dispatcher());
if (shouldShutdownOkHttpThreadExecService) {
client.dispatcher().executorService().shutdown();
}
}
}
});
Expand Down

0 comments on commit 818349c

Please sign in to comment.