Skip to content

Commit

Permalink
fix: always throw errors & always use named Error classes (#1985)
Browse files Browse the repository at this point in the history
Fixes #1983

---------

Co-authored-by: Kevin De Porre <[email protected]>
  • Loading branch information
KyleAMathews and kevin-dp authored Nov 21, 2024
1 parent 598aa28 commit 5a7866f
Show file tree
Hide file tree
Showing 15 changed files with 328 additions and 79 deletions.
15 changes: 15 additions & 0 deletions .changeset/fair-pants-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"@electric-sql/client": patch
---

refactor: improve error handling with new error classes & stream control

- Add `onError` handler to ShapeStream for centralized error handling
- Add new error classes:
- MissingShapeUrlError
- InvalidSignalError
- MissingShapeHandleError
- ReservedParamError
- ParserNullValueError
- ShapeStreamAlreadyRunningError
- Improve error propagation through ShapeStream lifecycle
7 changes: 2 additions & 5 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function getShapeStream<T extends Row<unknown>>(
// If the stream is already cached, return it if valid
if (streamCache.has(shapeHash)) {
const stream = streamCache.get(shapeHash)! as ShapeStream<T>
if (stream.error === undefined && !stream.options.signal?.aborted) {
if (!stream.error && !stream.options.signal?.aborted) {
return stream
}

Expand All @@ -57,10 +57,7 @@ export function getShape<T extends Row<unknown>>(
): Shape<T> {
// If the stream is already cached, return it if valid
if (shapeCache.has(shapeStream)) {
if (
shapeStream.error === undefined &&
!shapeStream.options.signal?.aborted
) {
if (!shapeStream.error && !shapeStream.options.signal?.aborted) {
return shapeCache.get(shapeStream)! as Shape<T>
}

Expand Down
2 changes: 1 addition & 1 deletion packages/react-hooks/test/support/global-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export default async function ({ provide }: GlobalSetupContext) {
provide(`proxyCachePath`, proxyCachePath)

return async () => {
await client.query(`DROP SCHEMA electric_test`)
await client.query(`DROP SCHEMA electric_test CASCADE`)
await client.end()
}
}
14 changes: 8 additions & 6 deletions packages/react-hooks/test/support/test-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ export const testWithDbClient = test.extend<{
clearShape: async ({}, use) => {
use(async (table: string, handle?: string) => {
const baseUrl = inject(`baseUrl`)
const resp = await fetch(
`${baseUrl}/v1/shape?table=${table}${handle ? `&handle=${handle}` : ``}`,
{
method: `DELETE`,
}
)
const url = new URL(`${baseUrl}/v1/shape`)
url.searchParams.set(`table`, table)
if (handle) {
url.searchParams.set(`handle`, handle)
}
const resp = await fetch(url.toString(), {
method: `DELETE`,
})
if (!resp.ok) {
console.error(
await FetchError.fromResponse(
Expand Down
46 changes: 45 additions & 1 deletion packages/typescript-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,51 @@ shape.subscribe(({ rows }) => {
}
```
See the [Docs](https://electric-sql.com) and [Examples](https://electric-sql.com/examples/basic) for more information.
### Error Handling
The ShapeStream provides two ways to handle errors:
1. Using the `onError` handler:
```typescript
const stream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: `foo`,
onError: (error) => {
// Handle all stream errors here
console.error('Stream error:', error)
}
})
```
If no `onError` handler is provided, the ShapeStream will throw errors that occur during streaming.
2. Individual subscribers can optionally handle errors specific to their subscription:
```typescript
stream.subscribe(
(messages) => {
// Handle messages
},
(error) => {
// Handle errors for this specific subscription
console.error('Subscription error:', error)
}
)
```
Common error types include:
- `MissingShapeUrlError`: Missing required URL parameter
- `InvalidSignalError`: Invalid AbortSignal instance
- `ReservedParamError`: Using reserved parameter names
Runtime errors:
- `FetchError`: HTTP errors during shape fetching
- `FetchBackoffAbortError`: Fetch aborted using AbortSignal
- `MissingShapeHandleError`: Missing required shape handle
- `ParserNullValueError`: Parser encountered NULL value in a column that doesn't allow NULL values
See the [typescript client docs on the website](https://electric-sql.com/docs/api/clients/typescript#error-handling) for more details on error handling.
And in general, see the [docs website](https://electric-sql.com) and [examples folder](https://electric-sql.com/examples/basic) for more information.
## Develop
Expand Down
69 changes: 42 additions & 27 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import { isUpToDateMessage } from './helpers'
import {
FetchError,
FetchBackoffAbortError,
MissingHeadersError,
MissingShapeUrlError,
InvalidSignalError,
MissingShapeHandleError,
ReservedParamError,
} from './error'
import {
BackoffDefaults,
Expand Down Expand Up @@ -63,6 +66,8 @@ type ReservedParamKeys =

type ParamsRecord = Omit<Record<string, string>, ReservedParamKeys>

type ShapeStreamErrorHandler = (error: Error) => void

/**
* Options for constructing a ShapeStream.
*/
Expand Down Expand Up @@ -147,6 +152,15 @@ export interface ShapeStreamOptions<T = never> {
fetchClient?: typeof fetch
backoffOptions?: BackoffOptions
parser?: Parser<T>

/**
* A function for handling shapestream errors.
* This is optional, when it is not provided any shapestream errors will be thrown.
* If the function is provided and returns an object containing parameters and/or headers
* the shapestream will apply those changes and try syncing again.
* If the function returns void the shapestream is stopped.
*/
onError?: ShapeStreamErrorHandler
}

export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
Expand All @@ -164,6 +178,7 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
isUpToDate: boolean
lastOffset: Offset
shapeHandle?: string
error?: unknown
}

/**
Expand Down Expand Up @@ -206,6 +221,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

readonly options: ShapeStreamOptions<GetExtensions<T>>
#error: unknown = null

readonly #fetchClient: typeof fetch
readonly #messageParser: MessageParser<T>
Expand All @@ -226,18 +242,19 @@ export class ShapeStream<T extends Row<unknown> = Row>
#shapeHandle?: string
#databaseId?: string
#schema?: Schema
#error?: unknown
#onError?: ShapeStreamErrorHandler
#replica?: Replica

constructor(options: ShapeStreamOptions<GetExtensions<T>>) {
validateOptions(options)
this.options = { subscribe: true, ...options }
validateOptions(this.options)
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeHandle = this.options.handle
this.#databaseId = this.options.databaseId
this.#messageParser = new MessageParser<T>(options.parser)
this.#replica = this.options.replica
this.#onError = this.options.onError

const baseFetchClient =
options.fetchClient ??
Expand All @@ -255,13 +272,17 @@ export class ShapeStream<T extends Row<unknown> = Row>
createFetchWithChunkBuffer(fetchWithBackoffClient)
)

this.start()
this.#start()
}

get shapeHandle() {
return this.#shapeHandle
}

get error() {
return this.#error
}

get isUpToDate() {
return this.#isUpToDate
}
Expand All @@ -270,20 +291,14 @@ export class ShapeStream<T extends Row<unknown> = Row>
return this.#lastOffset
}

get error() {
return this.#error
}

async start() {
this.#isUpToDate = false

const { url, table, where, columns, signal } = this.options

async #start() {
try {
while (
(!signal?.aborted && !this.#isUpToDate) ||
(!this.options.signal?.aborted && !this.#isUpToDate) ||
this.options.subscribe
) {
const { url, table, where, columns, signal } = this.options

const fetchUrl = new URL(url)

// Add any custom parameters first
Expand Down Expand Up @@ -346,7 +361,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#connected = true
} catch (e) {
if (e instanceof FetchBackoffAbortError) break // interrupted
if (e instanceof MissingHeadersError) throw e
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
Expand Down Expand Up @@ -409,14 +423,21 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
} catch (err) {
this.#error = err
if (this.#onError) {
this.#onError(err as Error)
return
}

// If no handler is provided for errors just throw so the error still bubbles up.
throw this.#error
} finally {
this.#connected = false
}
}

subscribe(
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
onError: (error: Error) => void = () => {}
) {
const subscriptionId = Math.random()

Expand Down Expand Up @@ -449,7 +470,7 @@ export class ShapeStream<T extends Row<unknown> = Row>

/** True during initial fetch. False afterwise. */
isLoading(): boolean {
return !this.isUpToDate
return !this.#isUpToDate
}

async #publish(messages: Message<T>[]): Promise<void> {
Expand Down Expand Up @@ -488,22 +509,18 @@ export class ShapeStream<T extends Row<unknown> = Row>

function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
if (!options.url) {
throw new Error(`Invalid shape options. It must provide the url`)
throw new MissingShapeUrlError()
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
)
throw new InvalidSignalError()
}

if (
options.offset !== undefined &&
options.offset !== `-1` &&
!options.handle
) {
throw new Error(
`handle is required if this isn't an initial fetch (i.e. offset > -1)`
)
throw new MissingShapeHandleError()
}

// Check for reserved parameter names
Expand All @@ -512,9 +529,7 @@ function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
RESERVED_PARAMS.has(key)
)
if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
)
throw new ReservedParamError(reservedParams)
}
}
return
Expand Down
54 changes: 54 additions & 0 deletions packages/typescript-client/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,60 @@ export class FetchError extends Error {
export class FetchBackoffAbortError extends Error {
constructor() {
super(`Fetch with backoff aborted`)
this.name = `FetchBackoffAbortError`
}
}

export class InvalidShapeOptionsError extends Error {
constructor(message: string) {
super(message)
this.name = `InvalidShapeOptionsError`
}
}

export class MissingShapeUrlError extends Error {
constructor() {
super(`Invalid shape options: missing required url parameter`)
this.name = `MissingShapeUrlError`
}
}

export class InvalidSignalError extends Error {
constructor() {
super(`Invalid signal option. It must be an instance of AbortSignal.`)
this.name = `InvalidSignalError`
}
}

export class MissingShapeHandleError extends Error {
constructor() {
super(
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)`
)
this.name = `MissingShapeHandleError`
}
}

export class ReservedParamError extends Error {
constructor(reservedParams: string[]) {
super(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
)
this.name = `ReservedParamError`
}
}

export class ParserNullValueError extends Error {
constructor(columnName: string) {
super(`Column "${columnName ?? `unknown`}" does not allow NULL values`)
this.name = `ParserNullValueError`
}
}

export class ShapeStreamAlreadyRunningError extends Error {
constructor() {
super(`ShapeStream is already running`)
this.name = `ShapeStreamAlreadyRunningError`
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/typescript-client/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export function createFetchWithResponseHeadersCheck(
const missingHeaders: Array<string> = []

const addMissingHeaders = (requiredHeaders: Array<string>) =>
requiredHeaders.filter((h) => !headers.has(h))
missingHeaders.push(...requiredHeaders.filter((h) => !headers.has(h)))
addMissingHeaders(requiredElectricResponseHeaders)

const input = args[0]
Expand Down
Loading

0 comments on commit 5a7866f

Please sign in to comment.