Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to make zen more stable and faster #54

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 104 additions & 87 deletions lib/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@ import initZen from './index'
import yargs from 'yargs'
import { invoke, workTests } from './util'
import * as Profiler from './profiler'
import { Zen } from './types'

type testFailure = {
fullName: string
attempts: number
error?: string
time: number
}
import { TestResultCollection, Zen } from './types'

export type CLIOptions = {
logging: boolean
Expand Down Expand Up @@ -60,85 +53,118 @@ yargs(process.argv.slice(2))
debug: { type: 'boolean', default: false },
}).argv

type TestResultsMap = Record<string, testFailure>

async function runTests(
zen: Zen,
opts: CLIOptions,
tests: string[]
): Promise<TestResultsMap> {
const groups = zen.journal.groupTests(tests, zen.config.lambdaConcurrency)
type failedTest = testFailure & { logStream: string }
): Promise<TestResultCollection> {
const groups = zen.journal.groupTestsWithDuplication(
tests,
zen.config.lambdaConcurrency
)

const failedTests: failedTest[][] = await Promise.all(
groups.map(async (group: { tests: string[] }): Promise<failedTest[]> => {
try {
const response = await workTests(zen, {
deflakeLimit: opts.maxAttempts,
testNames: group.tests,
sessionId: zen.config.sessionId,
})
const logStreamName = response.logStreamName
const testResultGroups: TestResultCollection[] = await Promise.all(
groups.map(
async (group: { tests: string[] }): Promise<TestResultCollection> => {
const testNames = group.tests
try {
const response = await workTests(zen, {
deflakeLimit: opts.maxAttempts,
sessionId: zen.config.sessionId,
testNames,
})
const logStream = response.logStreamName

// Map back to the old representation and fill in any tests that may have not run
const results = group.tests.map((test) => {
const results = response.results[test] || []
const result = results.at(-1)
// Map back to the old representation and fill in any tests that may have not run
const results = testNames.reduce<TestResultCollection>(
(col, fullName) => {
const results = response.results[fullName] || []
let success = true
if (results.length === 0) {
success = false
} else if (results.at(-1)?.error) {
success = false
}

if (!result) {
console.log(test, response.results, results)
return {
fullName: test,
attempts: 0,
error: 'Failed to run on remote!',
time: 0,
logStream: logStreamName,
}
} else {
return {
...result,
logStream: logStreamName,
attempts: results.length,
col[fullName] = {
fullName: fullName,
logStream,
success,
results,
}

return col
},
{}
)

return results
} catch (e) {
let message = 'Failed to run on remote!'
if (e instanceof Error) message = e.message

return testNames.reduce<TestResultCollection>((col, fullName) => {
col[fullName] = {
fullName,
logStream: '',
success: false,
results: [
{
fullName,
error: message,
time: 0,
},
],
}
}
})

return results.filter((r: testFailure) => r.error || r.attempts > 1)
} catch (e) {
console.error(e)
return group.tests.map((name: string) => {
return {
fullName: name,
attempts: 0,
error: 'zen failed to run this group',
time: 0,
logStream: '',
}
})
return col
}, {})
}
}
})
)
)

return failedTests
.flat()
.reduce((acc: Record<string, testFailure>, result: testFailure) => {
acc[result.fullName] = result
const testResults = testResultGroups.reduce<TestResultCollection>(
(acc, results) => {
for (const testName in results) {
if (acc[testName]) {
acc[testName].results = acc[testName].results.concat(
results[testName].results
)
// If the test has ever succeeded, then it is a success
acc[testName].success =
acc[testName].success || results[testName].success
} else {
acc[testName] = results[testName]
}
}
return acc
}, {})
},
{}
)

const testErrors: TestResultCollection = {}
for (const testName in testResults) {
const result = testResults[testName]
if (!result.success) {
testErrors[testName] = result
}
}

return testErrors
}

function combineFailures(
currentFailures: TestResultsMap,
previousFailures?: TestResultsMap
): TestResultsMap {
currentFailures: TestResultCollection,
previousFailures?: TestResultCollection
): TestResultCollection {
if (!previousFailures) return currentFailures

// Combine the current failures with the previous failures
const failures = { ...previousFailures }
// Reset the error state for all the previous tests, that way if they
// succeed it will report only as a flake
// Reset the success flag, if they are not being added to that means they succeded
for (const testName in failures) {
failures[testName].error = undefined
failures[testName].success = true
}

for (const testName in currentFailures) {
Expand All @@ -150,9 +176,8 @@ function combineFailures(
} else {
failures[testName] = {
...prevFailure,
error: curFailure.error,
time: prevFailure.time + curFailure.time,
attempts: prevFailure.attempts + curFailure.attempts,
results: prevFailure.results.concat(curFailure.results),
success: false,
}
}
}
Expand Down Expand Up @@ -201,7 +226,7 @@ async function run(zen: Zen, opts: CLIOptions) {

// In case there is an infinite loop, this should brick the test running
let runsLeft = 5
let failures: TestResultsMap | undefined
let failures: TestResultCollection | undefined
console.log(`Running ${workingSet.length} tests`)
while (runsLeft > 0 && workingSet.length > 0) {
runsLeft--
Expand All @@ -213,7 +238,7 @@ async function run(zen: Zen, opts: CLIOptions) {
for (const testName in failures) {
const failure = failures[testName]
if (!failure) continue
if (failure.error && failure.attempts < opts.maxAttempts) {
if (!failure.success && failure.results.length < opts.maxAttempts) {
testsToContinue.push(failure.fullName)
}
}
Expand All @@ -222,32 +247,24 @@ async function run(zen: Zen, opts: CLIOptions) {
console.log(`Trying to rerun ${workingSet.length} tests`)
}

const metrics = []
let failCount = 0
for (const test of Object.values(failures || {})) {
metrics.push({
name: 'log.test_failed',
fields: {
value: test.attempts,
testName: test.fullName,
time: test.time,
error: test.error,
},
})
const lastResult = test.results.at(-1)
const attempts = test.results.length || 1
if (!lastResult) continue

if (test.error) {
if (lastResult.error) {
failCount += 1
console.log(
`🔴 ${test.fullName} ${test.error} (tried ${
test.attempts || 1
`🔴 ${test.fullName} ${lastResult.error} (tried ${
attempts || 1
} times)`
)
} else if (test.attempts > 1) {
console.log(`⚠️ ${test.fullName} (flaked ${test.attempts - 1}x)`)
} else if (test.results.length > 1) {
console.log(`⚠️ ${test.fullName} (flaked ${attempts - 1}x)`)
}
}

if (opts.logging) Profiler.logBatch(zen, metrics)
console.log(`Took ${Date.now() - t0}ms`)
console.log(
`${failCount ? '😢' : '🎉'} ${failCount} failed test${
Expand Down
20 changes: 20 additions & 0 deletions lib/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ export default class Journal {
}
}

groupTestsWithDuplication(
tests: string[],
concurrency: number
): { time: number; tests: string[] }[] {
let groups = this.groupTests(tests, concurrency)
// Take every group and add their content to anther group,
// this will help incase a single group fails due to a timeout
// or some other issue.
groups = groups.map((group, i) => {
const groupB = groups[(i + 1) % groups.length]
const groupC = groups[(i + 2) % groups.length]
return {
time: group.time + groupB.time + groupC.time,
tests: [...group.tests, ...groupB.tests, ...groupC.tests],
}
})

return groups
}

groupTests(
tests: string[],
concurrency: number
Expand Down
1 change: 0 additions & 1 deletion lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ module.exports = class Server {
const result = results.at(-1)

if (!result) {
console.log(test, response.results, results)
return {
fullName: test,
attempts: 0,
Expand Down
10 changes: 10 additions & 0 deletions lib/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ export type TestResult = {
log?: log
}

export type TestResultCollection = Record<
string,
{
results: TestResult[]
fullName: string
logStream?: string
success: boolean
}
>

export type Config = {
log?: (metrics: metric[]) => Promise<void>
appRoot: string
Expand Down
41 changes: 16 additions & 25 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ export function ensureDir(dir: string): void {
}

function isRetryableError(error: Error) {
return error.message.includes('Rate Exceeded.')
const errors = ['Rate Exceeded', 'Execution context was destroyed']

return errors.some((e) => error.message.includes(e))
}

function timeout(ms: number): Promise<void> {
Expand All @@ -135,39 +137,28 @@ function timeout(ms: number): Promise<void> {
export async function invoke(
zen: Zen,
name: string,
args: unknown,
retry = 3
args: unknown
): Promise<unknown> {
try {
const result = await zen.lambda
.invoke({ FunctionName: name, Payload: JSON.stringify(args) })
.promise()

if (result.StatusCode != 200 || !result.Payload) throw result
const result = await zen.lambda
.invoke({ FunctionName: name, Payload: JSON.stringify(args) })
.promise()

// @ts-expect-error This appears to work :shrug:, it is not worth fixing until real changes here
const payload = JSON.parse(result.Payload)
if (result.StatusCode != 200 || !result.Payload) throw result

if (payload.errorMessage) {
const err = new Error(payload.errorMessage)
throw err
}
return payload
} catch (e) {
if (retry > 0 && e instanceof Error && isRetryableError(e)) {
// 10s is arbitrary but hopefully it gives time for things like rate-limiting to resolve
await timeout(10_000)
return invoke(zen, name, args, retry - 1)
}
// @ts-expect-error This appears to work :shrug:, it is not worth fixing until real changes here
const payload = JSON.parse(result.Payload)

throw e
if (payload.errorMessage) {
const err = new Error(payload.errorMessage)
throw err
}
return payload
}

export async function workTests(
zen: Zen,
args: { deflakeLimit: number; testNames: string[]; sessionId: string },
rerun = true
rerun = false
): Promise<WorkTestsResult> {
const response = (await invoke(
zen,
Expand All @@ -183,7 +174,7 @@ export async function workTests(

// If there are timeouts, then keep calling workTests again with the unresolved tests
if (timeoutTests.length > 0 && rerun) {
console.log('RERUNNING DUE TO LAMBDA TIMEOUT', timeoutTests)
console.log('RERUNNING DUE TO LAMBDA TIMEOUT')
await timeout(30_000)
// We don't want this going on endlessly, because there are other errors we may want to do work
const newResponse = await workTests(
Expand Down