Skip to content

Commit

Permalink
feat(@whook/aws-lambda): use the graceful shutdown feature
Browse files Browse the repository at this point in the history
When this module was created, AWS Lambda did not permit to graceful shutdown the running lambdas.
Now that we can, simply injecting it. Also mordernized the code to use async/await.

fix #189
  • Loading branch information
nfroidure committed Oct 22, 2024
1 parent 652fda4 commit 9c7249f
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 52 deletions.
15 changes: 10 additions & 5 deletions packages/whook-aws-lambda/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ async function buildAnyLambda(
const srcRelativePath = relative(lambdaPath, srcPath);

const initializerContent = (
await buildInitializer([`OPERATION_HANDLER_${finalEntryPoint}`])
await buildInitializer([
`OPERATION_HANDLER_${finalEntryPoint}`,
'process',
])
).replaceAll(pathToFileURL(srcPath).toString(), srcRelativePath);
const indexContent = await buildLambdaIndex(
`OPERATION_HANDLER_${finalEntryPoint}`,
Expand Down Expand Up @@ -349,13 +352,15 @@ async function buildLambdaIndex(name: string): Promise<string> {
return `// Automatically generated by \`@whook/aws-lambda\`
import { initialize } from './initialize.js';
const initializationPromise = initialize();
const services = await initialize();
export default function handler (event, context, callback) {
export const handler = async (event, context) => {
context.callbackWaitsForEmptyEventLoop = false;
return initializationPromise
.then(services => services['${name}'](event, context, callback));
return await services['${name}'](event, context);
};
export default handler;
`;
}

Expand Down
10 changes: 2 additions & 8 deletions packages/whook-aws-lambda/src/wrappers/awsConsumerLambda.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
import { printStackTrace, YError } from 'yerror';
Expand All @@ -16,7 +15,6 @@ import type {
KinesisStreamEvent,
SQSEvent,
SNSEvent,
Context,
SESEvent,
DynamoDBStreamEvent,
} from 'aws-lambda';
Expand Down Expand Up @@ -84,7 +82,7 @@ async function initWrapHandlerForConsumerLambda<S extends WhookHandler>({
const wrappedHandler = handleForAWSConsumerLambda.bind(
null,
{ ENV, OPERATION_API, apm, time, log },
handler as any,
handler as WhookHandler<LambdaConsumerInput, LambdaConsumerOutput>,
);

return wrappedHandler as unknown as S;
Expand All @@ -108,8 +106,6 @@ async function handleForAWSConsumerLambda(
| SNSEvent
| SESEvent
| DynamoDBStreamEvent,
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths || {})?.[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
Expand Down Expand Up @@ -138,8 +134,6 @@ async function handleForAWSConsumerLambda(
endTime: time(),
recordsLength: event.Records.length,
});

callback(null as unknown as Error);
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -157,7 +151,7 @@ async function handleForAWSConsumerLambda(
recordsLength: event.Records.length,
});

callback(err as Error);
throw castedErr;
}
}

Expand Down
10 changes: 3 additions & 7 deletions packages/whook-aws-lambda/src/wrappers/awsCronLambda.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
import { printStackTrace, YError } from 'yerror';
Expand All @@ -12,7 +11,7 @@ import type {
} from '@whook/whook';
import type { LogService, TimeService } from 'common-services';
import type { OpenAPIV3_1 } from 'openapi-types';
import type { ScheduledEvent, Context } from 'aws-lambda';
import type { ScheduledEvent } from 'aws-lambda';
import type { JsonObject } from 'type-fest';
import type { AppEnvVars } from 'application-services';

Expand Down Expand Up @@ -61,7 +60,7 @@ async function initWrapHandlerForCronLambda<S extends WhookHandler>({
const wrappedHandler = handleForAWSCronLambda.bind(
null,
{ ENV, OPERATION_API, apm, time, log },
handler as any,
handler as WhookHandler<LambdaCronInput, LambdaCronOutput>,
);

return wrappedHandler as unknown as S;
Expand All @@ -80,8 +79,6 @@ async function handleForAWSCronLambda<T extends JsonObject = JsonObject>(
}: Required<WhookWrapCronLambdaDependencies>,
handler: WhookHandler<LambdaCronInput<T>, LambdaCronOutput>,
event: ScheduledEvent & { body: T },
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths || {})[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
Expand Down Expand Up @@ -109,7 +106,6 @@ async function handleForAWSCronLambda<T extends JsonObject = JsonObject>(
startTime,
endTime: time(),
});
callback(null as unknown as Error);
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -126,7 +122,7 @@ async function handleForAWSCronLambda<T extends JsonObject = JsonObject>(
endTime: time(),
});

callback(err as Error);
throw castedErr;
}
}

Expand Down
6 changes: 2 additions & 4 deletions packages/whook-aws-lambda/src/wrappers/awsHTTPLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import {
import type {
APIGatewayProxyEvent,
APIGatewayProxyResult,
Context,
} from 'aws-lambda';
import type { WhookErrorHandler } from '@whook/http-router';
import type { AppEnvVars } from 'application-services';
Expand Down Expand Up @@ -262,8 +261,6 @@ async function handleForAWSHTTPLambda(
},
handler: WhookHandler<LambdaHTTPInput, LambdaHTTPOutput>,
event: APIGatewayProxyEvent,
context: Context,
callback: (err: Error, result?: APIGatewayProxyResult) => void,
) {
const startTime = time();
const bufferLimit = bytes.parse(BUFFER_LIMIT);
Expand Down Expand Up @@ -506,7 +503,8 @@ async function handleForAWSHTTPLambda(
{},
),
});
callback(null as unknown as Error, awsResponse);

return awsResponse;
}

