Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1409 Socket Mode: Slow message consumption when listeners do not immediately return ack() #1411

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.slack.api.bolt.App;
import com.slack.api.bolt.request.Request;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequest;
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequestParser;
import com.slack.api.bolt.request.Request;
import com.slack.api.bolt.response.Response;
import com.slack.api.jakarta_socket_mode.JakartaSocketModeClientFactory;
import com.slack.api.socket_mode.SocketModeClient;
import com.slack.api.socket_mode.response.AckResponse;
import com.slack.api.util.json.GsonFactory;
import com.slack.api.util.thread.DaemonThreadExecutorServiceFactory;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -27,6 +29,7 @@
private final App app;
private final Supplier<SocketModeClient> clientFactory;
private SocketModeClient client;
private final ExecutorService executorService;

private static final Function<ErrorContext, Response> DEFAULT_ERROR_HANDLER = (context) -> {
Exception e = context.getException();
Expand Down Expand Up @@ -69,35 +72,22 @@
private static Supplier<SocketModeClient> buildSocketModeClientFactory(
App app,
String appToken,
Function<ErrorContext, Response> errorHandler
Function<ErrorContext, Response> errorHandler,
ExecutorService executorService
) {
return () -> {
try {
final SocketModeClient client = JakartaSocketModeClientFactory.create(app.slack(), appToken);
final SocketModeRequestParser requestParser = new SocketModeRequestParser(app.config());
final Gson gson = GsonFactory.createSnakeCase(app.slack().getConfig());
client.addWebSocketMessageListener(message -> {
long startMillis = System.currentTimeMillis();
SocketModeRequest req = requestParser.parse(message);
if (req != null) {
try {
Response boltResponse = app.run(req.getBoltRequest());
if (boltResponse.getStatusCode() != 200) {
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
boltResponse.getStatusCode(), boltResponse.getBody());
return;
}
sendSocketModeResponse(client, gson, req, boltResponse);
} catch (Exception e) {
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
Response errorResponse = errorHandler.apply(context);
if (errorResponse != null) {
sendSocketModeResponse(client, gson, req, errorResponse);
}
} finally {
long spentMillis = System.currentTimeMillis() - startMillis;
log.debug("Response time: {} milliseconds", spentMillis);
}
if (executorService != null) {
// asynchronous
executorService.execute(() -> runBoltApp(

Check warning on line 86 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L86

Added line #L86 was not covered by tests
message, app, client, requestParser, errorHandler, gson));
} else {
// synchronous
runBoltApp(message, app, client, requestParser, errorHandler, gson);
Comment on lines +84 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any shortcomings from always using an executorService where the default concurrency = 1

Other then maintaining backwards compatible behavior is there a use case where

runBoltApp(message, app, client, requestParser, errorHandler, gson);

is preferred over

ExecutorService executorService = buildExecutorService(1);
executorService.execute(() -> runBoltApp(message, app, client, requestParser, errorHandler, gson));

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! Indeed, this approach is also an option, but for now I'd like to avoid moving the runBoltApp to a different thread, even though it's a slight difference. When we make the concurrent mode the default, simplifying the code as you suggest is worth considering.

}
});
return client;
Expand All @@ -108,34 +98,101 @@
};
}

private static void runBoltApp(
String message,
App app,
SocketModeClient client,
SocketModeRequestParser requestParser,
Function<ErrorContext, Response> errorHandler,
Gson gson
) {
long startMillis = System.currentTimeMillis();
SocketModeRequest req = requestParser.parse(message);
if (req != null) {
try {
Response boltResponse = app.run(req.getBoltRequest());
if (boltResponse.getStatusCode() != 200) {
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
boltResponse.getStatusCode(), boltResponse.getBody());
return;

Check warning on line 117 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L115-L117

Added lines #L115 - L117 were not covered by tests
}
sendSocketModeResponse(client, gson, req, boltResponse);
} catch (Exception e) {
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
Response errorResponse = errorHandler.apply(context);

Check warning on line 122 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L120-L122

Added lines #L120 - L122 were not covered by tests
if (errorResponse != null) {
sendSocketModeResponse(client, gson, req, errorResponse);

Check warning on line 124 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L124

Added line #L124 was not covered by tests
}
} finally {
long spentMillis = System.currentTimeMillis() - startMillis;
log.debug("Response time: {} milliseconds", spentMillis);
}
}
}

private static ExecutorService buildExecutorService(int concurrency) {
return DaemonThreadExecutorServiceFactory.createDaemonThreadPoolExecutor(

Check warning on line 134 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L134

Added line #L134 was not covered by tests
"slack-bolt-socket-mode", concurrency);
}

// -------------------------------------------

public SocketModeApp(App app) throws IOException {
this(System.getenv("SLACK_APP_TOKEN"), app);
}

public SocketModeApp(App app, int concurrency) throws IOException {
this(System.getenv("SLACK_APP_TOKEN"), app, concurrency);
}

Check warning on line 146 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L145-L146

Added lines #L145 - L146 were not covered by tests

public SocketModeApp(String appToken, App app) throws IOException {
this(appToken, DEFAULT_ERROR_HANDLER, app);
}

public SocketModeApp(String appToken, App app, int concurrency) throws IOException {
this(appToken, DEFAULT_ERROR_HANDLER, app, buildExecutorService(concurrency));
}

Check warning on line 154 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L153-L154

Added lines #L153 - L154 were not covered by tests

public SocketModeApp(
String appToken,
Function<ErrorContext, Response> errorHandler,
App app
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);
}

