From 062ac42cf8e8a5b9186ce1e49f21b8388730c476 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 19 Jan 2024 16:45:24 +0200 Subject: [PATCH] Rejecting awakeable on failed query --- typescript/promisify-anything/src/service.ts | 37 +++++++++++++------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/typescript/promisify-anything/src/service.ts b/typescript/promisify-anything/src/service.ts index bdb67238..917005ac 100644 --- a/typescript/promisify-anything/src/service.ts +++ b/typescript/promisify-anything/src/service.ts @@ -21,13 +21,13 @@ export const internalApi: restate.ServiceApi = { // Public API implementation -const query = async (ctx: restate.RpcContext, param: string) => { +const query = async (ctx: restate.RpcContext, externalRequest: string) => { const uniqueId = ctx.rand.uuidv4(); const awakeable = ctx.awakeable(); ctx.send(internalApi).query(uniqueId, { awakeableId: awakeable.id, - query: param, + query: externalRequest, }); return await awakeable.promise; @@ -43,7 +43,7 @@ const client = new athena.AthenaClient({}); type QueryRequest = { awakeableId: string; - query: string; + query?: string; }; const queryInternal = async (ctx: restate.RpcContext, requestId: string, request: QueryRequest) => { @@ -59,7 +59,7 @@ const queryInternal = async (ctx: restate.RpcContext, requestId: string, request executionId = (await ctx.sideEffect(async () => { const startQueryResult = await client.send( new athena.StartQueryExecutionCommand({ - QueryString: 'SELECT * FROM "demo_db"."table" limit 10;', + QueryString: request.query ?? 'SELECT * FROM "demo_db"."table" limit 10;', WorkGroup: "demo-workgroup", ClientRequestToken: requestId, }), @@ -67,18 +67,31 @@ const queryInternal = async (ctx: restate.RpcContext, requestId: string, request return startQueryResult.QueryExecutionId; })) as string; } catch (err) { - throw new restate.TerminalError("Unable to start query", { cause: err }); + //throw new restate.TerminalError("Unable to start query: " + err); + ctx.rejectAwakeable(request.awakeableId, "Unable to start query: " + err); + return; } - const results = await ctx.sideEffect(async () => { - return await client.send( - new athena.GetQueryResultsCommand({ - QueryExecutionId: executionId, - }), - ); + const result = await ctx.sideEffect(async () => { + try { + return await client.send( + new athena.GetQueryResultsCommand({ + QueryExecutionId: executionId, + }), + ); + } catch (err) { + if (err instanceof athena.InvalidRequestException && err.message.match(/state: FAILED/)) { + return err; // side effect completes with an error result + } + throw err; // side effect will be retried + } }); - ctx.resolveAwakeable(request.awakeableId, { result: results.ResultSet, _id: results.$metadata.requestId }); + if (result instanceof athena.InvalidRequestException) { + ctx.rejectAwakeable(request.awakeableId, "Query execution failed: " + result.message); + } else { + ctx.resolveAwakeable(request.awakeableId, { result: result.ResultSet, _id: result.$metadata.requestId }); + } }; export const internalAthenaApiRouter = restate.keyedRouter({