Skip to content

Commit

Permalink
feat: enable error handler to recover from failed HTTP requests (#2011)
Browse files Browse the repository at this point in the history
This PR modifies the `onError` handler that is supported by the shape
stream such that it can recover from certain HTTP request errors. For
example, if the HTTP request fails with a 401 because of an expired
token, the handler could fetch a new token, and try again by returning
the modified HTTP headers. The shapestream will then retry.
  • Loading branch information
kevin-dp authored Nov 21, 2024
1 parent c5b79a5 commit de204fc
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/loud-peaches-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Allow error handler to modify HTTP query parameters and headers to retry failed HTTP request.
26 changes: 23 additions & 3 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ type ReservedParamKeys =

type ParamsRecord = Omit<Record<string, string>, ReservedParamKeys>

type ShapeStreamErrorHandler = (error: Error) => void
type RetryOpts = {
params?: ParamsRecord
headers?: Record<string, string>
}
type ShapeStreamErrorHandler = (
error: Error
) => void | RetryOpts | Promise<void | RetryOpts>

/**
* Options for constructing a ShapeStream.
Expand Down Expand Up @@ -424,12 +430,26 @@ export class ShapeStream<T extends Row<unknown> = Row>
} catch (err) {
this.#error = err
if (this.#onError) {
this.#onError(err as Error)
const retryOpts = await this.#onError(err as Error)
if (typeof retryOpts === `object`) {
this.#reset()

if (`params` in retryOpts) {
this.options.params = retryOpts.params
}

if (`headers` in retryOpts) {
this.options.headers = retryOpts.headers
}

// Restart
this.#start()
}
return
}

// If no handler is provided for errors just throw so the error still bubbles up.
throw this.#error
throw err
} finally {
this.#connected = false
}
Expand Down
144 changes: 144 additions & 0 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,150 @@ describe(`Shape`, () => {
expect(shapeStream.isConnected()).true
})

it(`should not throw error if an error handler is provided`, async ({
issuesTableUrl,
}) => {
const mockErrorHandler = vi.fn()
new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
fetchClient: async (_input, _init) => {
return new Response(undefined, {
status: 401,
})
},
onError: mockErrorHandler,
})

await sleep(10) // give some time for the initial fetch to complete
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})

it(`should retry on error if error handler returns modified params`, async ({
issuesTableUrl,
}) => {
// This test creates a shapestream but provides wrong query params
// the fetch client therefore returns a 401 status code
// the custom error handler handles it by correcting the query param
// after which the fetch succeeds

const mockErrorHandler = vi.fn().mockImplementation((error) => {
if (error instanceof FetchError && error.status === 401) {
return {
params: {
todo: `pass`,
},
}
}
})

new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
params: {
todo: `fail`,
},
fetchClient: async (input, _init) => {
const url = new URL(input)
if (url.searchParams.get(`todo`) === `fail`) {
return new Response(undefined, {
status: 401,
})
}

return new Response(`[]`, { status: 204 })
},
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})

it(`should retry on error if error handler returns modified headers`, async ({
issuesTableUrl,
}) => {
// This test creates a shapestream but provides invalid auth credentials
// the fetch client therefore returns a 401 status code
// the custom error handler handles it by replacing the credentials with valid credentials
// after which the fetch succeeds

const mockErrorHandler = vi.fn().mockImplementation((error) => {
if (error instanceof FetchError && error.status === 401) {
return {
headers: {
Authorization: `valid credentials`,
},
}
}
})

new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
headers: {
Authorization: `invalid credentials`,
},
fetchClient: async (input, init) => {
const headers = init?.headers as Record<string, string>
if (headers && headers.Authorization === `valid credentials`) {
return fetch(input, init)
}

return new Response(undefined, {
status: 401,
})
},
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})

it(`should support async error handler`, async ({ issuesTableUrl }) => {
const mockErrorHandler = vi.fn().mockImplementation(async (error) => {
if (error instanceof FetchError && error.status === 401) {
await sleep(200)
return {
headers: {
Authorization: `valid credentials`,
},
}
}
})

const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
headers: {
Authorization: `invalid credentials`,
},
fetchClient: async (input, init) => {
const headers = init?.headers as Record<string, string>
if (headers && headers.Authorization === `valid credentials`) {
return fetch(input, init)
}

return new Response(undefined, {
status: 401,
})
},
onError: mockErrorHandler,
})

await sleep(50) // give some time for the first fetch to complete
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
expect(shapeStream.isConnected()).toBe(false)

await sleep(200) // give some time for the error handler to modify the authorization header
expect(shapeStream.isConnected()).toBe(true)
})

it(`should stop fetching and report an error if response is missing required headers`, async ({
issuesTableUrl,
}) => {
Expand Down
4 changes: 3 additions & 1 deletion packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,9 @@ describe(`HTTP Sync`, () => {
subscribe: true,
handle: issueStream.shapeHandle,
where: `1=1`,
onError: (err) => (error = err),
onError: (err) => {
error = err
},
})

const errorSubscriberPromise = new Promise((_, reject) =>
Expand Down

0 comments on commit de204fc

Please sign in to comment.