From a33083a90bcde55733833d28e3b8107b64495dfd Mon Sep 17 00:00:00 2001 From: kasiaMarek Date: Mon, 2 Dec 2024 10:49:49 +0100 Subject: [PATCH 1/3] improvement: queue or cancel previous connect request --- .../scala/meta/internal/bsp/BuildChange.scala | 1 + .../meta/internal/builds/BloopInstall.scala | 44 +-- .../internal/metals/ConnectionProvider.scala | 318 +++++++++++++----- .../meta/internal/metals/Interruptable.scala | 56 +++ .../scala/meta/internal/metals/Messages.scala | 5 +- .../metals/scalacli/ScalaCliServers.scala | 13 +- .../scala/tests/sbt/SbtBloopLspSuite.scala | 43 ++- 7 files changed, 351 insertions(+), 129 deletions(-) create mode 100644 metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala diff --git a/metals/src/main/scala/scala/meta/internal/bsp/BuildChange.scala b/metals/src/main/scala/scala/meta/internal/bsp/BuildChange.scala index 79c5cf949a6..72cde8f50b7 100644 --- a/metals/src/main/scala/scala/meta/internal/bsp/BuildChange.scala +++ b/metals/src/main/scala/scala/meta/internal/bsp/BuildChange.scala @@ -12,4 +12,5 @@ object BuildChange { case object Failed extends BuildChange case object Reconnected extends BuildChange case object Reloaded extends BuildChange + case object Cancelled extends BuildChange } diff --git a/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala b/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala index 44fed0a0699..9b4ba9bea41 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala @@ -1,7 +1,6 @@ package scala.meta.internal.builds import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -37,33 +36,23 @@ final class BloopInstall( override def toString: String = s"BloopInstall($workspace)" def runUnconditionally( - buildTool: BloopInstallProvider, - isImportInProcess: AtomicBoolean, + buildTool: BloopInstallProvider ): Future[WorkspaceLoadedStatus] = { - if (isImportInProcess.compareAndSet(false, true)) { - buildTool.bloopInstall( - workspace, - args => { - scribe.info(s"running '${args.mkString(" ")}'") - val process = - runArgumentsUnconditionally(buildTool, args, userConfig().javaHome) - process.foreach { e => - if (e.isFailed) { - // Record the exact command that failed to help troubleshooting. - scribe.error(s"$buildTool command failed: ${args.mkString(" ")}") - } + buildTool.bloopInstall( + workspace, + args => { + scribe.info(s"running '${args.mkString(" ")}'") + val process = + runArgumentsUnconditionally(buildTool, args, userConfig().javaHome) + process.foreach { e => + if (e.isFailed) { + // Record the exact command that failed to help troubleshooting. + scribe.error(s"$buildTool command failed: ${args.mkString(" ")}") } - process.onComplete(_ => isImportInProcess.set(false)) - process - }, - ) - } else { - Future - .successful { - languageClient.showMessage(ImportAlreadyRunning) - WorkspaceLoadedStatus.Dismissed } - } + process + }, + ) } private def runArgumentsUnconditionally( @@ -123,7 +112,6 @@ final class BloopInstall( def runIfApproved( buildTool: BloopInstallProvider, digest: String, - isImportInProcess: AtomicBoolean, ): Future[WorkspaceLoadedStatus] = synchronized { oldInstallResult(digest) match { @@ -133,7 +121,7 @@ final class BloopInstall( Future.successful(result) case _ => if (userConfig().shouldAutoImportNewProject) { - runUnconditionally(buildTool, isImportInProcess) + runUnconditionally(buildTool) } else { scribe.debug("Awaiting user response...") for { @@ -145,7 +133,7 @@ final class BloopInstall( ) installResult <- { if (userResponse.isYes) { - runUnconditionally(buildTool, isImportInProcess) + runUnconditionally(buildTool) } else { // Don't spam the user with requests during rapid build changes. notification.dismiss(2, TimeUnit.MINUTES) diff --git a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala index 9bce8c88c4e..89ec080a68f 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala @@ -1,12 +1,14 @@ package scala.meta.internal.metals import java.nio.charset.Charset +import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.ExecutionContextExecutorService import scala.concurrent.Future import scala.concurrent.Promise +import scala.util.Failure import scala.util.control.NonFatal import scala.meta.internal.bsp @@ -25,6 +27,7 @@ import scala.meta.internal.builds.Digest.Status import scala.meta.internal.builds.SbtBuildTool import scala.meta.internal.builds.ScalaCliBuildTool import scala.meta.internal.builds.ShellRunner +import scala.meta.internal.metals.Interruptable._ import scala.meta.internal.metals.Messages.IncompatibleBloopVersion import scala.meta.internal.metals.MetalsEnrichments._ import scala.meta.internal.metals.doctor.Doctor @@ -296,34 +299,103 @@ class ConnectionProvider( } object Connect { + class RequestInfo(val request: ConnectRequest) { + val promise: Promise[BuildChange] = Promise() + val cancelPromise: Promise[Unit] = Promise() + def cancel(): Boolean = cancelPromise.trySuccess(()) + } + + @volatile private var currentRequest: Option[RequestInfo] = None + private val queue = new ConcurrentLinkedQueue[RequestInfo]() + + def getOngoingRequest(): Option[RequestInfo] = currentRequest + def connect[T](request: ConnectRequest): Future[BuildChange] = { - request match { - case Disconnect(shutdownBuildServer) => disconnect(shutdownBuildServer) - case Index(check) => index(check) - case ImportBuildAndIndex(session) => importBuildAndIndex(session) - case ConnectToSession(session) => connectToSession(session) - case CreateSession(shutdownBuildServer) => - createSession(shutdownBuildServer) - case GenerateBspConfigAndConnect(buildTool, shutdownServer) => - generateBspConfigAndConnect(buildTool, shutdownServer) - case BloopInstallAndConnect( - buildTool, - checksum, - forceImport, - shutdownServer, - ) => - bloopInstallAndConnect( - buildTool, - checksum, - forceImport, - shutdownServer, - ) + val info = addToQueue(request) + pollAndConnect() + info.promise.future + } + + private def addToQueue(request: ConnectRequest): RequestInfo = + synchronized { + val info = new RequestInfo(request) + val iter = queue.iterator() + while (iter.hasNext()) { + val curr = iter.next() + request.cancelCompare(iter.next().request) match { + case 1 => curr.cancel() + case -1 => info.cancel() + case _ => + } + } + queue.add(info) + // maybe cancel ongoing + currentRequest.foreach(ongoing => + if (request.cancelCompare(ongoing.request) == 1) ongoing.cancel() + ) + info + } + + private def pollAndConnect(): Unit = { + val optRequest = synchronized { + if (currentRequest.isEmpty) { + currentRequest = Option(queue.poll()) + currentRequest + } else None + } + + for (request <- optRequest) { + val cancelPromise = request.cancelPromise + val result = + if (cancelPromise.isCompleted) + Interruptable.successful(BuildChange.Cancelled) + else + request.request match { + case Disconnect(shutdownBuildServer) => + disconnect(shutdownBuildServer, cancelPromise) + case Index(check) => index(check, cancelPromise) + case ImportBuildAndIndex(session) => + importBuildAndIndex(session, cancelPromise) + case ConnectToSession(session) => + connectToSession(session, cancelPromise) + case CreateSession(shutdownBuildServer) => + createSession(shutdownBuildServer, cancelPromise) + case GenerateBspConfigAndConnect(buildTool, shutdownServer) => + generateBspConfigAndConnect( + buildTool, + shutdownServer, + cancelPromise, + ) + case BloopInstallAndConnect( + buildTool, + checksum, + forceImport, + shutdownServer, + ) => + bloopInstallAndConnect( + buildTool, + checksum, + forceImport, + shutdownServer, + cancelPromise, + ) + } + result.future.onComplete { res => + res match { + case Failure(CancelConnectException) => + request.promise.trySuccess(BuildChange.Cancelled) + case _ => request.promise.tryComplete(res) + } + currentRequest = None + pollAndConnect() + } } } private def disconnect( - shutdownBuildServer: Boolean - ): Future[BuildChange] = { + shutdownBuildServer: Boolean, + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { def shutdownBsp(optMainBsp: Option[String]): Future[Boolean] = { optMainBsp match { case Some(BloopServers.name) => @@ -348,34 +420,43 @@ class ConnectionProvider( ) for { - _ <- scalaCli.stop() - optMainBsp <- bspSession match { + _ <- scalaCli.stop(storeLast = true).withInterrupt(cancelPromise) + optMainBsp <- (bspSession match { case None => Future.successful(None) case Some(session) => bspSession = None mainBuildTargetsData.resetConnections(List.empty) session.shutdown().map(_ => Some(session.main.name)) - } + }).withInterrupt(cancelPromise) _ <- - if (shutdownBuildServer) shutdownBsp(optMainBsp) - else Future.successful(()) + if (shutdownBuildServer) + shutdownBsp(optMainBsp).withInterrupt(cancelPromise) + else Interruptable.successful(()) } yield BuildChange.None } - private def index(check: () => Unit): Future[BuildChange] = - profiledIndexWorkspace(check).map(_ => BuildChange.None) + private def index( + check: () => Unit, + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = + profiledIndexWorkspace(check) + .map(_ => BuildChange.None) + .withInterrupt(cancelPromise) private def importBuildAndIndex( - session: BspSession - ): Future[BuildChange] = { + session: BspSession, + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { val importedBuilds0 = timerProvider.timed("Imported build") { session.importBuilds() } for { - bspBuilds <- workDoneProgress.trackFuture( - Messages.importingBuild, - importedBuilds0, - ) + bspBuilds <- workDoneProgress + .trackFuture( + Messages.importingBuild, + importedBuilds0, + ) + .withInterrupt(cancelPromise) _ = { val idToConnection = bspBuilds.flatMap { bspBuild => val targets = @@ -386,7 +467,7 @@ class ConnectionProvider( saveProjectReferencesInfo(bspBuilds) } _ = compilers.cancel() - buildChange <- index(check) + buildChange <- index(check, cancelPromise) } yield buildChange } @@ -408,7 +489,10 @@ class ConnectionProvider( DelegateSetting.writeProjectRef(folder, projectRefs) } - private def connectToSession(session: BspSession): Future[BuildChange] = { + private def connectToSession( + session: BspSession, + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { scribe.info( s"Connected to Build server: ${session.main.name} v${session.version}" ) @@ -419,7 +503,7 @@ class ConnectionProvider( bspSession = Some(session) isConnecting.set(false) for { - _ <- importBuildAndIndex(session) + _ <- importBuildAndIndex(session, cancelPromise) _ = buildToolProvider.buildTool.foreach( workspaceReload.persistChecksumStatus(Digest.Status.Installed, _) ) @@ -450,7 +534,10 @@ class ConnectionProvider( } } - def createSession(shutdownServer: Boolean): Future[BuildChange] = { + def createSession( + shutdownServer: Boolean, + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { def compileAllOpenFiles: BuildChange => Future[BuildChange] = { case change if !change.isFailed => Future @@ -465,25 +552,25 @@ class ConnectionProvider( case other => Future.successful(other) } - val scalaCliPaths = scalaCli.paths - isConnecting.set(true) (for { - _ <- disconnect(shutdownServer) - maybeSession <- timerProvider.timed( - "Connected to build server", - true, - ) { - bspConnector.connect( - buildToolProvider.buildTool, - folder, - () => userConfig, - shellRunner, - ) - } + _ <- disconnect(shutdownServer, cancelPromise) + maybeSession <- timerProvider + .timed( + "Connected to build server", + true, + ) { + bspConnector.connect( + buildToolProvider.buildTool, + folder, + () => userConfig, + shellRunner, + ) + } + .withInterrupt(cancelPromise) result <- maybeSession match { case Some(session) => - val result = connectToSession(session) + val result = connectToSession(session, cancelPromise) session.mainConnection.onReconnection { newMainConn => val updSession = session.copy(main = newMainConn) connect(ConnectToSession(updSession)) @@ -492,19 +579,17 @@ class ConnectionProvider( } result case None => - Future.successful(BuildChange.None) + Interruptable.successful(BuildChange.None) } - _ <- Future.sequence( - scalaCliPaths - .collect { - case path if (!buildTargets.belongsToBuildTarget(path.toNIO)) => - scalaCli.start(path) - } - ) + _ <- scalaCli + .startForAllLastPaths(path => + !buildTargets.belongsToBuildTarget(path.toNIO) + ) + .withInterrupt(cancelPromise) _ = initTreeView() } yield result) .recover { case NonFatal(e) => - disconnect(false) + disconnect(false, cancelPromise) val message = "Failed to connect with build server, no functionality will work." val details = " See logs for more details." @@ -514,7 +599,7 @@ class ConnectionProvider( scribe.error(message, e) BuildChange.Failed } - .flatMap(compileAllOpenFiles) + .flatMap(compileAllOpenFiles(_).withInterrupt(cancelPromise)) .map { res => buildServerPromise.trySuccess(()) res @@ -524,23 +609,25 @@ class ConnectionProvider( private def generateBspConfigAndConnect( buildTool: BuildServerProvider, shutdownServer: Boolean, - ): Future[BuildChange] = { + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { tables.buildTool.chooseBuildTool(buildTool.executableName) maybeChooseServer(buildTool.buildServerName, alreadySelected = false) for { _ <- - if (shutdownServer) disconnect(shutdownServer) - else Future.unit + if (shutdownServer) disconnect(shutdownServer, cancelPromise) + else Interruptable.successful(()) status <- buildTool .generateBspConfig( folder, args => bspConfigGenerator.runUnconditionally(buildTool, args), statusBar, ) + .withInterrupt(cancelPromise) shouldConnect = handleGenerationStatus(buildTool, status) status <- - if (shouldConnect) createSession(false) - else Future.successful(BuildChange.Failed) + if (shouldConnect) createSession(false, cancelPromise) + else Interruptable.successful(BuildChange.Failed) } yield status } @@ -575,30 +662,27 @@ class ConnectionProvider( false } - val isImportInProcess = new AtomicBoolean(false) - private def bloopInstallAndConnect( buildTool: BloopInstallProvider, checksum: String, forceImport: Boolean, shutdownServer: Boolean, - ): Future[BuildChange] = { + cancelPromise: Promise[Unit], + ): Interruptable[BuildChange] = { for { result <- { if (forceImport) bloopInstall.runUnconditionally( - buildTool, - isImportInProcess, + buildTool ) else bloopInstall.runIfApproved( buildTool, checksum, - isImportInProcess, ) - } + }.withInterrupt(cancelPromise) change <- { - if (result.isInstalled) createSession(shutdownServer) + if (result.isInstalled) createSession(shutdownServer, cancelPromise) else if (result.isFailed) { for { change <- @@ -614,13 +698,13 @@ class ConnectionProvider( // Connect nevertheless, many build import failures are caused // by resolution errors in one weird module while other modules // exported successfully. - createSession(shutdownServer) + createSession(shutdownServer, cancelPromise) } else { languageClient.showMessage(Messages.ImportProjectFailed) - Future.successful(BuildChange.Failed) + Interruptable.successful(BuildChange.Failed) } } yield change - } else Future.successful(BuildChange.None) + } else Interruptable.successful(BuildChange.None) } } yield change } @@ -630,21 +714,77 @@ class ConnectionProvider( sealed trait ConnectKind object SlowConnect extends ConnectKind -sealed trait ConnectRequest extends ConnectKind +sealed trait ConnectRequest extends ConnectKind { + + /** + * -1 cancel this + * 1 cancel other + * 0 queue + * @param other + * @return + */ + def cancelCompare(other: ConnectRequest): Int +} -case class Disconnect(shutdownBuildServer: Boolean) extends ConnectRequest -case class Index(check: () => Unit) extends ConnectRequest -case class ImportBuildAndIndex(bspSession: BspSession) extends ConnectRequest -case class ConnectToSession(bspSession: BspSession) extends ConnectRequest +case class Disconnect(shutdownBuildServer: Boolean) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case _: Index => 0 + case _ => -1 + } +} +case class Index(check: () => Unit) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case _: Disconnect => 0 + case _ => -1 + } +} +case class ImportBuildAndIndex(bspSession: BspSession) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case (_: Index) | (_: ImportBuildAndIndex) => 1 + case _: Disconnect => 0 + case _ => -1 + } +} +case class ConnectToSession(bspSession: BspSession) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case (_: Disconnect) | (_: Index) | (_: ConnectToSession) => 1 + case _ => -1 + } +} case class CreateSession(shutdownBuildServer: Boolean = false) - extends ConnectRequest + extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case (_: Disconnect) | (_: Index) | (_: ConnectToSession) | CreateSession( + false + ) => + 1 + case _ => -1 + } +} case class GenerateBspConfigAndConnect( buildTool: BuildServerProvider, shutdownServer: Boolean = false, -) extends ConnectRequest +) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case BloopInstallAndConnect(_, _, _, true) if !shutdownServer => 0 + case _ => 1 + } +} case class BloopInstallAndConnect( buildTool: BloopInstallProvider, checksum: String, forceImport: Boolean, shutdownServer: Boolean, -) extends ConnectRequest +) extends ConnectRequest { + def cancelCompare(other: ConnectRequest): Int = + other match { + case GenerateBspConfigAndConnect(_, true) if !shutdownServer => 0 + case _ => 1 + } +} diff --git a/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala b/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala new file mode 100644 index 00000000000..2b3d7cc3f2c --- /dev/null +++ b/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala @@ -0,0 +1,56 @@ +package scala.meta.internal.metals + +import java.util.concurrent.CompletableFuture + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise + +import scala.meta.internal.metals.Interruptable.CancelConnectException + +class Interruptable[+T] private ( + futureIn: Future[T], + cancelPromise: Promise[Unit], +) extends CompletableFuture { + + def future(implicit executor: ExecutionContext): Future[T] = futureIn.map( + if (cancelPromise.isCompleted) throw CancelConnectException else _ + ) + + override def cancel(mayInterruptIfRunning: Boolean): Boolean = { + cancelPromise.trySuccess(()) + true + } + + override def isCancelled(): Boolean = cancelPromise.isCompleted + + def flatMap[S]( + f: T => Interruptable[S] + )(implicit executor: ExecutionContext): Interruptable[S] = + new Interruptable(future.flatMap(f(_).future), cancelPromise) + + def map[S]( + f: T => S + )(implicit executor: ExecutionContext): Interruptable[S] = + new Interruptable(future.map(f(_)), cancelPromise) + + def recover[U >: T]( + pf: PartialFunction[Throwable, U] + )(implicit executor: ExecutionContext): Interruptable[U] = { + val pf0: PartialFunction[Throwable, U] = { case CancelConnectException => + throw CancelConnectException + } + new Interruptable(future.recover(pf0.orElse(pf)), cancelPromise) + } +} + +object Interruptable { + def successful[T](result: T) = + new Interruptable(Future.successful(result), Promise()) + + object CancelConnectException extends RuntimeException + implicit class XtensionFuture[+T](future: Future[T]) { + def withInterrupt(cancelPromise: Promise[Unit]): Interruptable[T] = + new Interruptable(future, cancelPromise) + } +} diff --git a/metals/src/main/scala/scala/meta/internal/metals/Messages.scala b/metals/src/main/scala/scala/meta/internal/metals/Messages.scala index b6e8ea9747a..bdaab7106da 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/Messages.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/Messages.scala @@ -105,10 +105,7 @@ object Messages { MessageType.Error, "Import project failed, no functionality will work. See the logs for more details", ) - val ImportAlreadyRunning = new MessageParams( - MessageType.Warning, - s"Import already running. \nPlease cancel the current import to run a new one.", - ) + val ImportProjectPartiallyFailed = new MessageParams( MessageType.Warning, "Import project partially failed, limited functionality may work in some parts of the workspace. " + diff --git a/metals/src/main/scala/scala/meta/internal/metals/scalacli/ScalaCliServers.scala b/metals/src/main/scala/scala/meta/internal/metals/scalacli/ScalaCliServers.scala index 9fd739544bd..268354f22a1 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/scalacli/ScalaCliServers.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/scalacli/ScalaCliServers.scala @@ -49,6 +49,9 @@ class ScalaCliServers( )(implicit ec: ExecutionContextExecutorService) extends Cancelable { + private val lastServerPaths = + new AtomicReference[Set[AbsolutePath]](Set.empty) + private def localTmpWorkspace(path: AbsolutePath) = { val root = if (path.isDirectory) path else path.parent root.resolve(s".metals-scala-cli/") @@ -128,6 +131,11 @@ class ScalaCliServers( def paths: Iterable[AbsolutePath] = servers.map(_.path) + def startForAllLastPaths(filter: AbsolutePath => Boolean): Future[Set[Unit]] = + Future.sequence( + lastServerPaths.getAndSet(Set.empty).withFilter(filter).map(start) + ) + def start(path: AbsolutePath): Future[Unit] = { val customWorkspace = if (path.isDirectory) None @@ -201,8 +209,11 @@ class ScalaCliServers( } yield () } - def stop(): Future[Unit] = { + def stop(storeLast: Boolean = false): Future[Unit] = { val servers = serversRef.getAndSet(Queue.empty) + if (storeLast) { + lastServerPaths.updateAndGet(_ ++ servers.map(_.path)) + } Future.sequence(servers.map(_.stop()).toSeq).ignoreValue } diff --git a/tests/slow/src/test/scala/tests/sbt/SbtBloopLspSuite.scala b/tests/slow/src/test/scala/tests/sbt/SbtBloopLspSuite.scala index 516fc0bc51f..c39565b4417 100644 --- a/tests/slow/src/test/scala/tests/sbt/SbtBloopLspSuite.scala +++ b/tests/slow/src/test/scala/tests/sbt/SbtBloopLspSuite.scala @@ -19,6 +19,7 @@ import scala.meta.io.AbsolutePath import com.google.gson.JsonObject import com.google.gson.JsonPrimitive +import org.eclipse.lsp4j.MessageActionItem import tests.BaseImportSuite import tests.JavaHomeChangeTest import tests.ScriptsAssertions @@ -213,18 +214,12 @@ class SbtBloopLspSuite _ = assertNoDiff( client.beginProgressMessages, List( + progressMessage, progressMessage, Messages.importingBuild, Messages.indexing, ).mkString("\n"), ) - _ = assertNoDiff( - client.workspaceShowMessages, - List( - ImportAlreadyRunning.getMessage() - ).mkString("\n"), - ) - } yield () } @@ -884,4 +879,38 @@ class SbtBloopLspSuite |""".stripMargin, ) + test("switch-build-server-while-connect") { + cleanWorkspace() + val layout = + s"""|/project/build.properties + |sbt.version=${V.sbtVersion} + |/build.sbt + |scalaVersion := "${V.scala213}" + |/src/main/scala/A.scala + | + |object A { + | val i: Int = "aaa" + |} + |""".stripMargin + writeLayout(layout) + client.importBuild = ImportBuild.yes + client.selectBspServer = { _ => new MessageActionItem("sbt") } + for { + _ <- server.initialize() + _ = server.initialized() + connectionProvider = server.headServer.connectionProvider.Connect + _ = while (connectionProvider.getOngoingRequest().isEmpty) { + // wait for connect to start + Thread.sleep(100) + } + bloopConnectF = connectionProvider.getOngoingRequest().get.promise.future + bspSwitchF = server.executeCommand(ServerCommands.BspSwitch) + _ <- bloopConnectF + _ = assert(!server.server.indexingPromise.isCompleted) + _ <- bspSwitchF + _ = assert(server.server.indexingPromise.isCompleted) + _ = assert(server.server.bspSession.exists(_.main.isSbt)) + } yield () + } + } From 30643c4ccdda8ebe20044b7c92da0a8ddfc5a14f Mon Sep 17 00:00:00 2001 From: kasiaMarek Date: Mon, 2 Dec 2024 12:34:45 +0100 Subject: [PATCH 2/3] introduce enum for --- .../internal/metals/ConnectionProvider.scala | 80 ++++++++++--------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala index 89ec080a68f..57b1d4a9cdb 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala @@ -106,9 +106,9 @@ class ConnectionProvider( var buildServerPromise: Promise[Unit] = Promise[Unit]() val isConnecting = new AtomicBoolean(false) - override def index(check: () => Unit): Future[Unit] = connect( - Index(check) - ).ignoreValue + override def index(check: () => Unit): Future[Unit] = + connect(Index(check)).ignoreValue + override def cancel(): Unit = { cancelables.cancel() } @@ -322,16 +322,16 @@ class ConnectionProvider( val iter = queue.iterator() while (iter.hasNext()) { val curr = iter.next() - request.cancelCompare(iter.next().request) match { - case 1 => curr.cancel() - case -1 => info.cancel() - case _ => + request.cancelCompare(curr.request) match { + case TakeOver => curr.cancel() + case Yield => info.cancel() + case _ => } } queue.add(info) // maybe cancel ongoing currentRequest.foreach(ongoing => - if (request.cancelCompare(ongoing.request) == 1) ongoing.cancel() + if (request.cancelCompare(ongoing.request) == TakeOver) ongoing.cancel() ) info } @@ -714,66 +714,72 @@ class ConnectionProvider( sealed trait ConnectKind object SlowConnect extends ConnectKind +sealed trait ConflictBehaviour +case object Yield extends ConflictBehaviour +case object TakeOver extends ConflictBehaviour +case object Queue extends ConflictBehaviour + sealed trait ConnectRequest extends ConnectKind { - /** - * -1 cancel this - * 1 cancel other - * 0 queue - * @param other - * @return + /** Decides what to do with a new connect request + * in presence of an another ongoing/queued request. + * @param other the ongoing or queued request + * @return behavoiur of the incoming request + * Yield -- cancel this + * TakeOver -- cancel other + * Queue -- queue */ - def cancelCompare(other: ConnectRequest): Int + def cancelCompare(other: ConnectRequest): ConflictBehaviour } case class Disconnect(shutdownBuildServer: Boolean) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case _: Index => 0 - case _ => -1 + case _: Index => Queue + case _ => Yield } } case class Index(check: () => Unit) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case _: Disconnect => 0 - case _ => -1 + case _: Disconnect => Queue + case _ => Yield } } case class ImportBuildAndIndex(bspSession: BspSession) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case (_: Index) | (_: ImportBuildAndIndex) => 1 - case _: Disconnect => 0 - case _ => -1 + case (_: Index) | (_: ImportBuildAndIndex) => TakeOver + case _: Disconnect => Queue + case _ => Yield } } case class ConnectToSession(bspSession: BspSession) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case (_: Disconnect) | (_: Index) | (_: ConnectToSession) => 1 - case _ => -1 + case (_: Disconnect) | (_: Index) | (_: ConnectToSession) => TakeOver + case _ => Yield } } case class CreateSession(shutdownBuildServer: Boolean = false) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { case (_: Disconnect) | (_: Index) | (_: ConnectToSession) | CreateSession( false ) => - 1 - case _ => -1 + TakeOver + case _ => Yield } } case class GenerateBspConfigAndConnect( buildTool: BuildServerProvider, shutdownServer: Boolean = false, ) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case BloopInstallAndConnect(_, _, _, true) if !shutdownServer => 0 - case _ => 1 + case BloopInstallAndConnect(_, _, _, true) if !shutdownServer => Queue + case _ => TakeOver } } case class BloopInstallAndConnect( @@ -782,9 +788,9 @@ case class BloopInstallAndConnect( forceImport: Boolean, shutdownServer: Boolean, ) extends ConnectRequest { - def cancelCompare(other: ConnectRequest): Int = + def cancelCompare(other: ConnectRequest): ConflictBehaviour = other match { - case GenerateBspConfigAndConnect(_, true) if !shutdownServer => 0 - case _ => 1 + case GenerateBspConfigAndConnect(_, true) if !shutdownServer => Queue + case _ => TakeOver } } From 044cc69c18193a17497996846fccdfb2d14f6e2b Mon Sep 17 00:00:00 2001 From: kasiaMarek Date: Thu, 12 Dec 2024 11:23:34 +0100 Subject: [PATCH 3/3] try to propagate cancel --- .../internal/bsp/BspConfigGenerator.scala | 3 +- .../meta/internal/bsp/BspConnector.scala | 1 + .../meta/internal/builds/BloopInstall.scala | 28 +++-- .../builds/BloopInstallProvider.scala | 7 +- .../internal/builds/BuildServerProvider.scala | 13 ++- .../internal/builds/NewProjectProvider.scala | 1 + .../meta/internal/builds/SbtBuildTool.scala | 4 +- .../internal/builds/ScalaCliBuildTool.scala | 11 +- .../meta/internal/builds/ShellRunner.scala | 8 +- .../internal/metals/CancelableFuture.scala | 2 +- .../internal/metals/ConnectionProvider.scala | 108 ++++++++---------- .../internal/metals/FileDecoderProvider.scala | 2 + .../meta/internal/metals/Interruptable.scala | 71 ++++++++---- .../scala/tests/bazel/BazelLspSuite.scala | 22 ++-- 14 files changed, 158 insertions(+), 123 deletions(-) diff --git a/metals/src/main/scala/scala/meta/internal/bsp/BspConfigGenerator.scala b/metals/src/main/scala/scala/meta/internal/bsp/BspConfigGenerator.scala index 8f083a8ba8e..6e8ffe082d3 100644 --- a/metals/src/main/scala/scala/meta/internal/bsp/BspConfigGenerator.scala +++ b/metals/src/main/scala/scala/meta/internal/bsp/BspConfigGenerator.scala @@ -10,6 +10,7 @@ import scala.util.control.NonFatal import scala.meta.internal.bsp.BspConfigGenerationStatus._ import scala.meta.internal.builds.BuildServerProvider import scala.meta.internal.builds.ShellRunner +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.Directories import scala.meta.internal.metals.Messages.BspProvider import scala.meta.internal.metals.MetalsEnrichments._ @@ -31,7 +32,7 @@ final class BspConfigGenerator( def runUnconditionally( buildTool: BuildServerProvider, args: List[String], - ): Future[BspConfigGenerationStatus] = + ): CancelableFuture[BspConfigGenerationStatus] = shellRunner .run( s"${buildTool.buildServerName} bspConfig", diff --git a/metals/src/main/scala/scala/meta/internal/bsp/BspConnector.scala b/metals/src/main/scala/scala/meta/internal/bsp/BspConnector.scala index f321e141098..0d0a73d5348 100644 --- a/metals/src/main/scala/scala/meta/internal/bsp/BspConnector.scala +++ b/metals/src/main/scala/scala/meta/internal/bsp/BspConnector.scala @@ -159,6 +159,7 @@ class BspConnector( args => bspConfigGenerator.runUnconditionally(bsp, args), statusBar, ) + .future .flatMap { _ => connect( projectRoot, diff --git a/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala b/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala index 9b4ba9bea41..a3b17f3991a 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/BloopInstall.scala @@ -4,10 +4,15 @@ import java.util.concurrent.TimeUnit import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.Promise import scala.meta.internal.builds.Digest.Status import scala.meta.internal.metals.BuildInfo +import scala.meta.internal.metals.CancelSwitch +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.Confirmation +import scala.meta.internal.metals.Interruptable +import scala.meta.internal.metals.Interruptable._ import scala.meta.internal.metals.Messages._ import scala.meta.internal.metals.MetalsEnrichments._ import scala.meta.internal.metals.Tables @@ -37,14 +42,14 @@ final class BloopInstall( def runUnconditionally( buildTool: BloopInstallProvider - ): Future[WorkspaceLoadedStatus] = { + ): CancelableFuture[WorkspaceLoadedStatus] = { buildTool.bloopInstall( workspace, args => { scribe.info(s"running '${args.mkString(" ")}'") val process = runArgumentsUnconditionally(buildTool, args, userConfig().javaHome) - process.foreach { e => + process.future.foreach { e => if (e.isFailed) { // Record the exact command that failed to help troubleshooting. scribe.error(s"$buildTool command failed: ${args.mkString(" ")}") @@ -59,7 +64,7 @@ final class BloopInstall( buildTool: BloopInstallProvider, args: List[String], javaHome: Option[String], - ): Future[WorkspaceLoadedStatus] = { + ): CancelableFuture[WorkspaceLoadedStatus] = { persistChecksumStatus(Status.Started, buildTool) val processFuture = shellRunner .run( @@ -81,7 +86,7 @@ final class BloopInstall( case ExitCodes.Cancel => WorkspaceLoadedStatus.Cancelled case result => WorkspaceLoadedStatus.Failed(result) } - processFuture.foreach { result => + processFuture.future.foreach { result => try result.toChecksumStatus.foreach(persistChecksumStatus(_, buildTool)) catch { case _: InterruptedException => @@ -112,35 +117,36 @@ final class BloopInstall( def runIfApproved( buildTool: BloopInstallProvider, digest: String, - ): Future[WorkspaceLoadedStatus] = + ): CancelableFuture[WorkspaceLoadedStatus] = synchronized { oldInstallResult(digest) match { case Some(result) if result != WorkspaceLoadedStatus.Duplicate(Status.Requested) => scribe.info(s"skipping build import with status '${result.name}'") - Future.successful(result) + CancelableFuture.successful(result) case _ => if (userConfig().shouldAutoImportNewProject) { runUnconditionally(buildTool) } else { scribe.debug("Awaiting user response...") - for { + implicit val cancelSwitch = CancelSwitch(Promise[Unit]()) + (for { userResponse <- requestImport( buildTools, buildTool, languageClient, digest, - ) + ).withInterrupt installResult <- { if (userResponse.isYes) { - runUnconditionally(buildTool) + runUnconditionally(buildTool).withInterrupt } else { // Don't spam the user with requests during rapid build changes. notification.dismiss(2, TimeUnit.MINUTES) - Future.successful(WorkspaceLoadedStatus.Rejected) + Interruptable.successful(WorkspaceLoadedStatus.Rejected) } } - } yield installResult + } yield installResult).toCancellable } } } diff --git a/metals/src/main/scala/scala/meta/internal/builds/BloopInstallProvider.scala b/metals/src/main/scala/scala/meta/internal/builds/BloopInstallProvider.scala index fa3731e6ca4..d3fa5c0e4c7 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/BloopInstallProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/BloopInstallProvider.scala @@ -2,8 +2,7 @@ package scala.meta.internal.builds import java.io.IOException import java.nio.file.Files -import scala.concurrent.Future - +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.MetalsEnrichments._ import scala.meta.io.AbsolutePath @@ -18,8 +17,8 @@ trait BloopInstallProvider extends BuildTool { */ def bloopInstall( workspace: AbsolutePath, - systemProcess: List[String] => Future[WorkspaceLoadedStatus], - ): Future[WorkspaceLoadedStatus] = { + systemProcess: List[String] => CancelableFuture[WorkspaceLoadedStatus], + ): CancelableFuture[WorkspaceLoadedStatus] = { cleanupStaleConfig() systemProcess(bloopInstallArgs(workspace)) } diff --git a/metals/src/main/scala/scala/meta/internal/builds/BuildServerProvider.scala b/metals/src/main/scala/scala/meta/internal/builds/BuildServerProvider.scala index c47fcb04aff..aa45eda0760 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/BuildServerProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/BuildServerProvider.scala @@ -4,6 +4,7 @@ import scala.annotation.nowarn import scala.concurrent.Future import scala.meta.internal.bsp.BspConfigGenerationStatus._ +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.Messages import scala.meta.internal.metals.StatusBar import scala.meta.io.AbsolutePath @@ -20,12 +21,16 @@ trait BuildServerProvider extends BuildTool { @nowarn("msg=parameter statusBar in method generateBspConfig is never used") def generateBspConfig( workspace: AbsolutePath, - systemProcess: List[String] => Future[BspConfigGenerationStatus], + systemProcess: List[String] => CancelableFuture[ + BspConfigGenerationStatus + ], statusBar: StatusBar, - ): Future[BspConfigGenerationStatus] = + ): CancelableFuture[BspConfigGenerationStatus] = createBspFileArgs(workspace).map(systemProcess).getOrElse { - Future.successful( - Failed(Right(Messages.NoBspSupport.toString())) + CancelableFuture( + Future.successful( + Failed(Right(Messages.NoBspSupport.toString())) + ) ) } diff --git a/metals/src/main/scala/scala/meta/internal/builds/NewProjectProvider.scala b/metals/src/main/scala/scala/meta/internal/builds/NewProjectProvider.scala index ad0dcb6fdbe..cc40a5a6378 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/NewProjectProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/NewProjectProvider.scala @@ -134,6 +134,7 @@ class NewProjectProvider( javaHome, javaOptsMap = javaOpts, ) + .future .flatMap { case ExitCodes.Success => askForWindow(projectPath) diff --git a/metals/src/main/scala/scala/meta/internal/builds/SbtBuildTool.scala b/metals/src/main/scala/scala/meta/internal/builds/SbtBuildTool.scala index d36a4f7b52c..a689db6d2a7 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/SbtBuildTool.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/SbtBuildTool.scala @@ -77,7 +77,7 @@ case class SbtBuildTool( def shutdownBspServer( shellRunner: ShellRunner - ): Future[Int] = { + ): CancelableFuture[Int] = { val shutdownArgs = composeArgs(List("--client", "shutdown"), projectRoot, projectRoot.toNIO) scribe.info(s"running ${shutdownArgs.mkString(" ")}") @@ -242,7 +242,7 @@ case class SbtBuildTool( if (promise.isCompleted) { // executes when user chooses `restart` after the timeout restartSbtBuildServer() - } else shutdownBspServer(shellRunner).ignoreValue + } else shutdownBspServer(shellRunner).future.ignoreValue case _ => promise.trySuccess(()) Future.successful(()) diff --git a/metals/src/main/scala/scala/meta/internal/builds/ScalaCliBuildTool.scala b/metals/src/main/scala/scala/meta/internal/builds/ScalaCliBuildTool.scala index b5ad6d1a546..632f47c0d39 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/ScalaCliBuildTool.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/ScalaCliBuildTool.scala @@ -2,10 +2,9 @@ package scala.meta.internal.builds import java.security.MessageDigest -import scala.concurrent.Future - import scala.meta.internal.bsp.BspConfigGenerationStatus._ import scala.meta.internal.metals.BuildInfo +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.Directories import scala.meta.internal.metals.MetalsEnrichments._ import scala.meta.internal.metals.StatusBar @@ -26,9 +25,11 @@ case class ScalaCliBuildTool( override def generateBspConfig( workspace: AbsolutePath, - systemProcess: List[String] => Future[BspConfigGenerationStatus], + systemProcess: List[String] => CancelableFuture[ + BspConfigGenerationStatus + ], statusBar: StatusBar, - ): Future[BspConfigGenerationStatus] = + ): CancelableFuture[BspConfigGenerationStatus] = createBspFileArgs(workspace).map(systemProcess).getOrElse { // fallback to creating `.bsp/scala-cli.json` that starts JVM launcher val bspConfig = @@ -37,7 +38,7 @@ case class ScalaCliBuildTool( bspConfig.writeText( ScalaCli.scalaCliBspJsonContent(projectRoot = projectRoot.toString()) ) - Future.successful(Generated) + CancelableFuture.successful(Generated) } override def createBspFileArgs( diff --git a/metals/src/main/scala/scala/meta/internal/builds/ShellRunner.scala b/metals/src/main/scala/scala/meta/internal/builds/ShellRunner.scala index 2e180f42d08..befa53d2708 100644 --- a/metals/src/main/scala/scala/meta/internal/builds/ShellRunner.scala +++ b/metals/src/main/scala/scala/meta/internal/builds/ShellRunner.scala @@ -4,13 +4,13 @@ import java.io.File import scala.concurrent.Await import scala.concurrent.ExecutionContext -import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.DurationInt import scala.language.postfixOps import scala.util.Properties import scala.meta.internal.metals.Cancelable +import scala.meta.internal.metals.CancelableFuture import scala.meta.internal.metals.JavaBinary import scala.meta.internal.metals.JdkSources import scala.meta.internal.metals.MetalsEnrichments._ @@ -45,7 +45,7 @@ class ShellRunner(time: Time, workDoneProvider: WorkDoneProgress)(implicit processErr: String => Unit = scribe.error(_), propagateError: Boolean = false, javaOptsMap: Map[String, String] = Map.empty, - ): Future[Int] = { + ): CancelableFuture[Int] = { val classpathSeparator = if (Properties.isWin) ";" else ":" val classpath = Fetch @@ -92,7 +92,7 @@ class ShellRunner(time: Time, workDoneProvider: WorkDoneProgress)(implicit processErr: String => Unit = scribe.error(_), propagateError: Boolean = false, logInfo: Boolean = true, - ): Future[Int] = { + ): CancelableFuture[Int] = { val elapsed = new Timer(time) val env = additionalEnv ++ JdkSources.envVariables(javaHome) val ps = SystemProcess.run( @@ -125,7 +125,7 @@ class ShellRunner(time: Time, workDoneProvider: WorkDoneProgress)(implicit result.trySuccess(code) } result.future.onComplete(_ => cancelables.remove(newCancelable)) - result.future + CancelableFuture(result.future, newCancelable) } } diff --git a/metals/src/main/scala/scala/meta/internal/metals/CancelableFuture.scala b/metals/src/main/scala/scala/meta/internal/metals/CancelableFuture.scala index 984437b8f77..bd50d31c3b1 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/CancelableFuture.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/CancelableFuture.scala @@ -4,7 +4,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Try -case class CancelableFuture[T]( +case class CancelableFuture[+T]( future: Future[T], cancelable: Cancelable = Cancelable.empty, ) extends Cancelable { diff --git a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala index 57b1d4a9cdb..6c5494e0b7a 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/ConnectionProvider.scala @@ -324,14 +324,15 @@ class ConnectionProvider( val curr = iter.next() request.cancelCompare(curr.request) match { case TakeOver => curr.cancel() - case Yield => info.cancel() - case _ => + case Yield => info.cancel() + case _ => } } queue.add(info) // maybe cancel ongoing currentRequest.foreach(ongoing => - if (request.cancelCompare(ongoing.request) == TakeOver) ongoing.cancel() + if (request.cancelCompare(ongoing.request) == TakeOver) + ongoing.cancel() ) info } @@ -345,26 +346,25 @@ class ConnectionProvider( } for (request <- optRequest) { - val cancelPromise = request.cancelPromise + implicit val cancelPromise = CancelSwitch(request.cancelPromise) val result = - if (cancelPromise.isCompleted) + if (request.cancelPromise.isCompleted) Interruptable.successful(BuildChange.Cancelled) else request.request match { case Disconnect(shutdownBuildServer) => - disconnect(shutdownBuildServer, cancelPromise) - case Index(check) => index(check, cancelPromise) + disconnect(shutdownBuildServer) + case Index(check) => index(check) case ImportBuildAndIndex(session) => - importBuildAndIndex(session, cancelPromise) + importBuildAndIndex(session) case ConnectToSession(session) => - connectToSession(session, cancelPromise) + connectToSession(session) case CreateSession(shutdownBuildServer) => - createSession(shutdownBuildServer, cancelPromise) + createSession(shutdownBuildServer) case GenerateBspConfigAndConnect(buildTool, shutdownServer) => generateBspConfigAndConnect( buildTool, shutdownServer, - cancelPromise, ) case BloopInstallAndConnect( buildTool, @@ -377,7 +377,6 @@ class ConnectionProvider( checksum, forceImport, shutdownServer, - cancelPromise, ) } result.future.onComplete { res => @@ -393,22 +392,21 @@ class ConnectionProvider( } private def disconnect( - shutdownBuildServer: Boolean, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { - def shutdownBsp(optMainBsp: Option[String]): Future[Boolean] = { + shutdownBuildServer: Boolean + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { + def shutdownBsp(optMainBsp: Option[String]): Interruptable[Boolean] = { optMainBsp match { case Some(BloopServers.name) => - Future { bloopServers.shutdownServer() } + Interruptable.successful { bloopServers.shutdownServer() } case Some(SbtBuildTool.name) => for { res <- buildToolProvider.buildTool match { case Some(sbt: SbtBuildTool) => - sbt.shutdownBspServer(shellRunner).map(_ == 0) - case _ => Future.successful(false) + sbt.shutdownBspServer(shellRunner).withInterrupt.map(_ == 0) + case _ => Interruptable.successful(false) } } yield res - case s => Future.successful(s.nonEmpty) + case s => Interruptable.successful(s.nonEmpty) } } @@ -420,33 +418,28 @@ class ConnectionProvider( ) for { - _ <- scalaCli.stop(storeLast = true).withInterrupt(cancelPromise) + _ <- scalaCli.stop(storeLast = true).withInterrupt optMainBsp <- (bspSession match { case None => Future.successful(None) case Some(session) => bspSession = None mainBuildTargetsData.resetConnections(List.empty) session.shutdown().map(_ => Some(session.main.name)) - }).withInterrupt(cancelPromise) + }).withInterrupt _ <- - if (shutdownBuildServer) - shutdownBsp(optMainBsp).withInterrupt(cancelPromise) + if (shutdownBuildServer) shutdownBsp(optMainBsp) else Interruptable.successful(()) } yield BuildChange.None } - private def index( - check: () => Unit, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = + private def index(check: () => Unit): Interruptable[BuildChange] = profiledIndexWorkspace(check) .map(_ => BuildChange.None) - .withInterrupt(cancelPromise) + .withInterrupt private def importBuildAndIndex( - session: BspSession, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { + session: BspSession + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { val importedBuilds0 = timerProvider.timed("Imported build") { session.importBuilds() } @@ -456,7 +449,7 @@ class ConnectionProvider( Messages.importingBuild, importedBuilds0, ) - .withInterrupt(cancelPromise) + .withInterrupt _ = { val idToConnection = bspBuilds.flatMap { bspBuild => val targets = @@ -467,7 +460,7 @@ class ConnectionProvider( saveProjectReferencesInfo(bspBuilds) } _ = compilers.cancel() - buildChange <- index(check, cancelPromise) + buildChange <- index(check) } yield buildChange } @@ -490,9 +483,8 @@ class ConnectionProvider( } private def connectToSession( - session: BspSession, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { + session: BspSession + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { scribe.info( s"Connected to Build server: ${session.main.name} v${session.version}" ) @@ -503,7 +495,7 @@ class ConnectionProvider( bspSession = Some(session) isConnecting.set(false) for { - _ <- importBuildAndIndex(session, cancelPromise) + _ <- importBuildAndIndex(session) _ = buildToolProvider.buildTool.foreach( workspaceReload.persistChecksumStatus(Digest.Status.Installed, _) ) @@ -535,9 +527,8 @@ class ConnectionProvider( } def createSession( - shutdownServer: Boolean, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { + shutdownServer: Boolean + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { def compileAllOpenFiles: BuildChange => Future[BuildChange] = { case change if !change.isFailed => Future @@ -554,7 +545,7 @@ class ConnectionProvider( isConnecting.set(true) (for { - _ <- disconnect(shutdownServer, cancelPromise) + _ <- disconnect(shutdownServer) maybeSession <- timerProvider .timed( "Connected to build server", @@ -567,10 +558,10 @@ class ConnectionProvider( shellRunner, ) } - .withInterrupt(cancelPromise) + .withInterrupt result <- maybeSession match { case Some(session) => - val result = connectToSession(session, cancelPromise) + val result = connectToSession(session) session.mainConnection.onReconnection { newMainConn => val updSession = session.copy(main = newMainConn) connect(ConnectToSession(updSession)) @@ -585,11 +576,11 @@ class ConnectionProvider( .startForAllLastPaths(path => !buildTargets.belongsToBuildTarget(path.toNIO) ) - .withInterrupt(cancelPromise) + .withInterrupt _ = initTreeView() } yield result) .recover { case NonFatal(e) => - disconnect(false, cancelPromise) + disconnect(false) val message = "Failed to connect with build server, no functionality will work." val details = " See logs for more details." @@ -599,7 +590,7 @@ class ConnectionProvider( scribe.error(message, e) BuildChange.Failed } - .flatMap(compileAllOpenFiles(_).withInterrupt(cancelPromise)) + .flatMap(compileAllOpenFiles(_).withInterrupt) .map { res => buildServerPromise.trySuccess(()) res @@ -609,13 +600,12 @@ class ConnectionProvider( private def generateBspConfigAndConnect( buildTool: BuildServerProvider, shutdownServer: Boolean, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { tables.buildTool.chooseBuildTool(buildTool.executableName) maybeChooseServer(buildTool.buildServerName, alreadySelected = false) for { _ <- - if (shutdownServer) disconnect(shutdownServer, cancelPromise) + if (shutdownServer) disconnect(shutdownServer) else Interruptable.successful(()) status <- buildTool .generateBspConfig( @@ -623,10 +613,10 @@ class ConnectionProvider( args => bspConfigGenerator.runUnconditionally(buildTool, args), statusBar, ) - .withInterrupt(cancelPromise) + .withInterrupt shouldConnect = handleGenerationStatus(buildTool, status) status <- - if (shouldConnect) createSession(false, cancelPromise) + if (shouldConnect) createSession(false) else Interruptable.successful(BuildChange.Failed) } yield status } @@ -667,8 +657,7 @@ class ConnectionProvider( checksum: String, forceImport: Boolean, shutdownServer: Boolean, - cancelPromise: Promise[Unit], - ): Interruptable[BuildChange] = { + )(implicit cancelSwitch: CancelSwitch): Interruptable[BuildChange] = { for { result <- { if (forceImport) @@ -680,9 +669,9 @@ class ConnectionProvider( buildTool, checksum, ) - }.withInterrupt(cancelPromise) + }.withInterrupt change <- { - if (result.isInstalled) createSession(shutdownServer, cancelPromise) + if (result.isInstalled) createSession(shutdownServer) else if (result.isFailed) { for { change <- @@ -698,7 +687,7 @@ class ConnectionProvider( // Connect nevertheless, many build import failures are caused // by resolution errors in one weird module while other modules // exported successfully. - createSession(shutdownServer, cancelPromise) + createSession(shutdownServer) } else { languageClient.showMessage(Messages.ImportProjectFailed) Interruptable.successful(BuildChange.Failed) @@ -721,10 +710,11 @@ case object Queue extends ConflictBehaviour sealed trait ConnectRequest extends ConnectKind { - /** Decides what to do with a new connect request + /** + * Decides what to do with a new connect request * in presence of an another ongoing/queued request. * @param other the ongoing or queued request - * @return behavoiur of the incoming request + * @return behavoiur of the incoming request * Yield -- cancel this * TakeOver -- cancel other * Queue -- queue diff --git a/metals/src/main/scala/scala/meta/internal/metals/FileDecoderProvider.scala b/metals/src/main/scala/scala/meta/internal/metals/FileDecoderProvider.scala index c24501c421d..a2d9d87e321 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/FileDecoderProvider.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/FileDecoderProvider.scala @@ -565,6 +565,7 @@ final class FileDecoderProvider( propagateError = true, logInfo = false, ) + .future .map(_ => { if (sbErr.nonEmpty) DecoderResponse.failed(path.toURI, sbErr.toString) @@ -675,6 +676,7 @@ final class FileDecoderProvider( else DecoderResponse.success(path.toURI, sbOut.toString) }) + .future } catch { case NonFatal(e) => scribe.error(e.toString()) diff --git a/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala b/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala index 2b3d7cc3f2c..862af05f359 100644 --- a/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala +++ b/metals/src/main/scala/scala/meta/internal/metals/Interruptable.scala @@ -1,7 +1,5 @@ package scala.meta.internal.metals -import java.util.concurrent.CompletableFuture - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise @@ -10,47 +8,76 @@ import scala.meta.internal.metals.Interruptable.CancelConnectException class Interruptable[+T] private ( futureIn: Future[T], - cancelPromise: Promise[Unit], -) extends CompletableFuture { + val cancelable: Cancelable, +) { - def future(implicit executor: ExecutionContext): Future[T] = futureIn.map( - if (cancelPromise.isCompleted) throw CancelConnectException else _ + def future(implicit + executor: ExecutionContext, + cancelPromise: CancelSwitch, + ): Future[T] = futureIn.map( + if (cancelPromise.promise.isCompleted) throw CancelConnectException else _ ) - override def cancel(mayInterruptIfRunning: Boolean): Boolean = { - cancelPromise.trySuccess(()) - true - } - - override def isCancelled(): Boolean = cancelPromise.isCompleted - def flatMap[S]( f: T => Interruptable[S] - )(implicit executor: ExecutionContext): Interruptable[S] = - new Interruptable(future.flatMap(f(_).future), cancelPromise) + )(implicit + executor: ExecutionContext, + cancelPromise: CancelSwitch, + ): Interruptable[S] = { + val mutCancel = + cancelable match { + case c: MutableCancelable => c + case c => new MutableCancelable().add(c) + } + val newFuture = future.flatMap { res => + val i = f(res) + mutCancel.add(i.cancelable) + i.future + } + new Interruptable(newFuture, mutCancel) + } def map[S]( f: T => S - )(implicit executor: ExecutionContext): Interruptable[S] = - new Interruptable(future.map(f(_)), cancelPromise) + )(implicit + executor: ExecutionContext, + cancelPromise: CancelSwitch, + ): Interruptable[S] = + new Interruptable(future.map(f(_)), cancelable) def recover[U >: T]( pf: PartialFunction[Throwable, U] - )(implicit executor: ExecutionContext): Interruptable[U] = { + )(implicit + executor: ExecutionContext, + cancelPromise: CancelSwitch, + ): Interruptable[U] = { val pf0: PartialFunction[Throwable, U] = { case CancelConnectException => throw CancelConnectException } - new Interruptable(future.recover(pf0.orElse(pf)), cancelPromise) + new Interruptable(future.recover(pf0.orElse(pf)), cancelable) } + + def toCancellable(implicit cancelPromise: CancelSwitch): CancelableFuture[T] = + CancelableFuture( + futureIn, + () => { cancelPromise.promise.trySuccess(()); cancelable.cancel() }, + ) } object Interruptable { def successful[T](result: T) = - new Interruptable(Future.successful(result), Promise()) + new Interruptable(Future.successful(result), Cancelable.empty) object CancelConnectException extends RuntimeException implicit class XtensionFuture[+T](future: Future[T]) { - def withInterrupt(cancelPromise: Promise[Unit]): Interruptable[T] = - new Interruptable(future, cancelPromise) + def withInterrupt: Interruptable[T] = + new Interruptable(future, Cancelable.empty) + } + + implicit class XtensionCancelFuture[+T](future: CancelableFuture[T]) { + def withInterrupt: Interruptable[T] = + new Interruptable(future.future, future.cancelable) } } + +case class CancelSwitch(promise: Promise[Unit]) extends AnyVal diff --git a/tests/slow/src/test/scala/tests/bazel/BazelLspSuite.scala b/tests/slow/src/test/scala/tests/bazel/BazelLspSuite.scala index 2f1fd0cb7f8..7a2fef9e181 100644 --- a/tests/slow/src/test/scala/tests/bazel/BazelLspSuite.scala +++ b/tests/slow/src/test/scala/tests/bazel/BazelLspSuite.scala @@ -295,16 +295,18 @@ class BazelLspSuite def jsonFile = workspace.resolve(Directories.bsp).resolve("bazelbsp.json").readText for { - _ <- shellRunner.runJava( - Dependency.of( - BazelBuildTool.dependency.getModule(), - "3.2.0-20240508-f3a81e7-NIGHTLY", - ), - BazelBuildTool.mainClass, - workspace, - BazelBuildTool.projectViewArgs(workspace), - None, - ) + _ <- shellRunner + .runJava( + Dependency.of( + BazelBuildTool.dependency.getModule(), + "3.2.0-20240508-f3a81e7-NIGHTLY", + ), + BazelBuildTool.mainClass, + workspace, + BazelBuildTool.projectViewArgs(workspace), + None, + ) + .future _ = assertContains(jsonFile, "3.2.0-20240508-f3a81e7-NIGHTLY") _ <- initialize( BazelBuildLayout(workspaceLayout, V.bazelScalaVersion, bazelVersion)