Skip to content

Commit

Permalink
feat: add synchronous shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Goller <[email protected]>
  • Loading branch information
goller committed Nov 13, 2024
1 parent d568875 commit 8cde205
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 90 deletions.
4 changes: 4 additions & 0 deletions proto/depot/cloud/v3/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ service MachineService {
rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse);
rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse);
rpc Usage(UsageRequest) returns (UsageResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
}

message RegisterMachineRequest {
Expand Down Expand Up @@ -159,3 +160,6 @@ message Cache {
}

message UsageResponse {}

message ShutdownRequest {}
message ShutdownResponse {}
11 changes: 11 additions & 0 deletions src/gen/ts/depot/cloud/v3/machine_connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
RegisterMachineResponse,
ReportMachineHealthRequest,
ReportMachineHealthResponse,
ShutdownRequest,
ShutdownResponse,
UsageRequest,
UsageResponse,
} from './machine_pb'
Expand Down Expand Up @@ -57,5 +59,14 @@ export const MachineService = {
O: UsageResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v3.MachineService.Shutdown
*/
shutdown: {
name: 'Shutdown',
I: ShutdownRequest,
O: ShutdownResponse,
kind: MethodKind.Unary,
},
},
} as const
66 changes: 66 additions & 0 deletions src/gen/ts/depot/cloud/v3/machine_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1140,3 +1140,69 @@ export class UsageResponse extends Message<UsageResponse> {
return proto3.util.equals(UsageResponse, a, b)
}
}

/**
* @generated from message depot.cloud.v3.ShutdownRequest
*/
export class ShutdownRequest extends Message<ShutdownRequest> {
constructor(data?: PartialMessage<ShutdownRequest>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ShutdownRequest'
static readonly fields: FieldList = proto3.util.newFieldList(() => [])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromBinary(bytes, options)
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromJson(jsonValue, options)
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromJsonString(jsonString, options)
}

static equals(
a: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
b: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
): boolean {
return proto3.util.equals(ShutdownRequest, a, b)
}
}

/**
* @generated from message depot.cloud.v3.ShutdownResponse
*/
export class ShutdownResponse extends Message<ShutdownResponse> {
constructor(data?: PartialMessage<ShutdownResponse>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ShutdownResponse'
static readonly fields: FieldList = proto3.util.newFieldList(() => [])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromBinary(bytes, options)
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromJson(jsonValue, options)
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromJsonString(jsonString, options)
}

static equals(
a: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
b: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
): boolean {
return proto3.util.equals(ShutdownResponse, a, b)
}
}
29 changes: 26 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ async function main() {
}

async function runLoop() {
let buildTask: Promise<void> | undefined
const controller = new AbortController()
const signal = controller.signal

try {
let req: PartialMessage<RegisterMachineRequest> = {connectionId: DEPOT_CLOUD_CONNECTION_ID}

Expand All @@ -61,20 +65,39 @@ async function runLoop() {
}

console.log('Connecting to task stream')
const stream = client.registerMachine(req)
const stream = client.registerMachine(req, {signal})
console.log('Connected to task stream')

for await (const message of stream) {
if (!message.task) continue
if (message.task.case === 'buildkit') await startBuildKit(message, message.task.value)
if (message.task.case === 'engine') await startEngine(message, message.task.value)
const {token, machineId} = message

if (message.task.case === 'buildkit') {
buildTask = startBuildKit(token, machineId, message.task.value)
break
}
if (message.task.case === 'engine') {
buildTask = startEngine(token, message.task.value)
break
}
}
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Internal && err.message.includes('RST_STREAM')) {
console.log('Connection closed by server')
} else {
throw err
}
} finally {
controller.abort()
}

if (buildTask) {
try {
await buildTask
} catch (err) {
Sentry.captureException(err)
console.log(err)
}
}
}

Expand Down
82 changes: 34 additions & 48 deletions src/tasks/buildkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ import {isAbortError} from 'abort-controller-x'
import {execa} from 'execa'
import * as fsp from 'fs/promises'
import {onShutdown, onShutdownError} from 'node-graceful-shutdown'
import {RegisterMachineResponse, RegisterMachineResponse_BuildKitTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {RegisterMachineResponse_BuildKitTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {pathExists} from '../utils/common'
import {ensureMounted, fstrim, mountExecutor, unmapBlockDevice, unmountDevice} from '../utils/mounts'
import {ensureMounted, mountExecutor} from '../utils/mounts'
import {reportHealth} from './health'
import {ShutdownBuildkit} from './shutdown'
import {reportUsage} from './usage'

export async function startBuildKit(message: RegisterMachineResponse, task: RegisterMachineResponse_BuildKitTask) {
export async function startBuildKit(token: string, machineId: string, task: RegisterMachineResponse_BuildKitTask) {
console.log('Starting BuildKit')

// Attempt to set up binfmt
Expand Down Expand Up @@ -47,7 +48,6 @@ export async function startBuildKit(message: RegisterMachineResponse, task: Regi
})
} catch {}

const {machineId, token} = message
const headers = {Authorization: `Bearer ${token}`}

await fsp.writeFile('/etc/buildkit/tls.crt', task.cert!.cert, {mode: 0o644})
Expand Down Expand Up @@ -207,71 +207,57 @@ keepBytes = ${cacheSizeBytes}
} catch (error) {
if (error instanceof Error && error.message.includes('Command failed with exit code 1')) {
// Ignore this error, it's expected when the process is killed.
} else if (error instanceof Error && error.message.includes('Command failed with exit code 2')) {
// Exit code 2 is a panic in the go runtime (at least in go <= 1.23).
console.error(`BuildKit exited with panic: ${error}`)
} else if (isAbortError(error)) {
// Ignore this error, it's expected when the process is killed.
} else {
// Unknown error.
throw error
}
} finally {
controller.abort()
}
}

