diff --git a/.dev/.env.example b/.dev/.env.example index 9531c353..3cf2776b 100644 --- a/.dev/.env.example +++ b/.dev/.env.example @@ -11,3 +11,7 @@ DATABASE_NAME_PREFIX= # optional - defaults to 'false' # enable or disable the sync module SYNC_ENABLED= + +# optional - defaults to 'false' +# enable or disable the sse module +modules__sse__enabled= diff --git a/.dev/.env.local b/.dev/.env.local index fbea88ed..b30bc1da 100644 --- a/.dev/.env.local +++ b/.dev/.env.local @@ -1,4 +1,7 @@ # required - the information below is used to connect to the backbone -transportLibrary__baseUrl="http://localhost:8090" +transportLibrary__baseUrl="http://host.docker.internal:8090" transportLibrary__platformClientId="test" transportLibrary__platformClientSecret="test" + +modules__sse__enabled=true +modules__sse__baseUrlOverride="http://host.docker.internal:8092" diff --git a/.dev/appsettings.override.json b/.dev/appsettings.override.json index 6f4e0822..c902a1db 100644 --- a/.dev/appsettings.override.json +++ b/.dev/appsettings.override.json @@ -46,6 +46,10 @@ "Providers": { "Dummy": { "Enabled": true + }, + "Sse": { + "Enabled": true, + "SseServerBaseAddress": "http://sse-server:8080" } } } diff --git a/.dev/compose.backbone.yml b/.dev/compose.backbone.yml index 0b0b7ea0..31db4ec9 100644 --- a/.dev/compose.backbone.yml +++ b/.dev/compose.backbone.yml @@ -18,6 +18,8 @@ services: condition: service_started database-migrator: condition: service_completed_successfully + sse-server: + condition: service_started configs: - source: Config target: app/appsettings.override.json @@ -36,6 +38,19 @@ services: - source: Config target: app/appsettings.override.json + sse-server: + image: ghcr.io/nmshd/backbone-sse-server:${BACKBONE_VERSION} + container_name: sse-server + hostname: sse-server + ports: + - "8092:8080" + depends_on: + database: + condition: service_started + configs: + - source: Config + target: app/appsettings.override.json + database-migrator: container_name: database-migrator-test image: ghcr.io/nmshd/backbone-database-migrator:${BACKBONE_VERSION} @@ -59,8 +74,6 @@ services: environment: - POSTGRES_PASSWORD=Passw0rd - POSTGRES_DB=enmeshed - ports: - - 5432:5432 healthcheck: test: ["CMD", "pg_isready", "-U", "postgres"] interval: 5s @@ -72,15 +85,11 @@ services: hostname: azurite image: mcr.microsoft.com/azure-storage/azurite command: azurite -d /data/debug.log -l /data --blobHost "0.0.0.0" --queueHost "0.0.0.0" - ports: - - "10000:10000" rabbitmq: container_name: bkb-rabbitmq hostname: rabbitmq image: rabbitmq:3.12.10-management-alpine - ports: - - "5672:5672" ### seeds ### diff --git a/.dev/compose.yml b/.dev/compose.yml index e6825ee0..5626472c 100644 --- a/.dev/compose.yml +++ b/.dev/compose.yml @@ -16,6 +16,8 @@ services: - transportLibrary__baseUrl - transportLibrary__platformClientId - transportLibrary__platformClientSecret + - modules__sse__enabled=${modules__sse__enabled:-false} + - modules__sse__baseUrlOverride volumes: - ..:/usr/app - ./config.json:/config.json:ro @@ -42,6 +44,8 @@ services: - transportLibrary__baseUrl - transportLibrary__platformClientId - transportLibrary__platformClientSecret + - modules__sse__enabled=${modules__sse__enabled:-false} + - modules__sse__baseUrlOverride volumes: - ..:/usr/app - ./config.json:/config.json:ro diff --git a/.dev/config.json b/.dev/config.json index 1c6e754f..8685c05c 100644 --- a/.dev/config.json +++ b/.dev/config.json @@ -1,7 +1,7 @@ { "debug": true, "database": { - "connectionString": "mongodb://mongo:27017/?readPreference=primary&appname=connector&ssl=false" + "connectionString": "mongodb://mongo:27017" }, "infrastructure": { "httpServer": { diff --git a/.dev/scripts/clearDb.ts b/.dev/scripts/clearDb.ts index 1bfa19f5..f5af71c6 100755 --- a/.dev/scripts/clearDb.ts +++ b/.dev/scripts/clearDb.ts @@ -3,7 +3,7 @@ import { MongoDbConnection } from "@js-soft/docdb-access-mongo"; async function clearDb() { - const connectionString = "mongodb://localhost:27017/?readPreference=primary&appname=clearDb&ssl=false"; + const connectionString = "mongodb://localhost:27017"; const dbConnection: MongoDbConnection = new MongoDbConnection(connectionString); await dbConnection.connect(); diff --git a/.dev/scripts/establishRelationshipAndSpamMessages.ts b/.dev/scripts/establishRelationshipAndSpamMessages.ts new file mode 100644 index 00000000..3e5c20f4 --- /dev/null +++ b/.dev/scripts/establishRelationshipAndSpamMessages.ts @@ -0,0 +1,67 @@ +import { sleep } from "@js-soft/ts-utils"; +import { ConnectorClient, ConnectorRelationshipStatus } from "@nmshd/connector-sdk"; + +async function run() { + const connector1 = ConnectorClient.create({ + baseUrl: "http://localhost:3000", + apiKey: "xxx" + }); + + const connector2 = ConnectorClient.create({ + baseUrl: "http://localhost:3001", + apiKey: "xxx" + }); + + const { connector1Address, connector2Address } = await establishOrReturnRelationship(connector1, connector2); + + while (true) { + await connector1.messages.sendMessage({ recipients: [connector2Address], content: {} }); + await sleep(2000); + + await connector2.messages.sendMessage({ recipients: [connector1Address], content: {} }); + await sleep(2000); + } +} + +async function establishOrReturnRelationship(connector1: ConnectorClient, connector2: ConnectorClient) { + const identityInfo = (await connector1.account.getIdentityInfo()).result; + + const relationships = (await connector1.relationships.getRelationships()).result; + + if (relationships.length > 0) { + if (relationships[0].status === ConnectorRelationshipStatus.PENDING) { + await connector1.relationships.acceptRelationshipChange(relationships[0].id, relationships[0].changes[0].id); + } + + return { + connector1Address: identityInfo.address, + connector2Address: relationships[0].peer + }; + } + + const template = (await connector1.relationshipTemplates.createOwnRelationshipTemplate({ expiresAt: "2099", maxNumberOfAllocations: 1, content: {} })).result; + + await connector2.relationshipTemplates.loadPeerRelationshipTemplate({ reference: template.truncatedReference }); + + const relationship = (await connector2.relationships.createRelationship({ templateId: template.id, content: {} })).result; + + await connector1.account.sync(); + + const accepted = (await connector1.relationships.acceptRelationshipChange(relationship.id, relationship.changes[0].id)).result; + console.log(accepted); + + await connector2.account.sync(); + + return { + connector1Address: identityInfo.address, + connector2Address: accepted.peer + }; +} + +run() + .then(() => { + console.log("Script finished successfully"); + }) + .catch((error) => { + console.error("Script failed with error", error); + }); diff --git a/.vscode/tasks.json b/.vscode/tasks.json index d749a2d3..bd90d864 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -54,6 +54,15 @@ "reveal": "always" } }, + { + "label": "Clear Connectors", + "command": "docker compose -f .dev/compose.yml down -v", + "type": "shell", + "isBackground": true, + "presentation": { + "reveal": "always" + } + }, { "label": "Run 1", "command": "docker compose -f .dev/compose.yml --env-file .dev/.env${input:envFilePostfix} up connector-1", diff --git a/config/default.json b/config/default.json index d7fc1e31..73431271 100644 --- a/config/default.json +++ b/config/default.json @@ -68,13 +68,6 @@ "displayName": "Attribute Listener", "location": "@nmshd/runtime:AttributeListenerModule" }, - "sync": { - "displayName": "Sync", - "location": "sync/SyncModule", - "enabled": false, - - "interval": 60 - }, "autoAcceptRelationshipCreationChanges": { "displayName": "Auto Accept Relationship Creation Changes", "location": "autoAcceptRelationshipCreationChanges/AutoAcceptRelationshipCreationChangesModule", @@ -121,6 +114,18 @@ "displayName": "Message Broker Publisher", "location": "messageBrokerPublisher/MessageBrokerPublisherModule", "brokers": [] + }, + "sync": { + "displayName": "Sync", + "location": "sync/SyncModule", + "enabled": false, + + "interval": 60 + }, + "sse": { + "enabled": false, + "displayName": "Server Sent Events", + "location": "sse/SseModule" } } } diff --git a/package-lock.json b/package-lock.json index 3969ac64..f7792dba 100644 --- a/package-lock.json +++ b/package-lock.json @@ -23,6 +23,7 @@ "axios": "^1.7.2", "compression": "1.7.4", "cors": "2.8.5", + "eventsource": "^2.0.2", "express": "4.19.2", "helmet": "7.1.0", "json-stringify-safe": "5.0.1", @@ -48,6 +49,7 @@ "@types/amqplib": "^0.10.5", "@types/compression": "^1.7.5", "@types/cors": "^2.8.17", + "@types/eventsource": "^1.1.15", "@types/express": "4.17.21", "@types/jest": "^29.5.12", "@types/jest-json-schema": "^6.1.4", @@ -2581,6 +2583,12 @@ "@types/node": "*" } }, + "node_modules/@types/eventsource": { + "version": "1.1.15", + "resolved": "https://registry.npmjs.org/@types/eventsource/-/eventsource-1.1.15.tgz", + "integrity": "sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA==", + "dev": true + }, "node_modules/@types/express": { "version": "4.17.21", "resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.21.tgz", @@ -5347,6 +5355,14 @@ "node": ">=0.8.x" } }, + "node_modules/eventsource": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz", + "integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==", + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", diff --git a/package.json b/package.json index fd9e1993..891f9dc9 100644 --- a/package.json +++ b/package.json @@ -78,6 +78,7 @@ "axios": "^1.7.2", "compression": "1.7.4", "cors": "2.8.5", + "eventsource": "^2.0.2", "express": "4.19.2", "helmet": "7.1.0", "json-stringify-safe": "5.0.1", @@ -103,6 +104,7 @@ "@types/amqplib": "^0.10.5", "@types/compression": "^1.7.5", "@types/cors": "^2.8.17", + "@types/eventsource": "^1.1.15", "@types/express": "4.17.21", "@types/jest": "^29.5.12", "@types/jest-json-schema": "^6.1.4", diff --git a/src/ConnectorRuntime.ts b/src/ConnectorRuntime.ts index 510d6f85..a3a7f156 100644 --- a/src/ConnectorRuntime.ts +++ b/src/ConnectorRuntime.ts @@ -230,6 +230,10 @@ export class ConnectorRuntime extends Runtime { }; } + public async getBackboneAuthenticationToken(): Promise { + return await this.accountController.authenticator.getToken(); + } + protected async loadModule(moduleConfiguration: ModuleConfiguration): Promise { const connectorModuleConfiguration = moduleConfiguration as ConnectorRuntimeModuleConfiguration; @@ -304,10 +308,16 @@ export class ConnectorRuntime extends Runtime { } protected async stop(): Promise { - try { - await super.stop(); - } catch (e) { - this.logger.error(e); + if (this.isStarted) { + try { + await super.stop(); + } catch (e) { + this.logger.error(e); + } + } else if (this.connectorMode === "debug") { + this.logger.warn("It seemed like the connector runtime didn't do a proper startup. Closing infrastructure."); + + await this.stopInfrastructure(); } try { diff --git a/src/modules/sse/SseModule.ts b/src/modules/sse/SseModule.ts new file mode 100644 index 00000000..388f398c --- /dev/null +++ b/src/modules/sse/SseModule.ts @@ -0,0 +1,101 @@ +import { ILogger } from "@js-soft/logging-abstractions"; +import eventSourceModule from "eventsource"; +import { ConnectorMode } from "../../ConnectorMode"; +import { ConnectorRuntime } from "../../ConnectorRuntime"; +import { ConnectorRuntimeModule, ConnectorRuntimeModuleConfiguration } from "../../ConnectorRuntimeModule"; + +export enum BackboneEventName { + DatawalletModificationsCreated = "DatawalletModificationsCreated", + ExternalEventCreated = "ExternalEventCreated" +} + +export interface IBackboneEventContent { + eventName: BackboneEventName; + sentAt: string; + payload: any; +} + +export interface SseModuleConfiguration extends ConnectorRuntimeModuleConfiguration { + baseUrlOverride?: string; +} + +export default class SseModule extends ConnectorRuntimeModule { + private eventSource: eventSourceModule | undefined; + + public constructor(runtime: ConnectorRuntime, configuration: ConnectorRuntimeModuleConfiguration, logger: ILogger, connectorMode: ConnectorMode) { + super(runtime, configuration, logger, connectorMode); + } + + public init(): void | Promise { + if (this.configuration.baseUrlOverride && this.connectorMode !== "debug") { + throw new Error("baseUrlOverride is only allowed in debug mode"); + } + } + + public async start(): Promise { + await this.runSync(); + + await this.recreateEventSource(); + } + + private async recreateEventSource(): Promise { + if (this.eventSource) { + try { + this.eventSource.close(); + } catch (error) { + this.logger.error("Failed to close event source", error); + } + } + + const baseUrl = this.configuration.baseUrlOverride ?? this.runtime["runtimeConfig"].transportLibrary.baseUrl; + const sseUrl = `${baseUrl}/api/v1/sse`; + + this.logger.info(`Connecting to SSE endpoint: ${sseUrl}`); + + const token = await this.runtime.getBackboneAuthenticationToken(); + + const eventSource = new eventSourceModule(sseUrl, { + https: { rejectUnauthorized: true }, + proxy: process.env.HTTPS_PROXY ?? process.env.HTTP_PROXY, + headers: { authorization: `Bearer ${token}` } + }); + this.eventSource = eventSource; + + eventSource.addEventListener("ExternalEventCreated", async () => await this.runSync()); + + await new Promise((resolve, reject) => { + eventSource.onopen = () => { + this.logger.info("Connected to SSE endpoint"); + resolve(); + + eventSource.onopen = () => { + // noop + }; + }; + + eventSource.onerror = (error) => { + reject(error); + }; + }); + + eventSource.onerror = async (error) => { + if (error.status === 401) await this.recreateEventSource(); + }; + } + + private async runSync(): Promise { + this.logger.info("Running sync"); + + const services = this.runtime.getServices(); + + const syncResult = await services.transportServices.account.syncEverything(); + if (syncResult.isError) { + this.logger.error(syncResult); + return; + } + } + + public stop(): void { + this.eventSource?.close(); + } +} diff --git a/test/compose.yml b/test/compose.yml index 506c8e76..8d291a9a 100644 --- a/test/compose.yml +++ b/test/compose.yml @@ -1,4 +1,3 @@ -version: "3.7" services: mongo: container_name: connector-mongodb