diff --git a/packages/core/src/pubsub/pubsub-message.ts b/packages/core/src/pubsub/pubsub-message.ts index a7f2cbb3ee..41f72dd3cd 100644 --- a/packages/core/src/pubsub/pubsub-message.ts +++ b/packages/core/src/pubsub/pubsub-message.ts @@ -1,6 +1,5 @@ import { StreamID } from '@ceramicnetwork/streamid' import { CID } from 'multiformats/cid' -import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' import * as dagCBOR from '@ipld/dag-cbor' import { create as createDigest } from 'multiformats/hashes/digest' import * as sha256 from '@stablelib/sha256' @@ -103,9 +102,6 @@ export type PubsubMessage = co.TypeOf const textEncoder = new TextEncoder() const textDecoder = new TextDecoder('utf-8') -const PUBSUB_PUBLISHED = 'pubsub_published' -const PUBSUB_RECEIVED = 'pubsub_received' - function messageHash(message: any): string { // DAG-CBOR encoding const encoded = dagCBOR.encode(message) @@ -130,7 +126,6 @@ export function buildQueryMessage(streamId: StreamID): QueryMessage { } export function serialize(message: PubsubMessage): Uint8Array { - Metrics.count(PUBSUB_PUBLISHED, 1, { typ: message.typ }) // really attempted to publish... const payload = PubsubMessage.encode(message) return textEncoder.encode(JSON.stringify(payload)) } @@ -138,6 +133,5 @@ export function serialize(message: PubsubMessage): Uint8Array { export function deserialize(message: any): PubsubMessage { const asString = textDecoder.decode(message.data) const parsed = JSON.parse(asString) - Metrics.count(PUBSUB_RECEIVED, 1, { typ: parsed.typ }) return co.decode(PubsubMessage, parsed) as PubsubMessage } diff --git a/packages/core/src/pubsub/pubsub.ts b/packages/core/src/pubsub/pubsub.ts index 050c7ea187..bada27a26c 100644 --- a/packages/core/src/pubsub/pubsub.ts +++ b/packages/core/src/pubsub/pubsub.ts @@ -1,5 +1,6 @@ import { Observable, EMPTY, pipe, of, from, Subscription, UnaryFunction } from 'rxjs' -import { map, catchError, mergeMap, withLatestFrom } from 'rxjs/operators' +import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' +import { map, catchError, mergeMap, withLatestFrom, tap } from 'rxjs/operators' import { IpfsApi } from '@ceramicnetwork/common' import { deserialize, PubsubMessage, serialize } from './pubsub-message.js' import { DiagnosticsLogger, ServiceLogger } from '@ceramicnetwork/common' @@ -42,6 +43,9 @@ function ipfsToPubsub( ) } +const PUBSUB_PUBLISHED = 'pubsub_published' +const PUBSUB_RECEIVED = 'pubsub_received' + /** * Receive and publish messages to IPFS pubsub. */ @@ -76,7 +80,10 @@ export class Pubsub extends Observable { logger, `IPFS did not provide any internal messages, please check your IPFS configuration.` ), - ipfsToPubsub(this.peerId$, pubsubLogger, topic) + ipfsToPubsub(this.peerId$, pubsubLogger, topic), + tap((message) => { + Metrics.count(PUBSUB_RECEIVED, 1, { typ: message.typ }) + }) ) .subscribe(subscriber) }) @@ -96,23 +103,27 @@ export class Pubsub extends Observable { .pipe( mergeMap(async (peerId) => { const serializedMessage = serialize(message) - await this.ipfs.pubsub.publish(this.topic, serializedMessage) - return { peerId, serializedMessage } + const logMessage = { ...message, ...JSON.parse(textDecoder.decode(serializedMessage)) } + try { + await this.ipfs.pubsub.publish(this.topic, serializedMessage) + Metrics.count(PUBSUB_PUBLISHED, 1, { typ: message.typ }) // really attempted to publish... + this.pubsubLogger.log({ + peer: peerId, + event: 'published', + topic: this.topic, + message: logMessage, + }) + } catch (error) { + this.pubsubLogger.log({ + peer: peerId, + event: 'publish-error', + topic: this.topic, + message: logMessage, + }) + this.logger.err(error) + } }) ) - .subscribe({ - next: ({ peerId, serializedMessage }) => { - const logMessage = { ...message, ...JSON.parse(textDecoder.decode(serializedMessage)) } - this.pubsubLogger.log({ - peer: peerId, - event: 'published', - topic: this.topic, - message: logMessage, - }) - }, - error: (error) => { - this.logger.err(error) - }, - }) + .subscribe() } }