diff --git a/bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java b/bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java index 10f683d8c..a1e25ef93 100644 --- a/bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java +++ b/bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java @@ -3,14 +3,15 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.slack.api.bolt.App; -import com.slack.api.bolt.request.Request; -import com.slack.api.bolt.response.Response; import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequest; import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequestParser; +import com.slack.api.bolt.request.Request; +import com.slack.api.bolt.response.Response; import com.slack.api.jakarta_socket_mode.JakartaSocketModeClientFactory; import com.slack.api.socket_mode.SocketModeClient; import com.slack.api.socket_mode.response.AckResponse; import com.slack.api.util.json.GsonFactory; +import com.slack.api.util.thread.DaemonThreadExecutorServiceFactory; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -18,6 +19,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -27,6 +29,7 @@ public class SocketModeApp { private final App app; private final Supplier clientFactory; private SocketModeClient client; + private final ExecutorService executorService; private static final Function DEFAULT_ERROR_HANDLER = (context) -> { Exception e = context.getException(); @@ -69,7 +72,8 @@ private static void sendSocketModeResponse( private static Supplier buildSocketModeClientFactory( App app, String appToken, - Function errorHandler + Function errorHandler, + ExecutorService executorService ) { return () -> { try { @@ -77,27 +81,13 @@ private static Supplier buildSocketModeClientFactory( final SocketModeRequestParser requestParser = new SocketModeRequestParser(app.config()); final Gson gson = GsonFactory.createSnakeCase(app.slack().getConfig()); client.addWebSocketMessageListener(message -> { - long startMillis = System.currentTimeMillis(); - SocketModeRequest req = requestParser.parse(message); - if (req != null) { - try { - Response boltResponse = app.run(req.getBoltRequest()); - if (boltResponse.getStatusCode() != 200) { - log.warn("Unsuccessful Bolt app execution (status: {}, body: {})", - boltResponse.getStatusCode(), boltResponse.getBody()); - return; - } - sendSocketModeResponse(client, gson, req, boltResponse); - } catch (Exception e) { - ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build(); - Response errorResponse = errorHandler.apply(context); - if (errorResponse != null) { - sendSocketModeResponse(client, gson, req, errorResponse); - } - } finally { - long spentMillis = System.currentTimeMillis() - startMillis; - log.debug("Response time: {} milliseconds", spentMillis); - } + if (executorService != null) { + // asynchronous + executorService.execute(() -> runBoltApp( + message, app, client, requestParser, errorHandler, gson)); + } else { + // synchronous + runBoltApp(message, app, client, requestParser, errorHandler, gson); } }); return client; @@ -108,21 +98,67 @@ private static Supplier buildSocketModeClientFactory( }; } + private static void runBoltApp( + String message, + App app, + SocketModeClient client, + SocketModeRequestParser requestParser, + Function errorHandler, + Gson gson + ) { + long startMillis = System.currentTimeMillis(); + SocketModeRequest req = requestParser.parse(message); + if (req != null) { + try { + Response boltResponse = app.run(req.getBoltRequest()); + if (boltResponse.getStatusCode() != 200) { + log.warn("Unsuccessful Bolt app execution (status: {}, body: {})", + boltResponse.getStatusCode(), boltResponse.getBody()); + return; + } + sendSocketModeResponse(client, gson, req, boltResponse); + } catch (Exception e) { + ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build(); + Response errorResponse = errorHandler.apply(context); + if (errorResponse != null) { + sendSocketModeResponse(client, gson, req, errorResponse); + } + } finally { + long spentMillis = System.currentTimeMillis() - startMillis; + log.debug("Response time: {} milliseconds", spentMillis); + } + } + } + + private static ExecutorService buildExecutorService(int concurrency) { + return DaemonThreadExecutorServiceFactory.createDaemonThreadPoolExecutor( + "slack-bolt-socket-mode", concurrency); + } + + // ------------------------------------------- + public SocketModeApp(App app) throws IOException { this(System.getenv("SLACK_APP_TOKEN"), app); } + public SocketModeApp(App app, int concurrency) throws IOException { + this(System.getenv("SLACK_APP_TOKEN"), app, concurrency); + } public SocketModeApp(String appToken, App app) throws IOException { this(appToken, DEFAULT_ERROR_HANDLER, app); } + public SocketModeApp(String appToken, App app, int concurrency) throws IOException { + this(appToken, DEFAULT_ERROR_HANDLER, app, buildExecutorService(concurrency)); + } + public SocketModeApp( String appToken, Function errorHandler, App app ) throws IOException { - this(buildSocketModeClientFactory(app, appToken, errorHandler), app); + this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app); } public SocketModeApp( @@ -130,12 +166,33 @@ public SocketModeApp( App app, Function errorHandler ) throws IOException { - this(buildSocketModeClientFactory(app, appToken, errorHandler), app); + this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app); } public SocketModeApp(Supplier clientFactory, App app) { + this(clientFactory, app, null); + } + + + // intentionally private to avoid exposing the ExecutorService initialization + private SocketModeApp( + String appToken, + Function errorHandler, + App app, + ExecutorService executorService + ) throws IOException { + this(buildSocketModeClientFactory(app, appToken, errorHandler, executorService), app, executorService); + } + + // intentionally private to avoid exposing the ExecutorService initialization + private SocketModeApp( + Supplier clientFactory, + App app, + ExecutorService executorService + ) { this.clientFactory = clientFactory; this.app = app; + this.executorService = executorService; } /** @@ -152,10 +209,9 @@ public SocketModeApp(SocketModeClient socketModeClient, App app) { this.client = socketModeClient; this.clientFactory = () -> socketModeClient; this.app = app; + this.executorService = null; } - // ------------------------------------------- - public void start() throws Exception { run(true); } @@ -192,6 +248,16 @@ public void stop() throws Exception { public void close() throws Exception { this.stop(); this.client = null; + if (executorService != null) { + for (Runnable runnable : executorService.shutdownNow()) { + try { + runnable.run(); + } catch (Exception e) { + log.warn("Failed to run the remaining Runnable in SocketModeApp (error: {}, message: {})", + e.getClass().getCanonicalName(), e.getMessage()); + } + } + } } // ------------------------------------------- diff --git a/bolt-jakarta-socket-mode/src/test/java/samples/ConcurrencyTestApp.java b/bolt-jakarta-socket-mode/src/test/java/samples/ConcurrencyTestApp.java new file mode 100644 index 000000000..9da072d34 --- /dev/null +++ b/bolt-jakarta-socket-mode/src/test/java/samples/ConcurrencyTestApp.java @@ -0,0 +1,40 @@ +package samples; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.bolt.jakarta_socket_mode.SocketModeApp; +import com.slack.api.model.event.MessageChangedEvent; +import com.slack.api.model.event.MessageDeletedEvent; +import com.slack.api.model.event.MessageEvent; +import config.Constants; + +public class ConcurrencyTestApp { + + public static void main(String[] args) throws Exception { + App app = new App(AppConfig.builder() + .singleTeamBotToken(System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN)) + .build()); + + app.event(MessageEvent.class, (req, ctx) -> { + // Without concurrency option, this time-consuming task slows the whole message processing mechanism down + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ctx.asyncClient().reactionsAdd(r -> r + .channel(req.getEvent().getChannel()) + .name("eyes") + .timestamp(req.getEvent().getTs()) + ); + return ctx.ack(); + }); + app.event(MessageChangedEvent.class, (req, ctx) -> ctx.ack()); + app.event(MessageDeletedEvent.class, (req, ctx) -> ctx.ack()); + + String appToken = System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN); +// SocketModeApp socketModeApp = new SocketModeApp(appToken, app); + SocketModeApp socketModeApp = new SocketModeApp(appToken, app, 10); + socketModeApp.start(); + } +} diff --git a/bolt-socket-mode/src/main/java/com/slack/api/bolt/socket_mode/SocketModeApp.java b/bolt-socket-mode/src/main/java/com/slack/api/bolt/socket_mode/SocketModeApp.java index 57eb6fa8a..422227d98 100644 --- a/bolt-socket-mode/src/main/java/com/slack/api/bolt/socket_mode/SocketModeApp.java +++ b/bolt-socket-mode/src/main/java/com/slack/api/bolt/socket_mode/SocketModeApp.java @@ -10,6 +10,7 @@ import com.slack.api.socket_mode.SocketModeClient; import com.slack.api.socket_mode.response.AckResponse; import com.slack.api.util.json.GsonFactory; +import com.slack.api.util.thread.DaemonThreadExecutorServiceFactory; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -17,6 +18,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -26,6 +28,7 @@ public class SocketModeApp { private final App app; private final Supplier clientFactory; private SocketModeClient client; + private final ExecutorService executorService; private static final Function DEFAULT_ERROR_HANDLER = (context) -> { Exception e = context.getException(); @@ -69,7 +72,8 @@ private static Supplier buildSocketModeClientFactory( App app, String appToken, SocketModeClient.Backend backend, - Function errorHandler + Function errorHandler, + ExecutorService executorService ) { return () -> { try { @@ -77,27 +81,13 @@ private static Supplier buildSocketModeClientFactory( final SocketModeRequestParser requestParser = new SocketModeRequestParser(app.config()); final Gson gson = GsonFactory.createSnakeCase(app.slack().getConfig()); client.addWebSocketMessageListener(message -> { - long startMillis = System.currentTimeMillis(); - SocketModeRequest req = requestParser.parse(message); - if (req != null) { - try { - Response boltResponse = app.run(req.getBoltRequest()); - if (boltResponse.getStatusCode() != 200) { - log.warn("Unsuccessful Bolt app execution (status: {}, body: {})", - boltResponse.getStatusCode(), boltResponse.getBody()); - return; - } - sendSocketModeResponse(client, gson, req, boltResponse); - } catch (Exception e) { - ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build(); - Response errorResponse = errorHandler.apply(context); - if (errorResponse != null) { - sendSocketModeResponse(client, gson, req, errorResponse); - } - } finally { - long spentMillis = System.currentTimeMillis() - startMillis; - log.debug("Response time: {} milliseconds", spentMillis); - } + if (executorService != null) { + // asynchronous + executorService.execute(() -> runBoltApp( + message, app, client, requestParser, errorHandler, gson)); + } else { + // synchronous + runBoltApp(message, app, client, requestParser, errorHandler, gson); } }); return client; @@ -108,14 +98,61 @@ private static Supplier buildSocketModeClientFactory( }; } + private static void runBoltApp( + String message, + App app, + SocketModeClient client, + SocketModeRequestParser requestParser, + Function errorHandler, + Gson gson + ) { + long startMillis = System.currentTimeMillis(); + SocketModeRequest req = requestParser.parse(message); + if (req != null) { + try { + Response boltResponse = app.run(req.getBoltRequest()); + if (boltResponse.getStatusCode() != 200) { + log.warn("Unsuccessful Bolt app execution (status: {}, body: {})", + boltResponse.getStatusCode(), boltResponse.getBody()); + return; + } + sendSocketModeResponse(client, gson, req, boltResponse); + } catch (Exception e) { + ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build(); + Response errorResponse = errorHandler.apply(context); + if (errorResponse != null) { + sendSocketModeResponse(client, gson, req, errorResponse); + } + } finally { + long spentMillis = System.currentTimeMillis() - startMillis; + log.debug("Response time: {} milliseconds", spentMillis); + } + } + } + + private static ExecutorService buildExecutorService(int concurrency) { + return DaemonThreadExecutorServiceFactory.createDaemonThreadPoolExecutor( + "slack-bolt-socket-mode", concurrency); + } + + // ------------------------------------------- + public SocketModeApp(App app) throws IOException { this(System.getenv("SLACK_APP_TOKEN"), SocketModeClient.Backend.Tyrus, app); } + public SocketModeApp(App app, int concurrency) throws IOException { + this(System.getenv("SLACK_APP_TOKEN"), app, SocketModeClient.Backend.Tyrus, concurrency); + } + public SocketModeApp(String appToken, App app) throws IOException { this(appToken, SocketModeClient.Backend.Tyrus, app); } + public SocketModeApp(String appToken, App app, int concurrency) throws IOException { + this(appToken, app, SocketModeClient.Backend.Tyrus, concurrency); + } + public SocketModeApp( String appToken, SocketModeClient.Backend backend, @@ -132,13 +169,32 @@ public SocketModeApp( this(appToken, backend, DEFAULT_ERROR_HANDLER, app); } + public SocketModeApp( + String appToken, + App app, + SocketModeClient.Backend backend, + int concurrency + ) throws IOException { + this(appToken, backend, DEFAULT_ERROR_HANDLER, app, buildExecutorService(concurrency)); + } + public SocketModeApp( String appToken, SocketModeClient.Backend backend, Function errorHandler, App app ) throws IOException { - this(buildSocketModeClientFactory(app, appToken, backend, errorHandler), app); + this(appToken, backend, errorHandler, app, null); + } + + public SocketModeApp( + String appToken, + SocketModeClient.Backend backend, + Function errorHandler, + App app, + int concurrency + ) throws IOException { + this(appToken, backend, errorHandler, app, buildExecutorService(concurrency)); } public SocketModeApp( @@ -147,12 +203,43 @@ public SocketModeApp( SocketModeClient.Backend backend, Function errorHandler ) throws IOException { - this(buildSocketModeClientFactory(app, appToken, backend, errorHandler), app); + this(appToken, backend, errorHandler, app, null); + } + + public SocketModeApp( + String appToken, + App app, + SocketModeClient.Backend backend, + Function errorHandler, + int concurrency + ) throws IOException { + this(appToken, backend, errorHandler, app, buildExecutorService(concurrency)); } public SocketModeApp(Supplier clientFactory, App app) { + this(clientFactory, app, null); + } + + // intentionally private to avoid exposing the ExecutorService initialization + private SocketModeApp( + String appToken, + SocketModeClient.Backend backend, + Function errorHandler, + App app, + ExecutorService executorService + ) throws IOException { + this(buildSocketModeClientFactory(app, appToken, backend, errorHandler, executorService), app, executorService); + } + + // intentionally private to avoid exposing the ExecutorService initialization + private SocketModeApp( + Supplier clientFactory, + App app, + ExecutorService executorService + ) { this.clientFactory = clientFactory; this.app = app; + this.executorService = executorService; } /** @@ -169,10 +256,9 @@ public SocketModeApp(SocketModeClient socketModeClient, App app) { this.client = socketModeClient; this.clientFactory = () -> socketModeClient; this.app = app; + this.executorService = null; } - // ------------------------------------------- - public void start() throws Exception { run(true); } @@ -209,6 +295,16 @@ public void stop() throws Exception { public void close() throws Exception { this.stop(); this.client = null; + if (executorService != null) { + for (Runnable runnable : executorService.shutdownNow()) { + try { + runnable.run(); + } catch (Exception e) { + log.warn("Failed to run the remaining Runnable in SocketModeApp (error: {}, message: {})", + e.getClass().getCanonicalName(), e.getMessage()); + } + } + } } // ------------------------------------------- diff --git a/bolt-socket-mode/src/test/java/samples/ConcurrencyTestApp.java b/bolt-socket-mode/src/test/java/samples/ConcurrencyTestApp.java new file mode 100644 index 000000000..dec65b88f --- /dev/null +++ b/bolt-socket-mode/src/test/java/samples/ConcurrencyTestApp.java @@ -0,0 +1,40 @@ +package samples; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.bolt.socket_mode.SocketModeApp; +import com.slack.api.model.event.MessageChangedEvent; +import com.slack.api.model.event.MessageDeletedEvent; +import com.slack.api.model.event.MessageEvent; +import config.Constants; + +public class ConcurrencyTestApp { + + public static void main(String[] args) throws Exception { + App app = new App(AppConfig.builder() + .singleTeamBotToken(System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN)) + .build()); + + app.event(MessageEvent.class, (req, ctx) -> { + // Without concurrency option, this time-consuming task slows the whole message processing mechanism down + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ctx.asyncClient().reactionsAdd(r -> r + .channel(req.getEvent().getChannel()) + .name("eyes") + .timestamp(req.getEvent().getTs()) + ); + return ctx.ack(); + }); + app.event(MessageChangedEvent.class, (req, ctx) -> ctx.ack()); + app.event(MessageDeletedEvent.class, (req, ctx) -> ctx.ack()); + + String appToken = System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN); +// SocketModeApp socketModeApp = new SocketModeApp(appToken, app); + SocketModeApp socketModeApp = new SocketModeApp(appToken, app, 10); + socketModeApp.start(); + } +}