From cff25f24a6e95170545995dbdb51295c91371d4d Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Tue, 3 Dec 2024 18:05:10 -0600 Subject: [PATCH] #30367 Simplified SSE handling --- .../rest/api/v1/job/SSEConnectionManager.java | 256 ------------------ .../rest/api/v1/job/SSEMonitorUtil.java | 98 +++---- 2 files changed, 44 insertions(+), 310 deletions(-) delete mode 100644 dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java deleted file mode 100644 index 457d498fa518..000000000000 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java +++ /dev/null @@ -1,256 +0,0 @@ -package com.dotcms.rest.api.v1.job; - -import com.dotmarketing.util.Config; -import com.dotmarketing.util.Logger; -import io.vavr.Lazy; -import java.io.IOException; -import java.time.LocalDateTime; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; -import org.glassfish.jersey.media.sse.EventOutput; - -/** - * Manages Server-Sent Events (SSE) connections for job monitoring. This class provides - * functionality for tracking, limiting, and cleaning up SSE connections across multiple jobs. - * - *

Key features include: - *

- * - *

Configuration properties: - *

- * - *

Usage example: - *

{@code
- * SSEConnectionManager manager = new SSEConnectionManager();
- *
- * // Check if new connection can be accepted
- * if (manager.canAcceptNewConnection(jobId)) {
- *     // Add new connection
- *     manager.addConnection(jobId, eventOutput);
- * }
- *
- * // Close connections when job completes
- * manager.closeJobConnections(jobId);
- * }
- */ -@ApplicationScoped -public class SSEConnectionManager { - - // Add status tracking - private volatile boolean isShutdown = false; - - // Add per-job connection limit, default to 5 and -1 to disable - private static final Lazy MAX_SSE_CONNECTIONS_PER_JOB = - Lazy.of(() -> Config.getIntProperty("MAX_SSE_CONNECTIONS_PER_JOB", 5)); - - // Add total connection limit, default to 50 and -1 to disable - private static final Lazy MAX_SSE_TOTAL_CONNECTIONS = - Lazy.of(() -> Config.getIntProperty("MAX_SSE_TOTAL_CONNECTIONS", 50)); - - private final ConcurrentMap> jobConnections = - new ConcurrentHashMap<>(); - - /** - * Shuts down the SSE connection manager and cleans up all resources. This method closes all - * active connections. After shutdown, no new connections can be added. - */ - @PreDestroy - public void shutdown() { - isShutdown = true; - closeAllConnections(); - } - - /** - * Checks if a new SSE connection can be accepted for the given job. This method verifies both - * per-job and system-wide connection limits if enabled (not -1). - * - * @param jobId The ID of the job for which to check connection availability - * @return true if a new connection can be accepted, false otherwise - */ - public boolean canAcceptNewConnection(final String jobId) { - - final var maxSseTotalConnections = MAX_SSE_TOTAL_CONNECTIONS.get(); - final var maxSseConnectionsPerJob = MAX_SSE_CONNECTIONS_PER_JOB.get(); - - // Check total connections limit if enabled (not -1) - if (maxSseTotalConnections != -1 && getTotalConnections() >= maxSseTotalConnections) { - return false; - } - - // If per-job limit is disabled (-1), allow connection - if (maxSseConnectionsPerJob == -1) { - return true; - } - - // Check per-job limit - Set connections = jobConnections.get(jobId); - return connections == null || connections.size() < maxSseConnectionsPerJob; - } - - /** - * Adds a new SSE connection for a job. The connection will be automatically closed after the - * configured timeout period if timeout is enabled (not -1). - * - * @param jobId The ID of the job to monitor - * @param eventOutput The EventOutput instance representing the SSE connection - * @return The created SSEConnection instance - * @throws IllegalStateException if the manager is shut down - */ - public SSEConnection addConnection(final String jobId, final EventOutput eventOutput) { - - if (isShutdown) { - throw new IllegalStateException("SSEConnectionManager is shut down"); - } - - SSEConnection connection = new SSEConnection(jobId, eventOutput); - jobConnections.computeIfAbsent(jobId, k -> ConcurrentHashMap.newKeySet()).add(connection); - - return connection; - } - - /** - * Closes a specific SSE connection for a job. If this was the last connection for the job, the - * job entry is removed from tracking. - * - * @param connection The connection to remove - */ - public void closeConnection(final SSEConnection connection) { - - if (connection != null) { - Set connections = jobConnections.get(connection.jobId); - if (connections != null) { - connections.remove(connection); - connection.close(); - - // If this was the last connection for the job, clean up the job entry - if (connections.isEmpty()) { - jobConnections.remove(connection.jobId); - } - } - } - } - - /** - * Gets the total number of active SSE connections across all jobs. - * - * @return The total number of active connections - */ - private int getTotalConnections() { - return jobConnections.values().stream() - .mapToInt(Set::size) - .sum(); - } - - /** - * Closes all active SSE connections and clears connection tracking. - */ - private void closeAllConnections() { - jobConnections.values().forEach(connections -> - connections.forEach(SSEConnection::close) - ); - jobConnections.clear(); - } - - /** - * Closes all SSE connections for a specific job. - * - * @param jobId The ID of the job whose connections should be closed - */ - public void closeAllJobConnections(final String jobId) { - Set connections = jobConnections.remove(jobId); - if (connections != null) { - connections.forEach(SSEConnection::close); - } - } - - /** - * Gets the number of active connections for a specific job. - * - * @param jobId The ID of the job - * @return The number of active connections for the job - */ - public int getConnectionCount(final String jobId) { - Set connections = jobConnections.get(jobId); - return connections != null ? connections.size() : 0; - } - - /** - * Gets information about the current state of SSE connections. - * - * @return A map containing connection statistics: - * - totalConnections: Total number of active connections - * - activeJobs: Number of jobs with active connections - */ - public Map getConnectionInfo() { - return Map.of( - "totalConnections", getTotalConnections(), - "activeJobs", jobConnections.size() - ); - } - - /** - * Represents a single SSE connection for a job. Each connection tracks its creation time and - * handles its own cleanup. - */ - public static class SSEConnection { - - private final String jobId; - private final EventOutput eventOutput; - private final LocalDateTime createdAt; - - /** - * Creates a new SSE connection. - * - * @param jobId The ID of the job this connection is monitoring - * @param eventOutput The EventOutput instance representing the SSE connection - */ - public SSEConnection(String jobId, EventOutput eventOutput) { - this.jobId = jobId; - this.eventOutput = eventOutput; - this.createdAt = LocalDateTime.now(); - } - - /** - * Closes this SSE connection. - */ - public void close() { - try { - eventOutput.close(); - } catch (IOException e) { - Logger.error(SSEConnection.class, "Error closing SSE connection", e); - } - } - - /** - * Gets the ID of the job this connection is monitoring. - * - * @return The job ID - */ - public String getJobId() { - return jobId; - } - - /** - * Gets the EventOutput instance representing the SSE connection. - * - * @return The EventOutput instance - */ - public EventOutput getEventOutput() { - return eventOutput; - } - } - -} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java index c9c3d6ec3c26..a213355d0e3f 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java @@ -2,11 +2,9 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static javax.ws.rs.core.Response.Status.TOO_MANY_REQUESTS; import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.job.Job; -import com.dotcms.rest.api.v1.job.SSEConnectionManager.SSEConnection; import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import java.io.IOException; @@ -40,29 +38,21 @@ * EventOutput eventOutput = sseMonitorUtil.monitorJob(jobId); * } * - *

This class is thread-safe and can handle multiple concurrent monitoring sessions. - * It automatically manages resource cleanup through the {@link SSEConnectionManager} and - * ensures proper handling of connection lifecycles. - * - * @see SSEConnectionManager * @see JobQueueHelper */ @ApplicationScoped public class SSEMonitorUtil { private final JobQueueHelper helper; - private final SSEConnectionManager sseConnectionManager; public SSEMonitorUtil() { // Default constructor required for CDI this.helper = null; - this.sseConnectionManager = null; } @Inject - public SSEMonitorUtil(JobQueueHelper helper, SSEConnectionManager sseConnectionManager) { + public SSEMonitorUtil(JobQueueHelper helper) { this.helper = helper; - this.sseConnectionManager = sseConnectionManager; } /** @@ -74,26 +64,19 @@ public SSEMonitorUtil(JobQueueHelper helper, SSEConnectionManager sseConnectionM @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks public EventOutput monitorJob(final String jobId) { - JobWatcher jobWatcher = null; - final EventOutput eventOutput = new EventOutput(); - final var connection = sseConnectionManager.addConnection(jobId, eventOutput); + final var eventOutput = new EventOutput(); + final var resources = new MonitorResources(jobId, eventOutput, helper); - try (final var resources = - new MonitorResources(jobId, connection, helper, sseConnectionManager)) { + try { Job job = helper.getJobForSSE(jobId); if (job == null) { - sendError(SSEError.JOB_NOT_FOUND, connection); + sendErrorAndClose(SSEError.JOB_NOT_FOUND, resources); return eventOutput; } if (helper.isNotWatchable(job)) { - sendError(SSEError.JOB_NOT_WATCHABLE, connection); - return eventOutput; - } - - if (!sseConnectionManager.canAcceptNewConnection(jobId)) { - sendError(SSEError.TOO_MANY_CONNECTIONS, connection); + sendErrorAndClose(SSEError.JOB_NOT_WATCHABLE, resources); return eventOutput; } @@ -108,15 +91,19 @@ public EventOutput monitorJob(final String jobId) { .build(); eventOutput.write(event); - // If job is in a completed state, close all connections as no further + // If job is in a completed state, close the connection as no further // updates will be available if (helper.isTerminalState(watched.state())) { - sseConnectionManager.closeAllJobConnections(jobId); + resources.close(); } } catch (IOException e) { final var errorMessage = "Error writing SSE event"; Logger.error(this, errorMessage, e); + + // Make sure to close the connection + resources.close(); + // Re-throw the IOException to be caught by the outer catch block in the // RealTimeJobMonitor that will clean up the job watcher throw new DotRuntimeException(errorMessage, e); @@ -125,13 +112,17 @@ public EventOutput monitorJob(final String jobId) { }; // Start watching the job - jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); + final var jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); resources.jobWatcher(jobWatcher); return eventOutput; } catch (Exception e) { final var errorMessage = "Error setting up job monitor"; Logger.error(this, errorMessage, e); + + // Make sure to close the connection and remove the job watcher + resources.close(); + throw new DotRuntimeException(errorMessage, e); } } @@ -140,17 +131,18 @@ public EventOutput monitorJob(final String jobId) { * Send an error event and close the connection * * @param error The error to send - * @param connection The SSE connection to close + * @param resources The current monitoring resources * @throws IOException If there is an error writing the event */ - private void sendError(final SSEError error, final SSEConnection connection) + private void sendErrorAndClose(final SSEError error, MonitorResources resources) throws IOException { OutboundEvent event = new OutboundEvent.Builder() .mediaType(MediaType.TEXT_HTML_TYPE) .name(error.getName()) .data(String.class, String.valueOf(error.getCode())) .build(); - connection.getEventOutput().write(event); + resources.eventOutput().write(event); + resources.close(); } /** @@ -161,8 +153,7 @@ private void sendError(final SSEError error, final SSEConnection connection) private enum SSEError { JOB_NOT_FOUND("job-not-found", NOT_FOUND.getStatusCode()), - JOB_NOT_WATCHABLE("job-not-watchable", BAD_REQUEST.getStatusCode()), - TOO_MANY_CONNECTIONS("too-many-connections", TOO_MANY_REQUESTS.getStatusCode()); + JOB_NOT_WATCHABLE("job-not-watchable", BAD_REQUEST.getStatusCode()); private final String name; private final int code; @@ -182,39 +173,26 @@ public int getCode() { } /** - * A resource management class that handles cleanup of SSE monitoring resources. This class - * implements AutoCloseable to ensure proper cleanup of both SSE connections and job watchers - * through try-with-resources blocks. - * - *

This class manages: - *

    - *
  • SSE connection lifecycle
  • - *
  • Job watcher registration and cleanup
  • - *
  • Automatic resource cleanup when monitoring ends or errors occur
  • - *
+ * A resource management class that handles cleanup of SSE monitoring resources. */ - private static class MonitorResources implements AutoCloseable { + private static class MonitorResources { - private final SSEConnection connection; + private final EventOutput eventOutput; private JobWatcher jobWatcher; private final String jobId; private final JobQueueHelper helper; - private final SSEConnectionManager sseConnectionManager; /** * Creates a new MonitorResources instance to manage SSE monitoring resources. * * @param jobId The ID of the job being monitored - * @param connection The SSE connection to manage + * @param eventOutput The SSE connection for job updates * @param helper Helper for job queue operations - * @param sseConnectionManager Manager for SSE connections */ - MonitorResources(String jobId, SSEConnection connection, JobQueueHelper helper, - SSEConnectionManager sseConnectionManager) { + MonitorResources(String jobId, EventOutput eventOutput, JobQueueHelper helper) { this.jobId = jobId; - this.connection = connection; + this.eventOutput = eventOutput; this.helper = helper; - this.sseConnectionManager = sseConnectionManager; } /** @@ -226,14 +204,26 @@ void jobWatcher(JobWatcher watcher) { this.jobWatcher = watcher; } + /** + * Gets the SSE connection for this monitoring session. + * + * @return The SSE connection + */ + EventOutput eventOutput() { + return eventOutput; + } + /** * Closes and cleans up all monitoring resources. This includes closing the SSE connection * and removing the job watcher if one exists. */ - @Override - public void close() { - if (connection != null) { - sseConnectionManager.closeConnection(connection); + void close() { + if (eventOutput != null) { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(MonitorResources.class, "Error closing event output", e); + } } if (jobWatcher != null) { helper.removeWatcher(jobId, jobWatcher);