Skip to content

Commit

Permalink
fix: add config to process messages concurrently (#2026)
Browse files Browse the repository at this point in the history
Signed-off-by: Pritam Singh <[email protected]>
Signed-off-by: Sai Ranjit Tummalapalli <[email protected]>
Co-authored-by: Pritam Singh <[email protected]>
  • Loading branch information
sairanjit and Zzocker authored Oct 11, 2024
1 parent 8037495 commit 5a8f536
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 25 deletions.
24 changes: 13 additions & 11 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { InitConfig } from '../types'
import type { Subscription } from 'rxjs'

import { Subject } from 'rxjs'
import { concatMap, takeUntil } from 'rxjs/operators'
import { mergeMap, takeUntil } from 'rxjs/operators'

import { InjectionSymbols } from '../constants'
import { SigningProviderToken } from '../crypto'
Expand Down Expand Up @@ -152,16 +152,18 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
receivedAt: e.payload.receivedAt,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
mergeMap(
(e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
session: e.payload.session,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
}),
this.agentConfig.processDidCommMessagesConcurrently ? undefined : 1
)
)
.subscribe()
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ export class AgentConfig {
return this.initConfig.backupBeforeStorageUpdate ?? true
}

public get processDidCommMessagesConcurrently() {
return this.initConfig.processDidCommMessagesConcurrently ?? false
}

public extend(config: Partial<InitConfig>): AgentConfig {
return new AgentConfig(
{ ...this.initConfig, logger: this.logger, label: this.label, ...config },
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Dispatcher {
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
const { agentContext, connection, senderKey, recipientKey, message } = messageContext
const { agentContext, connection, senderKey, recipientKey, message, encryptedMessage } = messageContext

// Set default handler if available, middleware can still override the message handler
const messageHandler = this.messageHandlerRegistry.getHandlerForMessageType(message.type)
Expand Down Expand Up @@ -138,6 +138,7 @@ class Dispatcher {
message,
connection,
receivedAt: messageContext.receivedAt,
encryptedMessage,
},
})
}
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { AgentMessage } from './AgentMessage'
import type { TransportSession } from './TransportService'
import type { OutboundMessageContext, OutboundMessageSendStatus } from './models'
import type { ConnectionRecord } from '../modules/connections'
import type { EncryptedMessage } from '../types'
import type { Observable } from 'rxjs'

import { filter } from 'rxjs'
Expand Down Expand Up @@ -34,6 +36,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent {
connection?: ConnectionRecord
contextCorrelationId?: string
receivedAt?: Date
session?: TransportSession
}
}

Expand All @@ -43,6 +46,7 @@ export interface AgentMessageProcessedEvent extends BaseEvent {
message: AgentMessage
connection?: ConnectionRecord
receivedAt?: Date
encryptedMessage?: EncryptedMessage
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export class MessageReceiver {
recipientKey,
agentContext,
receivedAt,
encryptedMessage,
})

// We want to save a session if there is a chance of returning outbound message via inbound transport.
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/models/InboundMessageContext.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { OutboundMessageContext } from './OutboundMessageContext'
import type { Key } from '../../crypto'
import type { ConnectionRecord } from '../../modules/connections'
import type { EncryptedMessage } from '../../types'
import type { AgentMessage } from '../AgentMessage'
import type { MessageHandler } from '../MessageHandler'
import type { AgentContext } from '../context'
Expand All @@ -14,6 +15,7 @@ export interface MessageContextParams {
recipientKey?: Key
agentContext: AgentContext
receivedAt?: Date
encryptedMessage?: EncryptedMessage
}

export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
Expand All @@ -28,6 +30,7 @@ export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
public message: T
public messageHandler?: MessageHandler
public responseMessage?: OutboundMessageContext
public encryptedMessage?: EncryptedMessage

public constructor(message: T, context: MessageContextParams) {
this.message = message
Expand All @@ -37,6 +40,7 @@ export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
this.sessionId = context.sessionId
this.agentContext = context.agentContext
this.receivedAt = context.receivedAt ?? new Date()
this.encryptedMessage = context.encryptedMessage
}

