Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a cloudflare-streaming wrapper #642

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ten-trainers-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": minor
---

feat: add a cloudflare-streaming wrapper
4 changes: 2 additions & 2 deletions packages/open-next/src/build/createServerBundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ async function generateBundle(
// `node_modules` inside `.next/standalone`, and others inside
// `.next/standalone/package/path` (ie. `.next`, `server.js`).
// We need to output the handler file inside the package path.
const isMonorepo = monorepoRoot !== appPath;
const packagePath = path.relative(monorepoRoot, appBuildOutputPath);
fs.mkdirSync(path.join(outputPath, packagePath), { recursive: true });

Expand Down Expand Up @@ -244,6 +243,7 @@ async function generateBundle(
options,
);

const isMonorepo = monorepoRoot !== appPath;
if (isMonorepo) {
addMonorepoEntrypoint(outputPath, packagePath);
}
Expand Down Expand Up @@ -301,7 +301,7 @@ function addMonorepoEntrypoint(outputPath: string, packagePath: string) {
const packagePosixPath = packagePath.split(path.sep).join(path.posix.sep);
fs.writeFileSync(
path.join(outputPath, "index.mjs"),
[`export * from "./${packagePosixPath}/index.mjs";`].join(""),
`export * from "./${packagePosixPath}/index.mjs";`,
);
}

Expand Down
3 changes: 2 additions & 1 deletion packages/open-next/src/build/validateConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const compatibilityMatrix: Record<IncludedWrapper, IncludedConverter[]> = {
],
"aws-lambda-streaming": ["aws-apigw-v2"],
cloudflare: ["edge"],
"cloudflare-streaming": ["edge"],
node: ["node"],
dummy: [],
};
Expand Down Expand Up @@ -99,7 +100,7 @@ export function validateConfig(config: OpenNextConfig) {
}
if (config.dangerous?.disableTagCache) {
logger.warn(
`You've disabled tag cache.
`You've disabled tag cache.
This means that revalidatePath and revalidateTag from next/cache will not work.
It is safe to disable if you only use page router`,
);
Expand Down
3 changes: 2 additions & 1 deletion packages/open-next/src/overrides/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { debug } from "../../adapters/logger";
import { convertToQuery } from "../../core/routing/util";
import { removeUndefinedFromQuery } from "./utils";

// Not sure which one is reallly needed as this is not documented anywhere but server actions redirect are not working without this, it causes a 500 error from cloudfront itself with a 'x-amzErrortype: InternalFailure' header
// Not sure which one is really needed as this is not documented anywhere but server actions redirect are not working without this,
// it causes a 500 error from cloudfront itself with a 'x-amzErrortype: InternalFailure' header
const CloudFrontBlacklistedHeaders = [
"connection",
"expect",
Expand Down
11 changes: 7 additions & 4 deletions packages/open-next/src/overrides/converters/edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ const converter: Converter<
InternalResult | ({ type: "middleware" } & MiddlewareOutputEvent)
> = {
convertFrom: async (event: Request) => {
const searchParams = new URL(event.url).searchParams;
const url = new URL(event.url);

const searchParams = url.searchParams;
const query: Record<string, string | string[]> = {};
for (const [key, value] of searchParams.entries()) {
if (key in query) {
Expand All @@ -29,13 +31,13 @@ const converter: Converter<
query[key] = value;
}
}
//Transform body into Buffer
// Transform body into Buffer
const body = await event.arrayBuffer();
const headers: Record<string, string> = {};
event.headers.forEach((value, key) => {
headers[key] = value;
});
const rawPath = new URL(event.url).pathname;
const rawPath = url.pathname;
const method = event.method;
const shouldHaveBody = method !== "GET" && method !== "HEAD";
const cookies: Record<string, string> = Object.fromEntries(
Expand Down Expand Up @@ -100,9 +102,10 @@ const converter: Converter<
for (const [key, value] of Object.entries(result.headers)) {
headers.set(key, Array.isArray(value) ? value.join(",") : value);
}

return new Response(result.body as ReadableStream, {
status: result.statusCode,
headers: headers,
headers,
});
},
name: "edge",
Expand Down
77 changes: 77 additions & 0 deletions packages/open-next/src/overrides/wrappers/cloudflare-streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { InternalEvent, InternalResult } from "types/open-next";
import type { WrapperHandler } from "types/overrides";

import { Writable } from "node:stream";
import type { StreamCreator } from "http/index";
import type { MiddlewareOutputEvent } from "../../core/routingHandler";

const handler: WrapperHandler<
InternalEvent,
InternalResult | ({ type: "middleware" } & MiddlewareOutputEvent)
> =
async (handler, converter) =>
async (
request: Request,
env: Record<string, string>,
ctx: any,
): Promise<Response> => {
globalThis.process = process;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be necessary to import process here ?
Something like import * as process from 'node:process';

Copy link
Contributor Author

@vicb vicb Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

process is a global in node compat mode so I think this should be dropped entirely. This code would only make sense when not in node compat mode.

globalThis.openNextWaitUntil = ctx.waitUntil.bind(ctx);

// Set the environment variables
// Cloudflare suggests to not override the process.env object but instead apply the values to it
for (const [key, value] of Object.entries(env)) {
if (typeof value === "string") {
process.env[key] = value;
}
}

const internalEvent = await converter.convertFrom(request);

// TODO:
// The edge converter populate event.url with the url including the origin.
// This is required for middleware to keep track of the protocol (i.e. http with wrangler dev).
// However the server expects that the origin is not included.
const url = new URL(internalEvent.url);
(internalEvent.url as string) = url.href.slice(url.origin.length);

const { promise: promiseResponse, resolve: resolveResponse } =
Promise.withResolvers<Response>();

const streamCreator: StreamCreator = {
writeHeaders(prelude: {
statusCode: number;
cookies: string[];
headers: Record<string, string>;
}): Writable {
const { statusCode, cookies, headers } = prelude;

const responseHeaders = new Headers(headers);
for (const cookie of cookies) {
responseHeaders.append("Set-Cookie", cookie);
}

const { readable, writable } = new TransformStream();
const response = new Response(readable, {
status: statusCode,
headers: responseHeaders,
});
resolveResponse(response);

return Writable.fromWeb(writable);
},
onWrite: () => {},
onFinish: (_length: number) => {},
};

ctx.waitUntil(handler(internalEvent, streamCreator));

return promiseResponse;
};

export default {
wrapper: handler,
name: "cloudflare-streaming",
supportStreaming: true,
edgeRuntime: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure what the value should be here.
My take is that it should be false as this is used for the edge runtime of next which is not the use case for this wrapper.
If that's the way we decide to pursue, we should add a check in the validation of the config file to throw an error for people using edgeRuntime wrapper without runtime: "edge" in their config
It won't have any effect anyway if used on a "node" route

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this value without paying too much attention but I agree it should be false.

};
8 changes: 5 additions & 3 deletions packages/open-next/src/overrides/wrappers/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ const handler: WrapperHandler<
// Retrieve geo information from the cloudflare request
// See https://developers.cloudflare.com/workers/runtime-apis/request
// Note: This code could be moved to a cloudflare specific converter when one is created.
const cfProperties = (request as any).cf as Record<string, string | null>;
const cfProperties = (request as any).cf as
| Record<string, string | null>
| undefined;
for (const [propName, headerName] of Object.entries(
cfPropNameToHeaderName,
)) {
const propValue = cfProperties[propName];
if (propValue !== null) {
const propValue = cfProperties?.[propName];
if (propValue != null) {
internalEvent.headers[headerName] = propValue;
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/open-next/src/types/open-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export type IncludedWrapper =
| "aws-lambda-streaming"
| "node"
| "cloudflare"
| "cloudflare-streaming"
| "dummy";

export type IncludedConverter =
Expand Down