diff --git a/index.ts b/index.ts index 1498de7..7f9c17f 100644 --- a/index.ts +++ b/index.ts @@ -7,7 +7,6 @@ */ import { launch } from './run.js' -import type { LocalSearch } from './run.js' import { populate } from './data.js' import { cloudformationResources as serverlessCloudformationResources, @@ -66,7 +65,7 @@ export const deploy = { }, } -let local: LocalSearch +let local: Awaited> function getEngine(name?: string) { if (name?.toLowerCase() === 'opensearch') return 'opensearch' diff --git a/processes.ts b/processes.ts index e4680e5..818d6b6 100644 --- a/processes.ts +++ b/processes.ts @@ -26,22 +26,3 @@ export async function untilTerminated(child: ChildProcess) { child.on('exit', resolve).on('error', reject) }) } - -class UnexpectedTerminationError extends Error { - readonly exitStatus: number | null - - constructor(exitStatus: number | null) { - super('Child process terminated unexpectedly with exit status ${number}') - this.exitStatus = exitStatus - } -} - -export async function untilTerminatedUnexpectedly(child: ChildProcess) { - return new Promise((_resolve, reject) => { - child - .on('exit', (exitStatus) => - reject(new UnexpectedTerminationError(exitStatus)) - ) - .on('error', reject) - }) -} diff --git a/run.ts b/run.ts index eae43e0..d4d6bf1 100644 --- a/run.ts +++ b/run.ts @@ -12,105 +12,75 @@ import { install } from './install.js' import { mkdtemp } from 'fs/promises' import { mkdirP, temp } from './paths.js' import rimraf from 'rimraf' -import { - spawn, - untilTerminated, - untilTerminatedUnexpectedly, -} from './processes.js' -import { pipeline } from 'stream/promises' -import { createReadStream } from 'fs' +import { spawn, untilTerminated } from './processes.js' import type { SandboxEngine } from './engines.js' import Dockerode from 'dockerode' -async function BaseLauncher({ - port, - engine, -}: { - port?: number - engine: SandboxEngine -}) { - port ??= 9200 - const url = `http://localhost:${port}` - - const opts = [ - `http.port=${port}`, - 'discovery.type=single-node', - engine === 'elasticsearch' - ? 'xpack.security.enabled=false' - : 'plugins.security.disabled=true', - ] +class UnexpectedResolveError extends Error {} - const tempDir = await mkdtemp(join(temp, 'run-')) - const [dataDir, logsDir] = ['data', 'logs'].map((s) => join(tempDir, s)) - await Promise.all([dataDir, logsDir].map(mkdirP)) - - return { - url, - port, - opts, - dataDir, - logsDir, - async stop() { - console.log('Removing temporary directory') - await rimraf(tempDir) - }, - } +async function neverResolve(promise: Promise) { + await promise + throw new UnexpectedResolveError('promise resolved unexpectedly') } -async function BinLauncher({ - bin, - ...props -}: { - bin: string -} & Parameters[0]) { - const { port, url, dataDir, logsDir, opts, stop } = await BaseLauncher(props) +type SearchEngineLauncherFunction = ({ + options, + dataDir, + logsDir, + engine, + port, +}: T & { + options: string[] + dataDir: string + logsDir: string + engine: SandboxEngine + port: number +}) => Promise<{ + kill: () => Promise + waitUntilStopped: () => Promise +}> - opts.push(`path.data=${dataDir}`, `path.logs=${logsDir}`) - const args = opts.map((opt) => `-E${opt}`) +const launchBinary: SearchEngineLauncherFunction<{ bin: string }> = async ({ + bin, + dataDir, + logsDir, + options, +}) => { + const args = [...options, `path.data=${dataDir}`, `path.logs=${logsDir}`].map( + (opt) => `-E${opt}` + ) console.log('Spawning', bin, ...args) const child = await spawn(bin, args, { stdio: ['ignore', 'ignore', 'inherit'], }) - try { - await Promise.race([ - waitPort({ port, protocol: 'http' }), - untilTerminatedUnexpectedly(child), - ]) - } catch (e) { - await pipeline( - createReadStream(join(logsDir, 'opensearch.log')), - process.stderr - ) - throw e - } - console.log('OpenSearch is ready at', url) - return { - port, - url, - async stop() { - console.log('Stopping child process') + async kill() { + console.log('Killing child process') child.kill() + }, + async waitUntilStopped() { await untilTerminated(child) - await stop() }, } } -async function DockerLauncher(props: Parameters[0]) { - const { port, dataDir, logsDir, opts, stop } = await BaseLauncher(props) - opts.push('path.data=/var/lib/search', 'path.logs=/var/log/search') +const launchDocker: SearchEngineLauncherFunction = async ({ + dataDir, + logsDir, + engine, + port, + options, +}) => { const Image = - props.engine === 'elasticsearch' + engine === 'elasticsearch' ? 'elastic/elasticsearch:8.6.2' : 'opensearchproject/opensearch:2.11.0' - console.log('Launching Docker container', Image) const docker = new Dockerode() const container = await docker.createContainer({ - Env: opts, + Env: [...options, 'path.data=/var/lib/search', 'path.logs=/var/log/search'], HostConfig: { AutoRemove: true, Mounts: [ @@ -126,19 +96,67 @@ async function DockerLauncher(props: Parameters[0]) { const stream = await container.attach({ stream: true, stderr: true }) stream.pipe(process.stderr) await container.start() - await waitPort({ port, protocol: 'http' }) return { - port, - async stop() { - console.log('Stopping Docker container') + async kill() { + console.log('Killing Docker container') await container.kill() - await stop() + }, + async waitUntilStopped() { + await container.wait() }, } } -export async function launch(engine: SandboxEngine) { +export async function launch({ + port, + engine, +}: { + port?: number + engine: SandboxEngine +}) { + port ??= 9200 + const url = `http://localhost:${port}` + + const options = [ + `http.port=${port}`, + 'discovery.type=single-node', + engine === 'elasticsearch' + ? 'xpack.security.enabled=false' + : 'plugins.security.disabled=true', + ] + + const tempDir = await mkdtemp(join(temp, 'run-')) + const [dataDir, logsDir] = ['data', 'logs'].map((s) => join(tempDir, s)) + await Promise.all([dataDir, logsDir].map(mkdirP)) + const bin = await install(engine) - return await (bin ? BinLauncher({ bin, engine }) : DockerLauncher({ engine })) + const props = { engine, dataDir, logsDir, options, port } + const { kill, waitUntilStopped } = await (bin + ? launchBinary({ bin, ...props }) + : launchDocker(props)) + + try { + Promise.race([ + waitPort({ port, protocol: 'http' }), + neverResolve(waitUntilStopped()), + ]) + } catch (e) { + if (e instanceof UnexpectedResolveError) { + throw new Error('Search engine terminated unexpectedly') + } else { + throw e + } + } + + return { + url, + port, + async stop() { + await kill() + await waitUntilStopped() + console.log('Removing temporary directory') + await rimraf(tempDir) + }, + } }