From 1e74c3c23a5b7e5842a421f0ad67e463cde7d431 Mon Sep 17 00:00:00 2001 From: Shigma Date: Mon, 12 Feb 2024 17:34:08 +0800 Subject: [PATCH] feat(http): fix context tracing --- package.json | 2 +- packages/http/src/adapter/browser.ts | 5 +- packages/http/src/index.ts | 238 ++++++++++++++------------- packages/http/src/utils.ts | 3 - packages/socks/src/index.ts | 4 + yakumo.yml | 8 + 6 files changed, 140 insertions(+), 120 deletions(-) create mode 100644 yakumo.yml diff --git a/package.json b/package.json index 4ba3567..c7d90ef 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "tsx": "^4.7.0", "typescript": "^5.3.2", "yakumo": "^1.0.0-alpha.10", - "yakumo-esbuild": "^1.0.0-alpha.2", + "yakumo-esbuild": "^1.0.0-beta.3", "yakumo-publish-sync": "^1.0.0-alpha.1", "yakumo-tsc": "^1.0.0-alpha.2" } diff --git a/packages/http/src/adapter/browser.ts b/packages/http/src/adapter/browser.ts index 284e318..ad4db80 100644 --- a/packages/http/src/adapter/browser.ts +++ b/packages/http/src/adapter/browser.ts @@ -3,9 +3,8 @@ import { LookupAddress } from 'dns' import { HTTP } from '../index.js' -const ws = typeof WebSocket !== 'undefined' ? WebSocket : null - -export { ws as WebSocket } +const { WebSocket } = globalThis +export { WebSocket } const v4 = /^(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}$/ diff --git a/packages/http/src/index.ts b/packages/http/src/index.ts index 5dde254..23bacbf 100644 --- a/packages/http/src/index.ts +++ b/packages/http/src/index.ts @@ -32,17 +32,6 @@ class HTTPError extends Error { } } -export interface HTTP { - (url: string | URL, config?: HTTP.RequestConfig): Promise> - (method: HTTP.Method, url: string | URL, config?: HTTP.RequestConfig): Promise> - config: HTTP.Config - get: HTTP.Request1 - delete: HTTP.Request1 - patch: HTTP.Request2 - post: HTTP.Request2 - put: HTTP.Request2 -} - export namespace HTTP { export type Method = | 'get' | 'GET' @@ -114,15 +103,43 @@ export namespace HTTP { export type Error = HTTPError } -export class HTTP { +export interface HTTP { + [Context.current]: Context + (url: string | URL, config?: HTTP.RequestConfig): Promise> + (method: HTTP.Method, url: string | URL, config?: HTTP.RequestConfig): Promise> + config: HTTP.Config + get: HTTP.Request1 + delete: HTTP.Request1 + patch: HTTP.Request2 + post: HTTP.Request2 + put: HTTP.Request2 +} + +export class HTTP extends Function { static Error = HTTPError /** @deprecated use `HTTP.Error.is()` instead */ static isAxiosError = HTTPError.is - protected [Context.current]: Context + static { + for (const method of ['get', 'delete'] as const) { + defineProperty(HTTP.prototype, method, async function (this: HTTP, url: string, config?: HTTP.Config) { + const caller = this[Context.current] + const response = await this.call(caller, method, url, config) + return response.data + }) + } - constructor(ctx: Context, config: HTTP.Config = {}) { - ctx.provide('http') + for (const method of ['patch', 'post', 'put'] as const) { + defineProperty(HTTP.prototype, method, async function (this: HTTP, url: string, data?: any, config?: HTTP.Config) { + const caller = this[Context.current] + const response = await this.call(caller, method, url, { data, ...config }) + return response.data + }) + } + } + + constructor(ctx: Context, config: HTTP.Config = {}, isExtend?: boolean) { + super() function resolveDispatcher(href?: string) { if (!href) return @@ -137,123 +154,120 @@ export class HTTP { if (typeof args[1] === 'string' || args[1] instanceof URL) { method = args.shift() } - const config = this.http.resolveConfig(args[1]) - const url = this.http.resolveURL(args[0], config) + const caller = isExtend ? ctx : this + const config = (http as HTTP).resolveConfig(caller, args[1]) + const url = HTTP.resolveURL(caller, args[0], config) + const controller = new AbortController() - this.on('dispose', () => { + let timer: NodeJS.Timeout | number | undefined + const dispose = caller.on('dispose', () => { + clearTimeout(timer) controller.abort('context disposed') }) if (config.timeout) { - const timer = setTimeout(() => { + timer = setTimeout(() => { controller.abort('timeout') }, config.timeout) - this.on('dispose', () => clearTimeout(timer)) } - const raw = await fetch(url, { - method, - body: config.data, - headers: config.headers, - keepalive: config.keepAlive, - signal: controller.signal, - ['dispatcher' as never]: resolveDispatcher(config?.proxyAgent), - }).catch((cause) => { - const error = new HTTP.Error(cause.message) - error.cause = cause - throw error - }) - - const response: HTTP.Response = { - data: null, - url: raw.url, - status: raw.status, - statusText: raw.statusText, - headers: raw.headers, - } + try { + const raw = await fetch(url, { + method, + body: config.data, + headers: config.headers, + keepalive: config.keepAlive, + signal: controller.signal, + ['dispatcher' as never]: resolveDispatcher(config?.proxyAgent), + }).catch((cause) => { + const error = new HTTP.Error(`fetch ${url} failed`) + error.cause = cause + throw error + }) - if (!raw.ok) { - const error = new HTTP.Error(raw.statusText) - error.response = response - try { + const response: HTTP.Response = { + data: null, + url: raw.url, + status: raw.status, + statusText: raw.statusText, + headers: raw.headers, + } + + if (!raw.ok) { + const error = new HTTP.Error(raw.statusText) + error.response = response + try { + response.data = await this.http.decodeResponse(raw) + } catch {} + throw error + } + + if (config.responseType === 'arraybuffer') { + response.data = await raw.arrayBuffer() + } else if (config.responseType === 'stream') { + response.data = raw.body + } else { response.data = await this.http.decodeResponse(raw) - } catch {} - throw error + } + return response + } finally { + dispose() } - - if (config.responseType === 'arraybuffer') { - response.data = await raw.arrayBuffer() - } else if (config.responseType === 'stream') { - response.data = raw.body - } else { - response.data = await this.http.decodeResponse(raw) - } - return response } as HTTP http.config = config defineProperty(http, Context.current, ctx) Object.setPrototypeOf(http, Object.getPrototypeOf(this)) - for (const method of ['get', 'delete'] as const) { - http[method] = async function (this: HTTP, url: string, config?: HTTP.Config) { - const caller = this[Context.current] - const response = await caller.http(url, { - method, - ...config, - }) - return response.data - } - } - - for (const method of ['patch', 'post', 'put'] as const) { - http[method] = async function (this: HTTP, url: string, data?: any, config?: HTTP.Config) { - const caller = this[Context.current] - const response = await caller.http(url, { - method, - data, - ...config, - }) - return response.data - } + if (!isExtend) { + ctx.provide('http') + ctx.http = http + ctx.on('dispose', () => { + ctx.http = null as never + }) } - ctx.http = Context.associate(http, 'http') - ctx.on('dispose', () => { - ctx.http = null as never - }) - return http } - resolveConfig(init?: HTTP.RequestConfig): HTTP.RequestConfig { - let result = { headers: {}, ...this.config } - const merge = (init?: HTTP.RequestConfig) => { - result = { - ...result, - ...this.config, - headers: { - ...result.headers, - ...init?.headers, - }, - } - } + static mergeConfig = (target: HTTP.Config, source?: HTTP.Config) => ({ + ...target, + ...source, + headers: { + ...target.headers, + ...source?.headers, + }, + }) + + extend(config: HTTP.Config = {}) { + return new HTTP(this[Context.current], HTTP.mergeConfig(this.config, config), true) + } - const caller = this[Context.current] + resolveConfig(caller: Context, init?: HTTP.RequestConfig): HTTP.RequestConfig { + let result = { headers: {}, ...this.config } let intercept = caller[Context.intercept] while (intercept) { - merge(intercept.http) + result = HTTP.mergeConfig(result, intercept.http) intercept = Object.getPrototypeOf(intercept) } - merge(init) + result = HTTP.mergeConfig(result, init) return result } - resolveURL(url: string | URL, config: HTTP.RequestConfig) { + static resolveURL(caller: Context, url: string | URL, config: HTTP.RequestConfig) { if (config.endpoint) { - this[Context.current].emit('internal/warning', 'endpoint is deprecated, please use baseURL instead') - url = trimSlash(config.endpoint) + url + // caller.emit('internal/warning', 'endpoint is deprecated, please use baseURL instead') + try { + new URL(url) + } catch { + url = trimSlash(config.endpoint) + url + } + } + try { + url = new URL(url, config.baseURL) + } catch (error) { + // prettify the error message + throw new TypeError(`Invalid URL: ${url}`) } - url = new URL(url, config.baseURL) for (const [key, value] of Object.entries(config.params ?? {})) { url.searchParams.append(key, value) } @@ -261,8 +275,8 @@ export class HTTP { } decodeResponse(response: Response) { - const type = response.headers.get('Content-Type') - if (type === 'application/json') { + const type = response.headers.get('content-type') + if (type?.startsWith('application/json')) { return response.json() } else if (type?.startsWith('text/')) { return response.text() @@ -273,18 +287,15 @@ export class HTTP { async head(url: string, config?: HTTP.Config) { const caller = this[Context.current] - const response = await caller.http(url, { - method: 'HEAD', - ...config, - }) + const response = await this.call(caller, 'HEAD', url, config) return response.headers } /** @deprecated use `ctx.http()` instead */ - async axios(url: string, config?: HTTP.Config) { + axios(url: string, config?: HTTP.Config): HTTP.Response { const caller = this[Context.current] caller.emit('internal/warning', 'ctx.http.axios() is deprecated, use ctx.http() instead') - return caller.http(url, config) + return this.call(caller, url, config) } resolveAgent(href?: string) { @@ -296,14 +307,15 @@ export class HTTP { } async ws(this: HTTP, url: string | URL, init?: HTTP.Config) { - const config = this.resolveConfig(init) - url = this.resolveURL(url, config) + const caller = this[Context.current] + const config = this.resolveConfig(caller, init) + url = HTTP.resolveURL(caller, url, config) const socket = new WebSocket(url, 'Server' in WebSocket ? { agent: this.resolveAgent(config?.proxyAgent), handshakeTimeout: config?.timeout, headers: config?.headers, } as ClientOptions as never : undefined) - this[Context.current].on('dispose', () => { + caller.on('dispose', () => { socket.close(1001, 'context disposed') }) return socket @@ -318,12 +330,12 @@ export class HTTP { const [, mime, base64] = capture return { mime, data: base64ToArrayBuffer(base64) } } - const { headers, data, url: responseUrl } = await caller.http(url, { + const { headers, data, url: responseUrl } = await this.call(caller, url, { method: 'GET', responseType: 'arraybuffer', timeout: +options.timeout! || undefined, }) - const mime = headers.get('Content-Type') ?? undefined + const mime = headers.get('content-type') ?? undefined const [, name] = responseUrl.match(/.+\/([^/?]*)(?=\?)?/)! return { mime, name, data } } diff --git a/packages/http/src/utils.ts b/packages/http/src/utils.ts index c8232dc..a190767 100644 --- a/packages/http/src/utils.ts +++ b/packages/http/src/utils.ts @@ -40,17 +40,14 @@ function parseIPv4(ip: string) { function parseIPv6(ip: string) { const exp = ip.indexOf('::') let num = 0n - // :: 左边有内容 if (exp !== -1 && exp !== 0) { ip.slice(0, exp).split(':').forEach((piece, i) => { num |= BigInt(`0x${piece}`) << BigInt((7 - i) * 16) }) } - // :: 在最右边 if (exp === ip.length - 2) { return num } - // :: 右边的内容 const rest = exp === -1 ? ip : ip.slice(exp + 2) const v4 = rest.includes('.') const pieces = rest.split(':') diff --git a/packages/socks/src/index.ts b/packages/socks/src/index.ts index dac079a..cdd5f25 100644 --- a/packages/socks/src/index.ts +++ b/packages/socks/src/index.ts @@ -7,6 +7,10 @@ import { SocksClient, SocksProxy } from 'socks' import type { Agent, buildConnector, Client } from 'undici' import { SocksProxyAgent } from 'socks-proxy-agent' +// @ts-ignore +// ensure the global dispatcher is initialized +fetch().catch(() => {}) + function getUniqueSymbol(object: object, name: string) { const symbol = Object.getOwnPropertySymbols(object).find(s => s.toString() === `Symbol(${name})`) return object[symbol!] diff --git a/yakumo.yml b/yakumo.yml new file mode 100644 index 0000000..ee9a2ce --- /dev/null +++ b/yakumo.yml @@ -0,0 +1,8 @@ +- name: yakumo + config: + pipeline: + build: + - tsc + - esbuild +- name: yakumo-esbuild +- name: yakumo-tsc