const buildkit = runBuildKit()
const build = async () => {
try {
await Promise.allSettled([
runBuildKit(),
reportHealth({controller, headers, path: rootDir}),
reportUsage({machineId, signal, headers}),
])
console.log('BuildKit exited')
} catch (error) {
console.error(`BuildKit exited with error: ${error}`)
}

try {
await ShutdownBuildkit(rootDir, task.mounts)
} catch (error) {
console.error(`Error shutting down: ${error}`)
}
}
const run = build()

onShutdownError(async (error) => {
console.error('Error shutting down:', error)
console.error(`Error shutting down: ${error}`)
})

// onShutdown handles SIGINT and SIGTERM signals. We would receive these signals
// from systemd when the machine is told to turn off.
//
// We will not ever be restarted, so, we ignore the exit code for buildkit.
onShutdown(async () => {
setTimeout(() => {
console.log('Shutdown timed out, killing process')
process.exit(1)
}, 1000 * 60).unref()

controller.abort()
try {
await buildkit
console.log('BuildKit exited')
} catch (error) {
console.log(`BuildKit exited with error: ${error}`)
}

// Remove estargz cache because we will rely on the buildkit layer cache instead.
await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => {
console.error(err)
})

// Print the time it takes to sync the filesystem.
const start = Date.now()
// sync the filesystem to ensure all data is written to disk.
await execa('sync', {stdio: 'inherit'}).catch((err) => {
console.error(err)
})
console.log(`sync took ${Date.now() - start}ms`)

for (const mount of task.mounts) {
if (mount.cephVolume) {
if (!task.disableFstrim) {
await fstrim(mount.path)
}
await unmountDevice(mount.path)
await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec)
} else {
await unmountDevice(mount.path)
}
}
await run
})

try {
await Promise.all([
buildkit,
reportHealth({signal, headers, path: rootDir}),
reportUsage({machineId, signal, headers}),
])
} catch (error) {
throw error
} finally {
controller.abort()
}
await run
}
52 changes: 21 additions & 31 deletions src/tasks/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import {isAbortError} from 'abort-controller-x'
import {execa} from 'execa'
import * as fsp from 'fs/promises'
import {onShutdown, onShutdownError} from 'node-graceful-shutdown'
import {RegisterMachineResponse, RegisterMachineResponse_EngineTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {ensureMounted, unmapBlockDevice, unmountDevice} from '../utils/mounts'
import {RegisterMachineResponse_EngineTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {ensureMounted} from '../utils/mounts'
import {reportEngineHealth} from './engineHealth'
import {ShutdownDagger} from './shutdown'

export async function startEngine(message: RegisterMachineResponse, task: RegisterMachineResponse_EngineTask) {
export async function startEngine(token: string, task: RegisterMachineResponse_EngineTask) {
console.log('Starting engine')

let useCeph = false
Expand All @@ -15,7 +16,6 @@ export async function startEngine(message: RegisterMachineResponse, task: Regist
if (mount.cephVolume) useCeph = true
}

const {machineId, token} = message
const headers = {Authorization: `Bearer ${token}`}

await fsp.mkdir('/etc/engine', {recursive: true})
Expand Down Expand Up @@ -77,7 +77,21 @@ export async function startEngine(message: RegisterMachineResponse, task: Regist
}
}

const engine = runEngine()
const engine = async () => {
try {
await Promise.allSettled([runEngine(), reportEngineHealth({controller, headers, path: '/var/lib/engine'})])
console.log('Engine exited')
} catch (error) {
console.error(`Engine exited with error: ${error}`)
}

try {
await ShutdownDagger('/var/lib/engine', task.mounts)
} catch (error) {
console.error(`Error shutting down: ${error}`)
}
}
const run = engine()

onShutdownError(async (error) => {
console.error('Error shutting down:', error)
Expand All @@ -90,32 +104,8 @@ export async function startEngine(message: RegisterMachineResponse, task: Regist
}, 1000 * 60).unref()

controller.abort()
try {
await engine
console.log('Engine exited')
} catch (error) {
console.log(`Engine exited with error: ${error}`)
}

for (const mount of task.mounts) {
if (mount.cephVolume) {
await unmountDevice(mount.path)
await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec)
} else {
await unmountDevice(mount.path)
}
}
await run
})

try {
await Promise.all([
engine,
reportEngineHealth({signal, headers, path: '/var/lib/engine'}),
// reportUsage({machineId, signal, headers}),
])
} catch (error) {
throw error
} finally {
controller.abort()
}
await run
}
Loading

0 comments on commit 8cde205

Please sign in to comment.