Skip to content

Commit

Permalink
chore: setup new logging stack
Browse files Browse the repository at this point in the history
  • Loading branch information
didinele committed Jun 20, 2024
1 parent 1370962 commit 812dbe9
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 1 deletion.
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"murmurhash": "^2.0.1",
"pg": "^8.12.0",
"pino": "^9.2.0",
"pino-abstract-transport": "^1.2.0",
"pino-pretty": "^11.2.1",
"tslib": "^2.6.3"
}
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/singletons/DependencyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ export class DependencyManager {
}

private registerLogger(): void {
const logger = createPinoLogger({ level: 'trace' });
const transport = createPinoLogger.transport({
target: '../util/loggingTransport.js',
});

const logger = createPinoLogger({ level: 'trace', transport });
globalContainer.bind<Logger>(INJECTION_TOKENS.logger).toConstantValue(logger);
}

Expand Down
140 changes: 140 additions & 0 deletions packages/core/src/util/loggingTransport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// https://github.com/ChatSift/logs - but modern. Due to quick iteration, it currently lives in this repo.
// In the future, it might be moved to the logs repo.

// Note that this file should never be imported/exported. Pino spawns it as a worker thread.

import { URL } from 'node:url';
import buildPinoTransport from 'pino-abstract-transport';

/**
* Options for the transport.
*/
export interface TransportOptions {
/**
* The authorization token to use.
*/
auth: string;
/**
* The domain to send logs to.
*/
domain: string;
/**
* The stream to send logs to.
*/
stream: string;
}

/**
* Data to log.
*
* @internal
*/
export interface LogData extends TransportOptions {
/**
* The literal log data.
*/
data: any;
}

export default async function transport(options: TransportOptions) {
await ensureStream(options);

return buildPinoTransport(async (source) => {
for await (const data of source) {
void handleLog({ ...options, data });
}
});
}

async function handleLog(options: LogData) {
const body = JSON.stringify(options.data);

const res = await fetch(new URL('/api/v1/ingest', options.domain), {
method: 'POST',
body,
headers: getHeaders(options),
});

if (!res.ok) {
console.error('Failed to send log', await parseResponseIfPossible(res));
}
}

function getHeaders({ stream, auth }: TransportOptions): Record<string, string> {
return {
'X-P-Stream': stream,
Authorization: `Basic ${auth}`,
'Content-Type': 'application/json',
};
}

async function parseResponseIfPossible(res: Response): Promise<any> {
async function tryText(): Promise<string | null> {
try {
return await res.text();
} catch {
return null;
}
}

if (res.headers.get('content-type')?.startsWith('application/json')) {
try {
return await res.json();
} catch {
return tryText();
}
}

return tryText();
}

async function ensureStream(options: TransportOptions): Promise<void> {
const { domain, stream } = options;
const headers = getHeaders(options);

const streamListResponse = await fetch(new URL('/api/v1/logstream', domain), {
method: 'GET',
headers,
});

if (!streamListResponse.ok) {
console.error('body', await parseResponseIfPossible(streamListResponse));
throw new Error('Failed to get log streams');
}

const streamList = (await streamListResponse.json()) as { name: string }[];
if (!streamList.some(({ name }) => name === stream)) {
const createResponse = await fetch(new URL(`/api/v1/logstream/${stream}`, domain), {
method: 'PUT',
headers,
});

if (!createResponse.ok) {
console.log(await parseResponseIfPossible(createResponse));
throw new Error('Failed to create log stream');
}

// Streams must not be empty before setting up retention
await handleLog({
...options,
data: { message: 'Log stream created', level: 'info', datetime: new Date().toISOString() },
});

const retentionResponse = await fetch(new URL(`/api/v1/logstream/${stream}/retention`, domain), {
method: 'PUT',
headers,
body: JSON.stringify([
{
description: 'delete after 30 days',
duration: '30d',
action: 'delete',
},
]),
});

if (!retentionResponse.ok) {
console.log(await parseResponseIfPossible(retentionResponse));
throw new Error('Failed to setup log stream retention');
}
}
}

0 comments on commit 812dbe9

Please sign in to comment.