Skip to content

Commit

Permalink
Increase code reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed Dec 6, 2023
1 parent f119ae7 commit 903b413
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 101 deletions.
3 changes: 1 addition & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/

import { launch } from './run.js'
import type { LocalSearch } from './run.js'
import { populate } from './data.js'
import {
cloudformationResources as serverlessCloudformationResources,
Expand Down Expand Up @@ -66,7 +65,7 @@ export const deploy = {
},
}

let local: LocalSearch
let local: Awaited<ReturnType<typeof launch>>

function getEngine(name?: string) {
if (name?.toLowerCase() === 'opensearch') return 'opensearch'
Expand Down
19 changes: 0 additions & 19 deletions processes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,3 @@ export async function untilTerminated(child: ChildProcess) {
child.on('exit', resolve).on('error', reject)
})
}

class UnexpectedTerminationError extends Error {
readonly exitStatus: number | null

constructor(exitStatus: number | null) {
super('Child process terminated unexpectedly with exit status ${number}')
this.exitStatus = exitStatus
}
}

export async function untilTerminatedUnexpectedly(child: ChildProcess) {
return new Promise<number | null>((_resolve, reject) => {
child
.on('exit', (exitStatus) =>
reject(new UnexpectedTerminationError(exitStatus))
)
.on('error', reject)
})
}
178 changes: 98 additions & 80 deletions run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,105 +12,75 @@ import { install } from './install.js'
import { mkdtemp } from 'fs/promises'
import { mkdirP, temp } from './paths.js'
import rimraf from 'rimraf'
import {
spawn,
untilTerminated,
untilTerminatedUnexpectedly,
} from './processes.js'
import { pipeline } from 'stream/promises'
import { createReadStream } from 'fs'
import { spawn, untilTerminated } from './processes.js'
import type { SandboxEngine } from './engines.js'
import Dockerode from 'dockerode'

async function BaseLauncher({
port,
engine,
}: {
port?: number
engine: SandboxEngine
}) {
port ??= 9200
const url = `http://localhost:${port}`

const opts = [
`http.port=${port}`,
'discovery.type=single-node',
engine === 'elasticsearch'
? 'xpack.security.enabled=false'
: 'plugins.security.disabled=true',
]
class UnexpectedResolveError extends Error {}

const tempDir = await mkdtemp(join(temp, 'run-'))
const [dataDir, logsDir] = ['data', 'logs'].map((s) => join(tempDir, s))
await Promise.all([dataDir, logsDir].map(mkdirP))

return {
url,
port,
opts,
dataDir,
logsDir,
async stop() {
console.log('Removing temporary directory')
await rimraf(tempDir)
},
}
async function neverResolve<T>(promise: Promise<T>) {
await promise
throw new UnexpectedResolveError('promise resolved unexpectedly')
}

async function BinLauncher({
bin,
...props
}: {
bin: string
} & Parameters<typeof BaseLauncher>[0]) {
const { port, url, dataDir, logsDir, opts, stop } = await BaseLauncher(props)
type SearchEngineLauncherFunction<T = object> = ({
options,
dataDir,
logsDir,
engine,
port,
}: T & {
options: string[]
dataDir: string
logsDir: string
engine: SandboxEngine
port: number
}) => Promise<{
kill: () => Promise<void>
waitUntilStopped: () => Promise<void>
}>

opts.push(`path.data=${dataDir}`, `path.logs=${logsDir}`)
const args = opts.map((opt) => `-E${opt}`)
const launchBinary: SearchEngineLauncherFunction<{ bin: string }> = async ({
bin,
dataDir,
logsDir,
options,
}) => {
const args = [...options, `path.data=${dataDir}`, `path.logs=${logsDir}`].map(
(opt) => `-E${opt}`
)

console.log('Spawning', bin, ...args)
const child = await spawn(bin, args, {
stdio: ['ignore', 'ignore', 'inherit'],
})

try {
await Promise.race([
waitPort({ port, protocol: 'http' }),
untilTerminatedUnexpectedly(child),
])
} catch (e) {
await pipeline(
createReadStream(join(logsDir, 'opensearch.log')),
process.stderr
)
throw e
}
console.log('OpenSearch is ready at', url)

return {
port,
url,
async stop() {
console.log('Stopping child process')
async kill() {
console.log('Killing child process')
child.kill()
},
async waitUntilStopped() {
await untilTerminated(child)
await stop()
},
}
}

async function DockerLauncher(props: Parameters<typeof BaseLauncher>[0]) {
const { port, dataDir, logsDir, opts, stop } = await BaseLauncher(props)
opts.push('path.data=/var/lib/search', 'path.logs=/var/log/search')
const launchDocker: SearchEngineLauncherFunction = async ({
dataDir,
logsDir,
engine,
port,
options,
}) => {
const Image =
props.engine === 'elasticsearch'
engine === 'elasticsearch'
? 'elastic/elasticsearch:8.6.2'
: 'opensearchproject/opensearch:2.11.0'

console.log('Launching Docker container', Image)
const docker = new Dockerode()
const container = await docker.createContainer({
Env: opts,
Env: [...options, 'path.data=/var/lib/search', 'path.logs=/var/log/search'],
HostConfig: {
AutoRemove: true,
Mounts: [
Expand All @@ -126,19 +96,67 @@ async function DockerLauncher(props: Parameters<typeof BaseLauncher>[0]) {
const stream = await container.attach({ stream: true, stderr: true })
stream.pipe(process.stderr)
await container.start()
await waitPort({ port, protocol: 'http' })

return {
port,
async stop() {
console.log('Stopping Docker container')
async kill() {
console.log('Killing Docker container')
await container.kill()
await stop()
},
async waitUntilStopped() {
await container.wait()
},
}
}

export async function launch(engine: SandboxEngine) {
export async function launch({
port,
engine,
}: {
port?: number
engine: SandboxEngine
}) {
port ??= 9200
const url = `http://localhost:${port}`

const options = [
`http.port=${port}`,
'discovery.type=single-node',
engine === 'elasticsearch'
? 'xpack.security.enabled=false'
: 'plugins.security.disabled=true',
]

const tempDir = await mkdtemp(join(temp, 'run-'))
const [dataDir, logsDir] = ['data', 'logs'].map((s) => join(tempDir, s))
await Promise.all([dataDir, logsDir].map(mkdirP))

const bin = await install(engine)
return await (bin ? BinLauncher({ bin, engine }) : DockerLauncher({ engine }))
const props = { engine, dataDir, logsDir, options, port }
const { kill, waitUntilStopped } = await (bin
? launchBinary({ bin, ...props })
: launchDocker(props))

try {
Promise.race([
waitPort({ port, protocol: 'http' }),
neverResolve(waitUntilStopped()),
])
} catch (e) {
if (e instanceof UnexpectedResolveError) {
throw new Error('Search engine terminated unexpectedly')
} else {
throw e
}
}

return {
url,
port,
async stop() {
await kill()
await waitUntilStopped()
console.log('Removing temporary directory')
await rimraf(tempDir)
},
}
}

0 comments on commit 903b413

Please sign in to comment.