diff --git a/src/main/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManager.java b/src/main/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManager.java index 02d332f..b64884d 100644 --- a/src/main/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManager.java +++ b/src/main/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManager.java @@ -11,6 +11,10 @@ import java.io.InputStreamReader; import java.io.PrintStream; import java.net.ServerSocket; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -25,6 +29,7 @@ import org.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.json.JSONObject; /** * Provides common logic for the invocation of Sauce Connect v3 and v4 processes. The class @@ -49,6 +54,7 @@ public abstract class AbstractSauceTunnelManager implements SauceTunnelManager { private SauceREST sauceRest; private SauceConnectEndpoint scEndpoint; + private SCMonitor scMonitor; private AtomicInteger launchAttempts = new AtomicInteger(0); @@ -114,6 +120,10 @@ public void setSauceRest(SauceREST sauceRest) { this.scEndpoint = sauceRest.getSauceConnectEndpoint(); } + public void setSCMonitor(SCMonitor scMonitor) { + this.scMonitor = scMonitor; + } + /** * Closes the Sauce Connect process * @@ -527,15 +537,20 @@ public Process openConnection( try { Semaphore semaphore = new Semaphore(1); semaphore.acquire(); - StreamGobbler errorGobbler = makeErrorGobbler(printStream, process.getErrorStream()); - errorGobbler.start(); - SystemOutGobbler outputGobbler = - makeOutputGobbler(printStream, process.getInputStream(), semaphore); - outputGobbler.start(); + + SCMonitor scMonitor; + if ( this.scMonitor != null ) { + scMonitor = this.scMonitor; + } else { + scMonitor = new SCMonitor("SCMonitor", port, LOGGER); + } + + scMonitor.setSemaphore(semaphore); + scMonitor.start(); boolean sauceConnectStarted = semaphore.tryAcquire(3, TimeUnit.MINUTES); if (sauceConnectStarted) { - if (outputGobbler.isFailed()) { + if (scMonitor.isFailed()) { String message = "Error launching Sauce Connect"; logMessage(printStream, message); // ensure that Sauce Connect process is closed @@ -543,7 +558,7 @@ public Process openConnection( throw new SauceConnectDidNotStartException(message); } else { // everything okay, continue the build - String provisionedTunnelId = outputGobbler.getTunnelId(); + String provisionedTunnelId = scMonitor.getTunnelId(); if (provisionedTunnelId != null) { tunnelInformation.setTunnelId(provisionedTunnelId); waitForReadiness(provisionedTunnelId); @@ -611,16 +626,6 @@ private void waitForReadiness(String tunnelId) { } } - public SystemErrorGobbler makeErrorGobbler(PrintStream printStream, InputStream errorStream) { - return new SystemErrorGobbler("ErrorGobbler", errorStream, printStream); - } - - public SystemOutGobbler makeOutputGobbler( - PrintStream printStream, InputStream inputStream, Semaphore semaphore) { - return new SystemOutGobbler( - "OutputGobbler", inputStream, semaphore, printStream, getSauceStartedMessage()); - } - private TunnelInformation getTunnelInformation(String name) { if (name == null) { return null; @@ -682,11 +687,6 @@ public String getSauceConnectWorkingDirectory() { public abstract File getSauceConnectLogFile(String options); - /** - * @return Text which indicates that Sauce Connect has started - */ - protected abstract String getSauceStartedMessage(); - /** Base exception class which is thrown if an error occurs launching Sauce Connect. */ public static class SauceConnectException extends IOException { @@ -710,49 +710,6 @@ public SauceConnectDidNotStartException(String message) { } } - /** Handles receiving and processing the output of an external process. */ - protected abstract class StreamGobbler extends Thread { - private final PrintStream printStream; - private final InputStream is; - - public StreamGobbler(String name, InputStream is, PrintStream printStream) { - super(name); - this.is = is; - this.printStream = printStream; - } - - /** Opens a BufferedReader over the input stream, reads and processes each line. */ - public void run() { - try { - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - String line; - while ((line = br.readLine()) != null) { - processLine(line); - } - } catch (IOException ioe) { - // ignore stream closed errors - if (!(ioe.getMessage().equalsIgnoreCase("stream closed"))) { - ioe.printStackTrace(); - } - } - } - - /** - * Processes a line of output received by the stream gobbler. - * - * @param line line to process - */ - protected void processLine(String line) { - if (!quietMode) { - if (printStream != null) { - printStream.println(line); - } - LOGGER.info(line); - } - } - } - private int findFreePort() throws SauceConnectException { try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); @@ -761,65 +718,102 @@ private int findFreePort() throws SauceConnectException { } } - /** Handles processing Sauce Connect output sent to stdout. */ - public class SystemOutGobbler extends StreamGobbler { - - private final Semaphore semaphore; - private final String startedMessage; + /** Monitors SC Process via HTTP API */ + public class SCMonitor extends Thread { + private Semaphore semaphore; + private final int port; + private final Logger LOGGER; private String tunnelId; private boolean failed; + private boolean apiResponse; + + private HttpClient client = HttpClient.newHttpClient(); + private static final int sleepTime = 1000; - public SystemOutGobbler( + public SCMonitor( String name, - InputStream is, - final Semaphore semaphore, - PrintStream printStream, - String startedMessage) { - super(name, is, printStream); - this.semaphore = semaphore; - this.startedMessage = startedMessage; + final int port, + final Logger logger) { + super(name); + this.port = port; + this.LOGGER = logger; } - /** - * {@inheritDoc} - * - *

If the line contains the Sauce Connect started message, then release the semaphone, which - * will allow the build to resume. - * - * @param line Line being processed - */ - @Override - protected void processLine(String line) { - super.processLine(line); - - System.out.println(line); - if (StringUtils.containsIgnoreCase(line, "sauce connect running id=")) { - tunnelId = StringUtils.substringAfter(line, "sauce connect running id=").trim(); - } - if (StringUtils.containsIgnoreCase(line, "fatal error exiting")) { - failed = true; - } - if (StringUtils.containsIgnoreCase(line, startedMessage) || failed) { - // unlock processMonitor - semaphore.release(); - } + public void setSemaphore(Semaphore semaphore) { + this.semaphore = semaphore; } public String getTunnelId() { - return tunnelId; + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(String.format("http://localhost:%d/info", port))) + .GET() + .build(); + try { + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + String responseBody = response.body(); + JSONObject jsonObject = new JSONObject(responseBody); + if (jsonObject.has("tunnel_id")) { + this.LOGGER.info("Got tunnel ID", jsonObject.getString("tunnel_id")); + return jsonObject.getString("tunnel_id"); + } + } catch (Exception e) { + this.LOGGER.info("Failed to get tunnel id", e); + return null; + } + this.LOGGER.info("Failed to get tunnel id"); + return null; } public boolean isFailed() { return failed; } - } - /** Handles processing Sauce Connect output sent to stderr. */ - public class SystemErrorGobbler extends StreamGobbler { + public void run() { + while (true) { + pollEndpoint(); + if (this.semaphore.availablePermits() > 0) { + return; + } + + try { + Thread.sleep(sleepTime); + } catch ( java.lang.InterruptedException e ) { + return; + } + } + } + + private void pollEndpoint() { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(String.format("http://localhost:%d/status", port))) + .GET() + .build(); + + try { + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + + this.LOGGER.info("Got http response", response.statusCode()); - public SystemErrorGobbler(String name, InputStream is, PrintStream printStream) { - super(name, is, printStream); + if (response.statusCode() == 200) { + this.apiResponse = true; + String responseBody = response.body(); + JSONObject jsonObject = new JSONObject(responseBody); + if (jsonObject.has("status") && "connected".equals(jsonObject.getString("status"))) { + this.LOGGER.info("Got connected status"); + semaphore.release(); + } + } + } catch ( Exception e ) { + if ( this.apiResponse ) { + // We've had a successful API endpoint read, but then it stopped responding, which means the process failed to start + this.failed = true; + this.LOGGER.warn("API stopped responding", e); + semaphore.release(); + } + } + + this.LOGGER.info("No API response yet"); } } } diff --git a/src/main/java/com/saucelabs/ci/sauceconnect/SauceConnectManager.java b/src/main/java/com/saucelabs/ci/sauceconnect/SauceConnectManager.java index f28d34f..c00d0e1 100755 --- a/src/main/java/com/saucelabs/ci/sauceconnect/SauceConnectManager.java +++ b/src/main/java/com/saucelabs/ci/sauceconnect/SauceConnectManager.java @@ -131,11 +131,6 @@ public String getDefaultSauceConnectLogDirectory() { private static final String WINDOWS_TEMP_DIR = System.getProperty("java.io.tmpdir"); - /** Output from Sauce Connect process which indicates that it has been started. */ - // TODO Replace with HTTP readiness check - private static final String SAUCE_CONNECT_STARTED = - "Sauce Connect is up, you may start your tests"; - public static final String CURRENT_SC_VERSION = "5.1.3"; public static final LazyInitializer LATEST_SC_VERSION = new Builder, String>() .setInitializer(SauceConnectManager::getLatestSauceConnectVersion) @@ -389,12 +384,6 @@ private File getUnzipDir(File workingDirectory, OperatingSystem operatingSystem) return new File(workingDirectory, operatingSystem.getDirectory(useLatestSauceConnect)); } - // TODO - /** {@inheritDoc} */ - protected String getSauceStartedMessage() { - return SAUCE_CONNECT_STARTED; - } - protected boolean isConnected() { HttpClient client = HttpClient.newHttpClient(); diff --git a/src/test/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManagerTest.java b/src/test/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManagerTest.java index 25d51e9..fc99a26 100644 --- a/src/test/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManagerTest.java +++ b/src/test/java/com/saucelabs/ci/sauceconnect/AbstractSauceTunnelManagerTest.java @@ -40,15 +40,4 @@ void testGetLogfile() { AbstractSauceTunnelManager.getLogfile("-l first --logfile second -c -l third"), "mix of -l and --logfile still returns the last one"); } - - @Test - void testSystemOutGobbler_ProcessLine() { - Semaphore semaphore = new Semaphore(1); - SauceConnectManager man = new SauceConnectManager(true); - AbstractSauceTunnelManager.SystemOutGobbler sot = man.makeOutputGobbler(null, null, semaphore); - sot.processLine("sauce connect running id=tunnelId1"); - assertEquals(sot.getTunnelId(), "tunnelId1"); - sot.processLine("sauce connect running id=tunnelId2 "); - assertEquals(sot.getTunnelId(), "tunnelId2"); - } } diff --git a/src/test/java/com/saucelabs/ci/sauceconnect/SauceConnectManagerTest.java b/src/test/java/com/saucelabs/ci/sauceconnect/SauceConnectManagerTest.java index 32f9c32..bd0f8e5 100755 --- a/src/test/java/com/saucelabs/ci/sauceconnect/SauceConnectManagerTest.java +++ b/src/test/java/com/saucelabs/ci/sauceconnect/SauceConnectManagerTest.java @@ -1,5 +1,6 @@ package com.saucelabs.ci.sauceconnect; +import com.saucelabs.ci.sauceconnect.AbstractSauceTunnelManager.SCMonitor; import com.saucelabs.ci.sauceconnect.SauceConnectManager.OperatingSystem; import com.saucelabs.saucerest.DataCenter; import com.saucelabs.saucerest.SauceREST; @@ -17,7 +18,10 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.File; @@ -25,11 +29,13 @@ import java.io.InputStream; import java.io.PrintStream; import java.net.http.HttpClient; +import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Locale; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.List; @@ -57,6 +63,7 @@ class SauceConnectManagerTest { @Mock private Process mockProcess; @Mock private SauceREST mockSauceRest; @Mock private SauceConnectEndpoint mockSCEndpoint; + @Mock private HttpClient mockHttpClient; @Spy private final SauceConnectManager tunnelManager = new SauceConnectManager(); private final PrintStream ps = System.out; @@ -77,8 +84,21 @@ void testOpenConnectionSuccessfully(boolean cleanUpOnExit) throws IOException { when(mockSCEndpoint.getTunnelsInformationForAUser()).thenReturn(List.of()); TunnelInformation readyTunnel = new TunnelInformation(); readyTunnel.isReady = true; - when(mockSCEndpoint.getTunnelInformation(STARTED_TUNNEL_ID)).thenReturn(readyTunnel); tunnelManager.setCleanUpOnExit(cleanUpOnExit); + + SCMonitor scMonitor = mock(SCMonitor.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Semaphore sem = (Semaphore) invocation.getArgument(0); + sem.release(); + return null; + } + }).when(scMonitor).setSemaphore(any(Semaphore.class)); + + tunnelManager.setSCMonitor(scMonitor); + Process process = testOpenConnection(STARTED_SC_LOG); assertEquals(mockProcess, process); } @@ -87,6 +107,21 @@ void testOpenConnectionSuccessfully(boolean cleanUpOnExit) throws IOException { void openConnectionTest_closes() throws IOException, InterruptedException { when(mockSCEndpoint.getTunnelsInformationForAUser()).thenReturn(List.of()); when(mockProcess.waitFor(30, TimeUnit.SECONDS)).thenReturn(true); + + SCMonitor scMonitor = mock(SCMonitor.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Semaphore sem = (Semaphore) invocation.getArgument(0); + sem.release(); + return null; + } + }).when(scMonitor).setSemaphore(any(Semaphore.class)); + + when(scMonitor.isFailed()).thenReturn(true); + + tunnelManager.setSCMonitor(scMonitor); assertThrows(AbstractSauceTunnelManager.SauceConnectDidNotStartException.class, () -> testOpenConnection( "/started_sc_closes.log")); verify(mockProcess).destroy(); @@ -99,8 +134,20 @@ void testOpenConnectionWithExtraSpacesInArgs() throws IOException { notReadyTunnel.isReady = false; TunnelInformation readyTunnel = new TunnelInformation(); readyTunnel.isReady = true; - when(mockSCEndpoint.getTunnelInformation(STARTED_TUNNEL_ID)).thenReturn(notReadyTunnel, - readyTunnel); + + SCMonitor scMonitor = mock(SCMonitor.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Semaphore sem = (Semaphore) invocation.getArgument(0); + sem.release(); + return null; + } + }).when(scMonitor).setSemaphore(any(Semaphore.class)); + + tunnelManager.setSCMonitor(scMonitor); + testOpenConnection(STARTED_SC_LOG, " username-with-spaces-around "); } @@ -113,9 +160,6 @@ private Process testOpenConnection(String logFile, String username) throws IOExc final DataCenter dataCenter = DataCenter.US_WEST; try (InputStream resourceAsStream = getResourceAsStream(logFile)) { - when(mockProcess.getErrorStream()) - .thenReturn(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8))); - when(mockProcess.getInputStream()).thenReturn(resourceAsStream); doReturn(mockProcess).when(tunnelManager).createProcess(any(String[].class), any(File.class)); return tunnelManager.openConnection( username, apiKey, dataCenter, null, " ", ps, false, ""); @@ -145,7 +189,19 @@ void openConnectionTest_existing_tunnel() throws IOException { started.isReady = true; when(mockSCEndpoint.getTunnelsInformationForAUser()).thenReturn(List.of(started)); - when(mockSCEndpoint.getTunnelInformation(STARTED_TUNNEL_ID)).thenReturn(started); + + SCMonitor scMonitor = mock(SCMonitor.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Semaphore sem = (Semaphore) invocation.getArgument(0); + sem.release(); + return null; + } + }).when(scMonitor).setSemaphore(any(Semaphore.class)); + + tunnelManager.setSCMonitor(scMonitor); Process process = testOpenConnection(STARTED_SC_LOG); assertEquals(mockProcess, process);