Skip to content

Commit

Permalink
chore: Log what causes "validation ignored" error (#2920)
Browse files Browse the repository at this point in the history
* chore(core): log error

* chore(core): leave metrics

* chore(core): Trigger CI

* chore(core): Trigger CI
  • Loading branch information
ukstv authored Aug 25, 2023
1 parent bf45e19 commit 3e62086
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
6 changes: 0 additions & 6 deletions packages/core/src/pubsub/pubsub-message.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { StreamID } from '@ceramicnetwork/streamid'
import { CID } from 'multiformats/cid'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import * as dagCBOR from '@ipld/dag-cbor'
import { create as createDigest } from 'multiformats/hashes/digest'
import * as sha256 from '@stablelib/sha256'
Expand Down Expand Up @@ -103,9 +102,6 @@ export type PubsubMessage = co.TypeOf<typeof PubsubMessage>
const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder('utf-8')

const PUBSUB_PUBLISHED = 'pubsub_published'
const PUBSUB_RECEIVED = 'pubsub_received'

function messageHash(message: any): string {
// DAG-CBOR encoding
const encoded = dagCBOR.encode(message)
Expand All @@ -130,14 +126,12 @@ export function buildQueryMessage(streamId: StreamID): QueryMessage {
}

export function serialize(message: PubsubMessage): Uint8Array {
Metrics.count(PUBSUB_PUBLISHED, 1, { typ: message.typ }) // really attempted to publish...
const payload = PubsubMessage.encode(message)
return textEncoder.encode(JSON.stringify(payload))
}

export function deserialize(message: any): PubsubMessage {
const asString = textDecoder.decode(message.data)
const parsed = JSON.parse(asString)
Metrics.count(PUBSUB_RECEIVED, 1, { typ: parsed.typ })
return co.decode(PubsubMessage, parsed) as PubsubMessage
}
47 changes: 29 additions & 18 deletions packages/core/src/pubsub/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Observable, EMPTY, pipe, of, from, Subscription, UnaryFunction } from 'rxjs'
import { map, catchError, mergeMap, withLatestFrom } from 'rxjs/operators'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import { map, catchError, mergeMap, withLatestFrom, tap } from 'rxjs/operators'
import { IpfsApi } from '@ceramicnetwork/common'
import { deserialize, PubsubMessage, serialize } from './pubsub-message.js'
import { DiagnosticsLogger, ServiceLogger } from '@ceramicnetwork/common'
Expand Down Expand Up @@ -42,6 +43,9 @@ function ipfsToPubsub(
)
}

const PUBSUB_PUBLISHED = 'pubsub_published'
const PUBSUB_RECEIVED = 'pubsub_received'

/**
* Receive and publish messages to IPFS pubsub.
*/
Expand Down Expand Up @@ -76,7 +80,10 @@ export class Pubsub extends Observable<PubsubMessage> {
logger,
`IPFS did not provide any internal messages, please check your IPFS configuration.`
),
ipfsToPubsub(this.peerId$, pubsubLogger, topic)
ipfsToPubsub(this.peerId$, pubsubLogger, topic),
tap((message) => {
Metrics.count(PUBSUB_RECEIVED, 1, { typ: message.typ })
})
)
.subscribe(subscriber)
})
Expand All @@ -96,23 +103,27 @@ export class Pubsub extends Observable<PubsubMessage> {
.pipe(
mergeMap(async (peerId) => {
const serializedMessage = serialize(message)
await this.ipfs.pubsub.publish(this.topic, serializedMessage)
return { peerId, serializedMessage }
const logMessage = { ...message, ...JSON.parse(textDecoder.decode(serializedMessage)) }
try {
await this.ipfs.pubsub.publish(this.topic, serializedMessage)
Metrics.count(PUBSUB_PUBLISHED, 1, { typ: message.typ }) // really attempted to publish...
this.pubsubLogger.log({
peer: peerId,
event: 'published',
topic: this.topic,
message: logMessage,
})
} catch (error) {
this.pubsubLogger.log({
peer: peerId,
event: 'publish-error',
topic: this.topic,
message: logMessage,
})
this.logger.err(error)
}
})
)
.subscribe({
next: ({ peerId, serializedMessage }) => {
const logMessage = { ...message, ...JSON.parse(textDecoder.decode(serializedMessage)) }
this.pubsubLogger.log({
peer: peerId,
event: 'published',
topic: this.topic,
message: logMessage,
})
},
error: (error) => {
this.logger.err(error)
},
})
.subscribe()
}
}

0 comments on commit 3e62086

Please sign in to comment.