Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement exponential retry/backoff mechanism in k8s pod exec #125

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/k8s/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/k8s/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"@actions/exec": "^1.1.1",
"@actions/io": "^1.1.2",
"@kubernetes/client-node": "^0.18.1",
"exponential-backoff": "^3.1.1",
"hooklib": "file:../hooklib",
"js-yaml": "^4.1.0",
"shlex": "^2.1.2"
Expand Down
84 changes: 61 additions & 23 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
mergeObjectMeta,
useKubeScheduler
} from './utils'
import { backOff } from 'exponential-backoff'

const kc = new k8s.KubeConfig()

Expand Down Expand Up @@ -227,33 +228,70 @@ export async function execPodStep(
): Promise<void> {
const exec = new k8s.Exec(kc)
await new Promise(async function (resolve, reject) {
await exec.exec(
namespace(),
podName,
containerName,
command,
process.stdout,
process.stderr,
stdin ?? null,
false /* tty */,
resp => {
// kube.exec returns an error if exit code is not 0, but we can't actually get the exit code
if (resp.status === 'Success') {
resolve(resp.code)
} else {
core.debug(
JSON.stringify({
message: resp?.message,
details: resp?.details
})
)
reject(resp?.message)
}
const backOffOptions = {
numOfAttempts: 3,
retry: (e, attemptNumber) => {
core.debug(e.toString())
core.debug(
`an error occurred trying to execute command in pod, retrying (${attemptNumber}/3)`
)
return true
}
)
}
try {
await backOff(async () => {
await execInPod(
exec,
command,
podName,
containerName,
resolve,
reject,
stdin
)
}, backOffOptions)
} catch (e) {
core.debug('something went wrong in calling pod exec')
reject(e)
}
})
}

async function execInPod(
exec: k8s.Exec,
command: string[],
podName: string,
containerName: string,
resolve: (value?: number | PromiseLike<number> | undefined) => void,
reject: (reason?: any) => void,
stdin?: stream.Readable
): Promise<void> {
await exec.exec(
namespace(),
podName,
containerName,
command,
process.stdout,
process.stderr,
stdin ?? null,
false /* tty */,
resp => {
// kube.exec returns an error if exit code is not 0, but we can't actually get the exit code
if (resp.status === 'Success') {
resolve(resp.code)
} else {
core.debug(
JSON.stringify({
message: resp?.message,
details: resp?.details
})
)
reject(resp?.message)
}
}
)
}

export async function waitForJobToComplete(jobName: string): Promise<void> {
const backOffManager = new BackOffManager()
while (true) {
Expand Down
44 changes: 44 additions & 0 deletions packages/k8s/tests/run-script-step-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import * as fs from 'fs'
import { cleanupJob, prepareJob, runScriptStep } from '../src/hooks'
import { TestHelper } from './test-setup'
import * as k8s from '@kubernetes/client-node'
import { prunePods } from '../src/k8s'
import { IncomingMessage } from 'http'

jest.useRealTimers()

Expand All @@ -10,8 +13,11 @@ let prepareJobOutputData: any

let runScriptStepDefinition

let execSpy

describe('Run script step', () => {
beforeEach(async () => {
execSpy = jest.spyOn(k8s.Exec.prototype, 'exec')
testHelper = new TestHelper()
await testHelper.initialize()
const prepareJobOutputFilePath = testHelper.createFile(
Expand All @@ -27,6 +33,7 @@ describe('Run script step', () => {
})

afterEach(async () => {
execSpy.mockRestore()
await cleanupJob()
await testHelper.cleanup()
})
Expand All @@ -45,6 +52,43 @@ describe('Run script step', () => {
).resolves.not.toThrow()
})

it('should be able to handle errors occurring in k8s.Exec.exec() (e.g non 2xx Kubernetes API response)', async () => {
let errorCallCount = 0
const mockExec = jest.fn(async (...args) => {
errorCallCount++

if (errorCallCount < 2) {
throw new k8s.HttpError('' as unknown as IncomingMessage, 'test', 500)
} else {
execSpy.mockRestore()
throw new k8s.HttpError('' as unknown as IncomingMessage, 'test', 500)
}
})

execSpy.mockImplementation(mockExec)
await expect(
runScriptStep(
runScriptStepDefinition.args,
prepareJobOutputData.state,
null
)
).resolves.not.toThrow()
})

it('should fail after multiple consecutive failures in k8s.Exec.exec()', async () => {
const mockExec = jest.fn(async (...args) => {
throw new k8s.HttpError('' as unknown as IncomingMessage, 'test', 500)
})
execSpy.mockImplementation(mockExec)
await expect(
runScriptStep(
runScriptStepDefinition.args,
prepareJobOutputData.state,
null
)
).rejects.toThrow()
})

it('should fail if the working directory does not exist', async () => {
runScriptStepDefinition.args.workingDirectory = '/foo/bar'
await expect(
Expand Down
Loading