Skip to content

Commit

Permalink
Allow aborting individual requests (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopperelec authored Jun 13, 2024
1 parent e575b2b commit e32a64f
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 69 deletions.
13 changes: 8 additions & 5 deletions examples/abort/abort.ts → examples/abort/any-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ setTimeout(() => {
}, 1000) // 1000 milliseconds = 1 second

try {
const stream = await ollama.generate({
ollama.generate({
model: 'llama2',
prompt: 'Write a long story',
stream: true,
})
for await (const chunk of stream) {
process.stdout.write(chunk.response)
}
}).then(
async (stream) => {
for await (const chunk of stream) {
process.stdout.write(chunk.response)
}
}
)
} catch (error) {
if (error.name === 'AbortError') {
console.log('The request has been aborted')
Expand Down
31 changes: 31 additions & 0 deletions examples/abort/specific-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import ollama from 'ollama'
import { AbortableAsyncIterator } from '../../src/utils'

let stream: AbortableAsyncIterator<object>

// Set a timeout to abort the request after 1 second
setTimeout(() => {
console.log('\nAborting request...\n')
stream.abort()
}, 1000) // 1000 milliseconds = 1 second

try {
ollama.generate({
model: 'llama2',
prompt: 'Write a long story',
stream: true,
}).then(
async (_stream) => {
stream = _stream
for await (const chunk of stream) {
process.stdout.write(chunk.response)
}
}
)
} catch (error) {
if (error.name === 'AbortError') {
console.log('The request has been aborted')
} else {
console.error('An error occurred:', error)
}
}
125 changes: 62 additions & 63 deletions src/browser.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as utils from './utils.js'
import { AbortableAsyncIterator, parseJSON, post } from './utils.js'
import 'whatwg-fetch'

import type {
Expand Down Expand Up @@ -26,7 +27,7 @@ import type {
export class Ollama {
protected readonly config: Config
protected readonly fetch: Fetch
private abortController: AbortController
protected readonly ongoingStreamedRequests: AbortableAsyncIterator<object>[] = []

constructor(config?: Partial<Config>) {
this.config = {
Expand All @@ -40,69 +41,59 @@ export class Ollama {
if (config?.fetch != null) {
this.fetch = config.fetch
}

this.abortController = new AbortController()
}

// Abort any ongoing requests to Ollama
// Abort any ongoing streamed requests to Ollama
public abort() {
this.abortController.abort()
this.abortController = new AbortController()
for (const request of this.ongoingStreamedRequests) {
request.abort()
}
this.ongoingStreamedRequests.length = 0
}

/**
* Processes a request to the Ollama server. If the request is streamable, it will return an
* AsyncGenerator that yields the response messages. Otherwise, it will return the response
* Processes a request to the Ollama server. If the request is streamable, it will return a
* AbortableAsyncIterator that yields the response messages. Otherwise, it will return the response
* object.
* @param endpoint {string} - The endpoint to send the request to.
* @param request {object} - The request object to send to the endpoint.
* @protected {T | AsyncGenerator<T>} - The response object or an AsyncGenerator that yields
* @protected {T | AbortableAsyncIterator<T>} - The response object or a AbortableAsyncIterator that yields
* response messages.
* @throws {Error} - If the response body is missing or if the response is an error.
* @returns {Promise<T | AsyncGenerator<T>>} - The response object or an AsyncGenerator that yields the streamed response.
* @returns {Promise<T | AbortableAsyncIterator<T>>} - The response object or a AbortableAsyncIterator that yields the streamed response.
*/
protected async processStreamableRequest<T extends object>(
endpoint: string,
request: { stream?: boolean } & Record<string, any>,
): Promise<T | AsyncGenerator<T>> {
): Promise<T | AbortableAsyncIterator<T>> {
request.stream = request.stream ?? false
const response = await utils.post(
this.fetch,
`${this.config.host}/api/${endpoint}`,
{
...request,
},
{ signal: this.abortController.signal },
)

if (!response.body) {
throw new Error('Missing body')
}
const host = `${this.config.host}/api/${endpoint}`
if (request.stream) {
const abortController = new AbortController()
const response = await post(this.fetch, host, request, {
signal: abortController.signal,
})

const itr = utils.parseJSON<T | ErrorResponse>(response.body)
if (!response.body) {
throw new Error('Missing body')
}

if (request.stream) {
return (async function* () {
for await (const message of itr) {
if ('error' in message) {
throw new Error(message.error)
}
yield message
// message will be done in the case of chat and generate
// message will be success in the case of a progress response (pull, push, create)
if ((message as any).done || (message as any).status === 'success') {
return
const itr = parseJSON<T | ErrorResponse>(response.body)
const abortableAsyncIterator = new AbortableAsyncIterator(
abortController,
itr,
() => {
const i = this.ongoingStreamedRequests.indexOf(abortableAsyncIterator)
if (i > -1) {
this.ongoingStreamedRequests.splice(i, 1)
}
}
throw new Error('Did not receive done or success response in stream.')
})()
} else {
const message = await itr.next()
if (!message.value.done && (message.value as any).status !== 'success') {
throw new Error('Expected a completed response.')
}
return message.value
},
)
this.ongoingStreamedRequests.push(abortableAsyncIterator)
return abortableAsyncIterator
}
const response = await utils.post(this.fetch, host, request)
return await response.json()
}

/**
Expand All @@ -127,34 +118,38 @@ async encodeImage(image: Uint8Array | string): Promise<string> {

generate(
request: GenerateRequest & { stream: true },
): Promise<AsyncGenerator<GenerateResponse>>
): Promise<AbortableAsyncIterator<GenerateResponse>>
generate(request: GenerateRequest & { stream?: false }): Promise<GenerateResponse>
/**
* Generates a response from a text prompt.
* @param request {GenerateRequest} - The request object.
* @returns {Promise<GenerateResponse | AsyncGenerator<GenerateResponse>>} - The response object or
* an AsyncGenerator that yields response messages.
* @returns {Promise<GenerateResponse | AbortableAsyncIterator<GenerateResponse>>} - The response object or
* an AbortableAsyncIterator that yields response messages.
*/
async generate(
request: GenerateRequest,
): Promise<GenerateResponse | AsyncGenerator<GenerateResponse>> {
): Promise<GenerateResponse | AbortableAsyncIterator<GenerateResponse>> {
if (request.images) {
request.images = await Promise.all(request.images.map(this.encodeImage.bind(this)))
}
return this.processStreamableRequest<GenerateResponse>('generate', request)
}

chat(request: ChatRequest & { stream: true }): Promise<AsyncGenerator<ChatResponse>>
chat(
request: ChatRequest & { stream: true },
): Promise<AbortableAsyncIterator<ChatResponse>>
chat(request: ChatRequest & { stream?: false }): Promise<ChatResponse>
/**
* Chats with the model. The request object can contain messages with images that are either
* Uint8Arrays or base64 encoded strings. The images will be base64 encoded before sending the
* request.
* @param request {ChatRequest} - The request object.
* @returns {Promise<ChatResponse | AsyncGenerator<ChatResponse>>} - The response object or an
* AsyncGenerator that yields response messages.
* @returns {Promise<ChatResponse | AbortableAsyncIterator<ChatResponse>>} - The response object or an
* AbortableAsyncIterator that yields response messages.
*/
async chat(request: ChatRequest): Promise<ChatResponse | AsyncGenerator<ChatResponse>> {
async chat(
request: ChatRequest,
): Promise<ChatResponse | AbortableAsyncIterator<ChatResponse>> {
if (request.messages) {
for (const message of request.messages) {
if (message.images) {
Expand All @@ -169,16 +164,16 @@ async encodeImage(image: Uint8Array | string): Promise<string> {

create(
request: CreateRequest & { stream: true },
): Promise<AsyncGenerator<ProgressResponse>>
): Promise<AbortableAsyncIterator<ProgressResponse>>
create(request: CreateRequest & { stream?: false }): Promise<ProgressResponse>
/**
* Creates a new model from a stream of data.
* @param request {CreateRequest} - The request object.
* @returns {Promise<ProgressResponse | AsyncGenerator<ProgressResponse>>} - The response object or a stream of progress responses.
* @returns {Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>>} - The response object or a stream of progress responses.
*/
async create(
request: CreateRequest,
): Promise<ProgressResponse | AsyncGenerator<ProgressResponse>> {
): Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>> {
return this.processStreamableRequest<ProgressResponse>('create', {
name: request.model,
stream: request.stream,
Expand All @@ -187,37 +182,41 @@ async encodeImage(image: Uint8Array | string): Promise<string> {
})
}

pull(request: PullRequest & { stream: true }): Promise<AsyncGenerator<ProgressResponse>>
pull(
request: PullRequest & { stream: true },
): Promise<AbortableAsyncIterator<ProgressResponse>>
pull(request: PullRequest & { stream?: false }): Promise<ProgressResponse>
/**
* Pulls a model from the Ollama registry. The request object can contain a stream flag to indicate if the
* response should be streamed.
* @param request {PullRequest} - The request object.
* @returns {Promise<ProgressResponse | AsyncGenerator<ProgressResponse>>} - The response object or
* an AsyncGenerator that yields response messages.
* @returns {Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>>} - The response object or
* an AbortableAsyncIterator that yields response messages.
*/
async pull(
request: PullRequest,
): Promise<ProgressResponse | AsyncGenerator<ProgressResponse>> {
): Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>> {
return this.processStreamableRequest<ProgressResponse>('pull', {
name: request.model,
stream: request.stream,
insecure: request.insecure,
})
}

push(request: PushRequest & { stream: true }): Promise<AsyncGenerator<ProgressResponse>>
push(
request: PushRequest & { stream: true },
): Promise<AbortableAsyncIterator<ProgressResponse>>
push(request: PushRequest & { stream?: false }): Promise<ProgressResponse>
/**
* Pushes a model to the Ollama registry. The request object can contain a stream flag to indicate if the
* response should be streamed.
* @param request {PushRequest} - The request object.
* @returns {Promise<ProgressResponse | AsyncGenerator<ProgressResponse>>} - The response object or
* an AsyncGenerator that yields response messages.
* @returns {Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>>} - The response object or
* an AbortableAsyncIterator that yields response messages.
*/
async push(
request: PushRequest,
): Promise<ProgressResponse | AsyncGenerator<ProgressResponse>> {
): Promise<ProgressResponse | AbortableAsyncIterator<ProgressResponse>> {
return this.processStreamableRequest<ProgressResponse>('push', {
name: request.model,
stream: request.stream,
Expand Down
37 changes: 36 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { version } from './version.js'
import type { Fetch, ErrorResponse } from './interfaces.js'
import type { ErrorResponse, Fetch } from './interfaces.js'

/**
* An error class for response errors.
Expand All @@ -19,6 +19,41 @@ class ResponseError extends Error {
}
}

/**
* An AsyncIterator which can be aborted
*/
export class AbortableAsyncIterator<T extends object> {
private readonly abortController: AbortController
private readonly itr: AsyncGenerator<T | ErrorResponse>
private readonly doneCallback: () => void

constructor(abortController: AbortController, itr: AsyncGenerator<T | ErrorResponse>, doneCallback: () => void) {
this.abortController = abortController
this.itr = itr
this.doneCallback = doneCallback
}

abort() {
this.abortController.abort()
}

async *[Symbol.asyncIterator]() {
for await (const message of this.itr) {
if ('error' in message) {
throw new Error(message.error)
}
yield message
// message will be done in the case of chat and generate
// message will be success in the case of a progress response (pull, push, create)
if ((message as any).done || (message as any).status === 'success') {
this.doneCallback()
return
}
}
throw new Error('Did not receive done or success response in stream.')
}
}

/**
* Checks if the response is ok, if not throws an error.
* If the response is not ok, it will try to parse the response as JSON and use the error field as the error message.
Expand Down

0 comments on commit e32a64f

Please sign in to comment.