public setMessageHandler(messageHandler: MessageHandler) {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export interface InitConfig {
connectionImageUrl?: string
autoUpdateStorageOnStartup?: boolean
backupBeforeStorageUpdate?: boolean
processDidCommMessagesConcurrently?: boolean
}

export type ProtocolVersion = `${number}.${number}`
Expand Down
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@credo-ts/core": "workspace:*",
"@types/express": "^4.17.15",
"express": "^4.17.1",
"rxjs": "^7.8.0",
"ws": "^8.13.0"
},
"devDependencies": {
Expand Down
56 changes: 49 additions & 7 deletions packages/node/src/transport/HttpInboundTransport.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import type { InboundTransport, Agent, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core'
import type {
InboundTransport,
Agent,
TransportSession,
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
AgentMessageProcessedEvent,
} from '@credo-ts/core'
import type { Express, Request, Response } from 'express'
import type { Server } from 'http'

import { DidCommMimeType, CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core'
import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import express, { text } from 'express'
import { filter, firstValueFrom, ReplaySubject, timeout } from 'rxjs'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]

Expand All @@ -12,13 +21,25 @@ export class HttpInboundTransport implements InboundTransport {
private port: number
private path: string
private _server?: Server
private processedMessageListenerTimeoutMs: number

public get server() {
return this._server
}

public constructor({ app, path, port }: { app?: Express; path?: string; port: number }) {
public constructor({
app,
path,
port,
processedMessageListenerTimeoutMs,
}: {
app?: Express
path?: string
port: number
processedMessageListenerTimeoutMs?: number
}) {
this.port = port
this.processedMessageListenerTimeoutMs = processedMessageListenerTimeoutMs ?? 10000 // timeout after 10 seconds

// Create Express App
this.app = app ?? express()
Expand All @@ -29,7 +50,6 @@ export class HttpInboundTransport implements InboundTransport {

public async start(agent: Agent) {
const transportService = agent.dependencyManager.resolve(TransportService)
const messageReceiver = agent.dependencyManager.resolve(MessageReceiver)

agent.config.logger.debug(`Starting HTTP inbound transport`, {
port: this.port,
Expand All @@ -51,11 +71,33 @@ export class HttpInboundTransport implements InboundTransport {

try {
const message = req.body
const encryptedMessage = JSON.parse(message)
await messageReceiver.receiveMessage(encryptedMessage, {
session,
const encryptedMessage = JSON.parse(message) as EncryptedMessage

const observable = agent.events.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
const subject = new ReplaySubject(1)

observable
.pipe(
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => e.payload.encryptedMessage === encryptedMessage),
timeout({
first: this.processedMessageListenerTimeoutMs,
meta: 'HttpInboundTransport.start',
})
)
.subscribe(subject)

agent.events.emit<AgentMessageReceivedEvent>(agent.context, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: encryptedMessage,
session: session,
},
})

// Wait for message to be processed
await firstValueFrom(subject)

// If agent did not use session when processing message we need to send response here.
if (!res.headersSent) {
res.status(200).end()
Expand Down
26 changes: 20 additions & 6 deletions packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import type { Agent, InboundTransport, Logger, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core'

import { CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core'
import type {
Agent,
InboundTransport,
Logger,
TransportSession,
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
} from '@credo-ts/core'

import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
// eslint-disable-next-line import/no-named-as-default
import WebSocket, { Server } from 'ws'

Expand Down Expand Up @@ -58,13 +66,19 @@ export class WsInboundTransport implements InboundTransport {
}

private listenOnWebSocketMessages(agent: Agent, socket: WebSocket, session: TransportSession) {
const messageReceiver = agent.dependencyManager.resolve(MessageReceiver)

// eslint-disable-next-line @typescript-eslint/no-explicit-any
socket.addEventListener('message', async (event: any) => {
this.logger.debug('WebSocket message event received.', { url: event.target.url })
try {
await messageReceiver.receiveMessage(JSON.parse(event.data), { session })
const encryptedMessage = JSON.parse(event.data) as EncryptedMessage

agent.events.emit<AgentMessageReceivedEvent>(agent.context, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: encryptedMessage,
session: session,
},
})
} catch (error) {
this.logger.error(`Error processing message: ${error}`)
}
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5a8f536

Please sign in to comment.