From 5a8f536d6dc73f84527c66e61565438702b2b346 Mon Sep 17 00:00:00 2001 From: Sai Ranjit Tummalapalli Date: Fri, 11 Oct 2024 12:18:07 +0530 Subject: [PATCH] fix: add config to process messages concurrently (#2026) Signed-off-by: Pritam Singh Signed-off-by: Sai Ranjit Tummalapalli Co-authored-by: Pritam Singh --- packages/core/src/agent/Agent.ts | 24 ++++---- packages/core/src/agent/AgentConfig.ts | 4 ++ packages/core/src/agent/Dispatcher.ts | 3 +- packages/core/src/agent/Events.ts | 4 ++ packages/core/src/agent/MessageReceiver.ts | 1 + .../src/agent/models/InboundMessageContext.ts | 4 ++ packages/core/src/types.ts | 1 + packages/node/package.json | 1 + .../src/transport/HttpInboundTransport.ts | 56 ++++++++++++++++--- .../node/src/transport/WsInboundTransport.ts | 26 +++++++-- pnpm-lock.yaml | 3 + 11 files changed, 102 insertions(+), 25 deletions(-) diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 8179d7bef5..0b417f9b03 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -8,7 +8,7 @@ import type { InitConfig } from '../types' import type { Subscription } from 'rxjs' import { Subject } from 'rxjs' -import { concatMap, takeUntil } from 'rxjs/operators' +import { mergeMap, takeUntil } from 'rxjs/operators' import { InjectionSymbols } from '../constants' import { SigningProviderToken } from '../crypto' @@ -152,16 +152,18 @@ export class Agent extends BaseAge .observable(AgentEventTypes.AgentMessageReceived) .pipe( takeUntil(stop$), - concatMap((e) => - this.messageReceiver - .receiveMessage(e.payload.message, { - connection: e.payload.connection, - contextCorrelationId: e.payload.contextCorrelationId, - receivedAt: e.payload.receivedAt, - }) - .catch((error) => { - this.logger.error('Failed to process message', { error }) - }) + mergeMap( + (e) => + this.messageReceiver + .receiveMessage(e.payload.message, { + connection: e.payload.connection, + contextCorrelationId: e.payload.contextCorrelationId, + session: e.payload.session, + }) + .catch((error) => { + this.logger.error('Failed to process message', { error }) + }), + this.agentConfig.processDidCommMessagesConcurrently ? undefined : 1 ) ) .subscribe() diff --git a/packages/core/src/agent/AgentConfig.ts b/packages/core/src/agent/AgentConfig.ts index a6a4cb379f..b7e737212d 100644 --- a/packages/core/src/agent/AgentConfig.ts +++ b/packages/core/src/agent/AgentConfig.ts @@ -75,6 +75,10 @@ export class AgentConfig { return this.initConfig.backupBeforeStorageUpdate ?? true } + public get processDidCommMessagesConcurrently() { + return this.initConfig.processDidCommMessagesConcurrently ?? false + } + public extend(config: Partial): AgentConfig { return new AgentConfig( { ...this.initConfig, logger: this.logger, label: this.label, ...config }, diff --git a/packages/core/src/agent/Dispatcher.ts b/packages/core/src/agent/Dispatcher.ts index b42f9aa6ca..1d4b5ef91f 100644 --- a/packages/core/src/agent/Dispatcher.ts +++ b/packages/core/src/agent/Dispatcher.ts @@ -65,7 +65,7 @@ class Dispatcher { } public async dispatch(messageContext: InboundMessageContext): Promise { - const { agentContext, connection, senderKey, recipientKey, message } = messageContext + const { agentContext, connection, senderKey, recipientKey, message, encryptedMessage } = messageContext // Set default handler if available, middleware can still override the message handler const messageHandler = this.messageHandlerRegistry.getHandlerForMessageType(message.type) @@ -138,6 +138,7 @@ class Dispatcher { message, connection, receivedAt: messageContext.receivedAt, + encryptedMessage, }, }) } diff --git a/packages/core/src/agent/Events.ts b/packages/core/src/agent/Events.ts index 8a889a237c..f41120c29d 100644 --- a/packages/core/src/agent/Events.ts +++ b/packages/core/src/agent/Events.ts @@ -1,6 +1,8 @@ import type { AgentMessage } from './AgentMessage' +import type { TransportSession } from './TransportService' import type { OutboundMessageContext, OutboundMessageSendStatus } from './models' import type { ConnectionRecord } from '../modules/connections' +import type { EncryptedMessage } from '../types' import type { Observable } from 'rxjs' import { filter } from 'rxjs' @@ -34,6 +36,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent { connection?: ConnectionRecord contextCorrelationId?: string receivedAt?: Date + session?: TransportSession } } @@ -43,6 +46,7 @@ export interface AgentMessageProcessedEvent extends BaseEvent { message: AgentMessage connection?: ConnectionRecord receivedAt?: Date + encryptedMessage?: EncryptedMessage } } diff --git a/packages/core/src/agent/MessageReceiver.ts b/packages/core/src/agent/MessageReceiver.ts index 02f94d1e7b..81cb2961b8 100644 --- a/packages/core/src/agent/MessageReceiver.ts +++ b/packages/core/src/agent/MessageReceiver.ts @@ -149,6 +149,7 @@ export class MessageReceiver { recipientKey, agentContext, receivedAt, + encryptedMessage, }) // We want to save a session if there is a chance of returning outbound message via inbound transport. diff --git a/packages/core/src/agent/models/InboundMessageContext.ts b/packages/core/src/agent/models/InboundMessageContext.ts index 886210f5f5..67ae9274e1 100644 --- a/packages/core/src/agent/models/InboundMessageContext.ts +++ b/packages/core/src/agent/models/InboundMessageContext.ts @@ -1,6 +1,7 @@ import type { OutboundMessageContext } from './OutboundMessageContext' import type { Key } from '../../crypto' import type { ConnectionRecord } from '../../modules/connections' +import type { EncryptedMessage } from '../../types' import type { AgentMessage } from '../AgentMessage' import type { MessageHandler } from '../MessageHandler' import type { AgentContext } from '../context' @@ -14,6 +15,7 @@ export interface MessageContextParams { recipientKey?: Key agentContext: AgentContext receivedAt?: Date + encryptedMessage?: EncryptedMessage } export class InboundMessageContext { @@ -28,6 +30,7 @@ export class InboundMessageContext { public message: T public messageHandler?: MessageHandler public responseMessage?: OutboundMessageContext + public encryptedMessage?: EncryptedMessage public constructor(message: T, context: MessageContextParams) { this.message = message @@ -37,6 +40,7 @@ export class InboundMessageContext { this.sessionId = context.sessionId this.agentContext = context.agentContext this.receivedAt = context.receivedAt ?? new Date() + this.encryptedMessage = context.encryptedMessage } public setMessageHandler(messageHandler: MessageHandler) { diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index cd00c8706b..c0dcee8406 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -83,6 +83,7 @@ export interface InitConfig { connectionImageUrl?: string autoUpdateStorageOnStartup?: boolean backupBeforeStorageUpdate?: boolean + processDidCommMessagesConcurrently?: boolean } export type ProtocolVersion = `${number}.${number}` diff --git a/packages/node/package.json b/packages/node/package.json index 6c782bb540..b1ca1a6e9e 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -31,6 +31,7 @@ "@credo-ts/core": "workspace:*", "@types/express": "^4.17.15", "express": "^4.17.1", + "rxjs": "^7.8.0", "ws": "^8.13.0" }, "devDependencies": { diff --git a/packages/node/src/transport/HttpInboundTransport.ts b/packages/node/src/transport/HttpInboundTransport.ts index ac86d80cec..01a3e5dfee 100644 --- a/packages/node/src/transport/HttpInboundTransport.ts +++ b/packages/node/src/transport/HttpInboundTransport.ts @@ -1,9 +1,18 @@ -import type { InboundTransport, Agent, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core' +import type { + InboundTransport, + Agent, + TransportSession, + EncryptedMessage, + AgentContext, + AgentMessageReceivedEvent, + AgentMessageProcessedEvent, +} from '@credo-ts/core' import type { Express, Request, Response } from 'express' import type { Server } from 'http' -import { DidCommMimeType, CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core' +import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' import express, { text } from 'express' +import { filter, firstValueFrom, ReplaySubject, timeout } from 'rxjs' const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1] @@ -12,13 +21,25 @@ export class HttpInboundTransport implements InboundTransport { private port: number private path: string private _server?: Server + private processedMessageListenerTimeoutMs: number public get server() { return this._server } - public constructor({ app, path, port }: { app?: Express; path?: string; port: number }) { + public constructor({ + app, + path, + port, + processedMessageListenerTimeoutMs, + }: { + app?: Express + path?: string + port: number + processedMessageListenerTimeoutMs?: number + }) { this.port = port + this.processedMessageListenerTimeoutMs = processedMessageListenerTimeoutMs ?? 10000 // timeout after 10 seconds // Create Express App this.app = app ?? express() @@ -29,7 +50,6 @@ export class HttpInboundTransport implements InboundTransport { public async start(agent: Agent) { const transportService = agent.dependencyManager.resolve(TransportService) - const messageReceiver = agent.dependencyManager.resolve(MessageReceiver) agent.config.logger.debug(`Starting HTTP inbound transport`, { port: this.port, @@ -51,11 +71,33 @@ export class HttpInboundTransport implements InboundTransport { try { const message = req.body - const encryptedMessage = JSON.parse(message) - await messageReceiver.receiveMessage(encryptedMessage, { - session, + const encryptedMessage = JSON.parse(message) as EncryptedMessage + + const observable = agent.events.observable(AgentEventTypes.AgentMessageProcessed) + const subject = new ReplaySubject(1) + + observable + .pipe( + filter((e) => e.type === AgentEventTypes.AgentMessageProcessed), + filter((e) => e.payload.encryptedMessage === encryptedMessage), + timeout({ + first: this.processedMessageListenerTimeoutMs, + meta: 'HttpInboundTransport.start', + }) + ) + .subscribe(subject) + + agent.events.emit(agent.context, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: encryptedMessage, + session: session, + }, }) + // Wait for message to be processed + await firstValueFrom(subject) + // If agent did not use session when processing message we need to send response here. if (!res.headersSent) { res.status(200).end() diff --git a/packages/node/src/transport/WsInboundTransport.ts b/packages/node/src/transport/WsInboundTransport.ts index ff23807fed..11c47c1262 100644 --- a/packages/node/src/transport/WsInboundTransport.ts +++ b/packages/node/src/transport/WsInboundTransport.ts @@ -1,6 +1,14 @@ -import type { Agent, InboundTransport, Logger, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core' - -import { CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core' +import type { + Agent, + InboundTransport, + Logger, + TransportSession, + EncryptedMessage, + AgentContext, + AgentMessageReceivedEvent, +} from '@credo-ts/core' + +import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' // eslint-disable-next-line import/no-named-as-default import WebSocket, { Server } from 'ws' @@ -58,13 +66,19 @@ export class WsInboundTransport implements InboundTransport { } private listenOnWebSocketMessages(agent: Agent, socket: WebSocket, session: TransportSession) { - const messageReceiver = agent.dependencyManager.resolve(MessageReceiver) - // eslint-disable-next-line @typescript-eslint/no-explicit-any socket.addEventListener('message', async (event: any) => { this.logger.debug('WebSocket message event received.', { url: event.target.url }) try { - await messageReceiver.receiveMessage(JSON.parse(event.data), { session }) + const encryptedMessage = JSON.parse(event.data) as EncryptedMessage + + agent.events.emit(agent.context, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: encryptedMessage, + session: session, + }, + }) } catch (error) { this.logger.error(`Error processing message: ${error}`) } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f083c623db..a6d920b425 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -658,6 +658,9 @@ importers: express: specifier: ^4.17.1 version: 4.19.2 + rxjs: + specifier: ^7.8.0 + version: 7.8.1 ws: specifier: ^8.13.0 version: 8.18.0