async function awsRequestEventToRequest(
Expand Down
14 changes: 6 additions & 8 deletions packages/whook-aws-lambda/src/wrappers/awsKafkaConsumerLambda.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
import { printStackTrace, YError } from 'yerror';
Expand All @@ -12,7 +11,7 @@ import type {
} from '@whook/whook';
import type { TimeService, LogService } from 'common-services';
import type { OpenAPIV3_1 } from 'openapi-types';
import type { MSKEvent, Context } from 'aws-lambda';
import type { MSKEvent } from 'aws-lambda';
import type { AppEnvVars } from 'application-services';

export type LambdaKafkaConsumerInput = { body: MSKEvent['records'] };
Expand Down Expand Up @@ -61,7 +60,10 @@ async function initWrapHandlerForKafkaLambda<S extends WhookHandler>({
const wrappedHandler = handleForAWSKafkaConsumerLambda.bind(
null,
{ ENV, OPERATION_API, apm, time, log },
handler as any,
handler as WhookHandler<
LambdaKafkaConsumerInput,
LambdaKafkaConsumerOutput
>,
);

return wrappedHandler as unknown as S;
Expand All @@ -80,8 +82,6 @@ async function handleForAWSKafkaConsumerLambda(
}: Required<WhookWrapKafkaLambdaDependencies>,
handler: WhookHandler<LambdaKafkaConsumerInput, LambdaKafkaConsumerOutput>,
event: MSKEvent,
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths || {})[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
Expand Down Expand Up @@ -113,8 +113,6 @@ async function handleForAWSKafkaConsumerLambda(
0,
),
});

callback(null as unknown as Error);
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -135,7 +133,7 @@ async function handleForAWSKafkaConsumerLambda(
),
});

callback(err as Error);
throw castedErr;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import zlib from 'zlib';
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
Expand All @@ -17,7 +16,6 @@ import type { JsonValue } from 'type-fest';
import type {
CloudWatchLogsEvent,
CloudWatchLogsDecodedData,
Context,
} from 'aws-lambda';
import type { AppEnvVars } from 'application-services';

Expand Down Expand Up @@ -94,8 +92,6 @@ async function handleForAWSLogSubscriberLambda<
}: Required<WhookWrapLogSubscriberLambdaDependencies>,
handler: S,
event: CloudWatchLogsEvent,
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths || {})[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
Expand Down Expand Up @@ -124,8 +120,6 @@ async function handleForAWSLogSubscriberLambda<
endTime: time(),
recordsLength: parameters.body.logEvents.length,
});

callback(null as unknown as Error);
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -143,7 +137,7 @@ async function handleForAWSLogSubscriberLambda<
recordsLength: parameters.body.logEvents.length,
});

callback(err as Error);
throw castedErr;
}
}

Expand Down
8 changes: 2 additions & 6 deletions packages/whook-aws-lambda/src/wrappers/awsS3Lambda.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
import { printStackTrace, YError } from 'yerror';
Expand All @@ -12,7 +11,7 @@ import type {
} from '@whook/whook';
import type { LogService, TimeService } from 'common-services';
import type { OpenAPIV3_1 } from 'openapi-types';
import type { S3Event, Context } from 'aws-lambda';
import type { S3Event } from 'aws-lambda';
import type { AppEnvVars } from 'application-services';

export type LambdaS3Input = { body: S3Event['Records'] };
Expand Down Expand Up @@ -76,8 +75,6 @@ async function handleForAWSS3Lambda(
}: Required<WhookWrapS3LambdaDependencies>,
handler: WhookHandler<LambdaS3Input, LambdaS3Output>,
event: S3Event,
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths || {})[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
Expand Down Expand Up @@ -105,7 +102,6 @@ async function handleForAWSS3Lambda(
endTime: time(),
recordsLength: event.Records.length,
});
callback(null as unknown as Error);
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -123,7 +119,7 @@ async function handleForAWSS3Lambda(
recordsLength: event.Records.length,
});

callback(err as Error);
throw castedErr;
}
}

Expand Down
10 changes: 3 additions & 7 deletions packages/whook-aws-lambda/src/wrappers/awsTransformerLambda.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { autoService } from 'knifecycle';
import { noop } from '@whook/whook';
import { printStackTrace, YError } from 'yerror';
Expand All @@ -17,7 +16,6 @@ import type {
FirehoseTransformationEventRecord,
FirehoseTransformationResultRecord,
FirehoseTransformationResult,
Context,
} from 'aws-lambda';
import type { AppEnvVars } from 'application-services';

Expand Down Expand Up @@ -116,9 +114,7 @@ async function handleForAWSTransformerLambda(
}: Required<TransformerWrapperDependencies>,
handler: WhookHandler<LambdaTransformerInput, LambdaTransformerOutput>,
event: FirehoseTransformationEvent,
context: Context,
callback: (err: Error | null, result?: FirehoseTransformationResult) => void,
) {
): Promise<FirehoseTransformationResult> {
const path = Object.keys(OPERATION_API.paths || {})[0];
const method = Object.keys(OPERATION_API.paths?.[path] || {})[0];
const OPERATION: WhookOperation = {
Expand Down Expand Up @@ -146,7 +142,7 @@ async function handleForAWSTransformerLambda(
recordsLength: event.records.length,
});

callback(null, { records: response.body.map(encodeRecord) });
return { records: response.body.map(encodeRecord) };
} catch (err) {
const castedErr = YError.cast(err as Error);

Expand All @@ -164,7 +160,7 @@ async function handleForAWSTransformerLambda(
recordsLength: event.records.length,
});

callback(err as Error);
throw castedErr;
}
}

Expand Down

0 comments on commit 9c7249f

Please sign in to comment.