Skip to content

Commit

Permalink
examples: update gatekeeper auth client.
Browse files Browse the repository at this point in the history
  • Loading branch information
thruflo committed Nov 21, 2024
1 parent 6b61ddd commit 65269be
Showing 1 changed file with 30 additions and 42 deletions.
72 changes: 30 additions & 42 deletions examples/gatekeeper-auth/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,42 @@
import { Shape, ShapeStream } from '@electric-sql/client'
import { FetchError, Shape, ShapeStream } from '@electric-sql/client'

const API_URL = process.env.API_URL || "http://localhost:4000"

interface Definition {
table: string,
where?: string,
columns?: string
}

/*
* Fetch the shape options and start syncing. When new data is recieved,
* log the number of rows. When an auth token expires, reconnect.
* Makes a request to the gatekeeper endpoint to fetch a config object
* in the format expected by the ShapeStreamOptions including the
* proxy `url` to connect to and auth `headers`.
*/
async function sync(definition: Definition, handle?: string, offset: string = '-1') {
console.log('sync: ', offset)
async function fetchConfig() {
const url = `${API_URL}/gatekeeper/items`

const options = await fetchShapeOptions(definition)
const stream = new ShapeStream({...options, handle: handle, offset: offset})
const shape = new Shape(stream)

shape.subscribe(async ({ rows }) => {
if (shape.error && 'status' in shape.error) {
const status = shape.error.status
console.warn('fetch error: ', status)
const resp = await fetch(url, {method: "POST"})
return await resp.json()
}

// Stream the shape through the proxy, using the url and auth headers
// provided by the gatekeeper.
const config = await fetchConfig()
const stream = new ShapeStream({...config, onError: async (error) => {
if (error instanceof FetchError) {
const status = error.status
console.log('handling fetch error: ', status)

// If the auth token is invalid or expires, hit the gatekeeper
// again to update the auth headers and thus keep streaming
// without interruption.
if (status === 401 || status === 403) {
shape.unsubscribeAll()

return await sync(definition, shape.handle, shape.lastOffset)
return await fetchConfig()
}
}
else {
console.log('num rows: ', rows ? rows.length : 0)
}
})
}

/*
* Make a request to the gatekeeper endpoint to get the proxy url and
* auth headers to connect to/with.
*/
async function fetchShapeOptions(definition: Definition) {
const { table, ...params} = definition

const qs = new URLSearchParams(params).toString()
const url = `${API_URL}/gatekeeper/${table}${qs ? '?' : ''}${qs}`

const resp = await fetch(url, {method: "POST"})
return await resp.json()
}
throw error
}
})

// Start syncing.
await sync({table: 'items'})
// Materialize the stream into a `Shape` and subscibe to data changes
// so we can see the client working.
const shape = new Shape(stream)
shape.subscribe(({ rows }) => {
console.log('num rows: ', rows ? rows.length : 0)
})

0 comments on commit 65269be

Please sign in to comment.