public SocketModeApp(
String appToken,
App app,
Function<ErrorContext, Response> errorHandler
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);

Check warning on line 169 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L169

Added line #L169 was not covered by tests
}

public SocketModeApp(Supplier<SocketModeClient> clientFactory, App app) {
this(clientFactory, app, null);
}


// intentionally private to avoid exposing the ExecutorService initialization
private SocketModeApp(
String appToken,
Function<ErrorContext, Response> errorHandler,
App app,
ExecutorService executorService
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler, executorService), app, executorService);
}

Check warning on line 185 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L184-L185

Added lines #L184 - L185 were not covered by tests

// intentionally private to avoid exposing the ExecutorService initialization
private SocketModeApp(
Supplier<SocketModeClient> clientFactory,
App app,
ExecutorService executorService
) {
this.clientFactory = clientFactory;
this.app = app;
this.executorService = executorService;
}

/**
Expand All @@ -152,10 +209,9 @@
this.client = socketModeClient;
this.clientFactory = () -> socketModeClient;
this.app = app;
this.executorService = null;

Check warning on line 212 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L212

Added line #L212 was not covered by tests
}

// -------------------------------------------

public void start() throws Exception {
run(true);
}
Expand Down Expand Up @@ -192,6 +248,16 @@
public void close() throws Exception {
this.stop();
this.client = null;
if (executorService != null) {
for (Runnable runnable : executorService.shutdownNow()) {
try {
runnable.run();
} catch (Exception e) {
log.warn("Failed to run the remaining Runnable in SocketModeApp (error: {}, message: {})",
e.getClass().getCanonicalName(), e.getMessage());
}
}

Check warning on line 259 in bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java

View check run for this annotation

Codecov / codecov/patch

bolt-jakarta-socket-mode/src/main/java/com/slack/api/bolt/jakarta_socket_mode/SocketModeApp.java#L254-L259

Added lines #L254 - L259 were not covered by tests
}
}

// -------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package samples;

import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.bolt.jakarta_socket_mode.SocketModeApp;
import com.slack.api.model.event.MessageChangedEvent;
import com.slack.api.model.event.MessageDeletedEvent;
import com.slack.api.model.event.MessageEvent;
import config.Constants;

public class ConcurrencyTestApp {

public static void main(String[] args) throws Exception {
App app = new App(AppConfig.builder()
.singleTeamBotToken(System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN))
.build());

app.event(MessageEvent.class, (req, ctx) -> {
// Without concurrency option, this time-consuming task slows the whole message processing mechanism down
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.asyncClient().reactionsAdd(r -> r
.channel(req.getEvent().getChannel())
.name("eyes")
.timestamp(req.getEvent().getTs())
);
return ctx.ack();
});
app.event(MessageChangedEvent.class, (req, ctx) -> ctx.ack());
app.event(MessageDeletedEvent.class, (req, ctx) -> ctx.ack());

String appToken = System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN);
// SocketModeApp socketModeApp = new SocketModeApp(appToken, app);
SocketModeApp socketModeApp = new SocketModeApp(appToken, app, 10);
socketModeApp.start();
}
}
Loading
Loading