Skip to content

Commit

Permalink
feat: add recon service api
Browse files Browse the repository at this point in the history
  • Loading branch information
zachferland committed Aug 1, 2023
1 parent 44fedfa commit 7cf0883
Show file tree
Hide file tree
Showing 12 changed files with 66,784 additions and 23,974 deletions.
90,483 changes: 66,521 additions & 23,962 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/cli/src/ceramic-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig {
syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride],
streamCacheLimit: opts.node.streamCacheLimit,
indexing: opts.indexing,
reconUrl: opts.node.reconUrl,
}
if (opts.stateStore?.mode == StateStoreMode.FS) {
ceramicConfig.stateStoreDirectory = opts.stateStore.localDirectory
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/daemon-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ export class DaemonCeramicNodeConfig {
*/
@jsonMember(Number, { name: 'stream-cache-limit' })
streamCacheLimit?: number

/**
* If set, experimental recon is enabled and uses another node to run recon.
*/
@jsonMember(String, { name: 'recon-url' })
reconUrl?: string
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@
"pg-boss": "^8.2.0",
"rxjs": "^7.5.2",
"sqlite3": "^5.0.8",
"uint8arrays": "^4.0.3"
"uint8arrays": "^4.0.3",
"zcgen-client": "^0.0.5"
},
"devDependencies": {
"@ceramicnetwork/3id-did-resolver": "^2.23.0-rc.0",
Expand Down
21 changes: 20 additions & 1 deletion packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import { SyncApi } from './sync/sync-api.js'
import { ProvidersCache } from './providers-cache.js'
import crypto from 'crypto'
import { SyncJobData } from './sync/interfaces.js'
import { ReconApi, ReconApiHTTP } from './recon.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -141,6 +142,8 @@ export interface CeramicConfig {
useCentralizedPeerDiscovery?: boolean
syncOverride?: SyncOptions

reconUrl?: string

[index: string]: any // allow arbitrary properties
}

Expand All @@ -160,6 +163,7 @@ export interface CeramicModules {
repository: Repository
shutdownSignal: ShutdownSignal
providersCache: ProvidersCache
recon: ReconApi | null
}

/**
Expand Down Expand Up @@ -218,6 +222,7 @@ export class Ceramic implements CeramicApi {
private readonly anchorResumingService: AnchorResumingService
private readonly providersCache: ProvidersCache
private readonly syncApi: SyncApi
readonly recon: ReconApi

readonly _streamHandlers: HandlersMap
private readonly _anchorValidator: AnchorValidator
Expand Down Expand Up @@ -292,6 +297,7 @@ export class Ceramic implements CeramicApi {
anchorService: modules.anchorService,
conflictResolution: conflictResolution,
indexing: localIndex,
recon: modules.recon,
})
this.syncApi = new SyncApi(
{
Expand All @@ -305,7 +311,15 @@ export class Ceramic implements CeramicApi {
)
const pinApi = this._buildPinApi()
this.repository.index.setSyncQueryApi(this.syncApi)
this.admin = new LocalAdminApi(localIndex, this.syncApi, this.nodeStatus.bind(this), pinApi)
this.recon = modules.recon

this.admin = new LocalAdminApi(
localIndex,
this.syncApi,
this.nodeStatus.bind(this),
pinApi,
this.recon
)
}

get index(): LocalIndexApi {
Expand Down Expand Up @@ -537,6 +551,10 @@ export class Ceramic implements CeramicApi {
maxQueriesPerSecond
)

const recon = config.reconUrl
? new ReconApiHTTP(config.reconUrl, config.networkName, repository, dispatcher, logger)
: null

const params: CeramicParameters = {
gateway: config.gateway,
stateStoreDirectory: config.stateStoreDirectory,
Expand All @@ -557,6 +575,7 @@ export class Ceramic implements CeramicApi {
repository,
shutdownSignal,
providersCache,
recon,
}

return [modules, params]
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,14 @@ export class Dispatcher {
* @param cid - Commit CID
* @param streamId - StreamID of the stream the commit belongs to, used for logging.
*/
async retrieveCommit(cid: CID | string, streamId: StreamID): Promise<any> {
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
} catch (e) {
this._logger.err(
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${streamId.toString()}: ${e}`
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${
streamId ? streamId.toString() : ''
}: ${e}`
)
throw e
}
Expand Down
12 changes: 9 additions & 3 deletions packages/core/src/local-admin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { StreamID } from '@ceramicnetwork/streamid'
import { LocalIndexApi } from './indexing/local-index-api.js'
import { SyncApi } from './sync/sync-api.js'
import { ReconApi } from './recon.js'

type NodeStatusFn = () => Promise<NodeStatusResponse>

Expand All @@ -19,7 +20,8 @@ export class LocalAdminApi implements AdminApi {
private readonly indexApi: LocalIndexApi,
private readonly syncApi: SyncApi,
private readonly nodeStatusFn: NodeStatusFn, // TODO(CDB-2293): circular dependency back into Ceramic
private readonly pinApi: PinApi
private readonly pinApi: PinApi,
private readonly recon: ReconApi | undefined
) {}

async nodeStatus(): Promise<NodeStatusResponse> {
Expand All @@ -32,7 +34,9 @@ export class LocalAdminApi implements AdminApi {

async startIndexingModelData(modelData: Array<ModelData>): Promise<void> {
await this.indexApi.indexModels(modelData)
await this.syncApi.startModelSync(modelData.map((idx) => idx.streamID.toString()))
const ids = modelData.map((idx) => idx.streamID.toString())
await this.syncApi.startModelSync(ids)
if (this.recon) ids.forEach(this.recon.subscribe.bind(this.recon))
}

async getIndexedModels(): Promise<Array<StreamID>> {
Expand All @@ -49,10 +53,12 @@ export class LocalAdminApi implements AdminApi {
}

async stopIndexingModelData(modelData: Array<ModelData>): Promise<void> {
const ids = modelData.map((idx) => idx.streamID.toString())
await Promise.all([
this.indexApi.stopIndexingModels(modelData),
this.syncApi.stopModelSync(modelData.map((data) => data.streamID.toString())),
this.syncApi.stopModelSync(ids),
])
if (this.recon) ids.forEach(this.recon.unsubscribe.bind(this.recon))
}

get pin(): PinApi {
Expand Down
193 changes: 193 additions & 0 deletions packages/core/src/recon.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import { EventID, StreamID } from '@ceramicnetwork/streamid'
import { DiagnosticsLogger } from '@ceramicnetwork/common'
import { Repository } from './state-management/repository.js'
import * as ReconClient from 'zcgen-client'
import { Dispatcher } from './dispatcher.js'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
import { from, repeat, timer, switchMap, tap, Subscription } from 'rxjs'
import { retry } from 'rxjs/operators'

/**
* Recon Event
*/
export interface Event {
eventId: string
}

/**
* Describes Recon Service API
*/
export interface ReconApi {
readonly networkName: string
/**
* Recon subscription, subscribe by model
*/
subscribe(model: string): Subscription
/**
* Add event to recon
*/
addEvent(event: Event): Promise<void>
/**
* Unsubscribe to subscription by model
*/
unsubscribe(model: string): void
/**
* Close and unsubscribe to all
*/
close(): void
}

/**
* Recon subscription manager, manages simple map of models to subscriptions
*/
export interface SubManager {
/**
* Add active subscription
*/
add(model: string, sub: Subscription): void
/**
* Get subscription by model
*/
get(model: string): Subscription | undefined
/**
* Unsubscribe
*/
unsubscribe(model: string): void
/**
* Unsubscribe to all known subscriptions
*/
close(): void
}

export class ReconSubManager implements SubManager {
private readonly subscriptions: Record<string, Subscription>

constructor(private readonly logger: DiagnosticsLogger) {
this.subscriptions = {}
}

add(model: string, sub: Subscription): void {
this.subscriptions[model] = sub
this.logger.verbose(`Recon: subscription for model ${model} added`)
}

get(model: string): Subscription | undefined {
return this.subscriptions[model]
}

unsubscribe(model: string): void {
const sub = this.get(model)
if (!sub) return
sub.unsubscribe()
delete this.subscriptions[model]
this.logger.verbose(`Recon: unsubscribed for model ${model}`)
}

close(): void {
Object.keys(this.subscriptions).forEach((model) => {
this.unsubscribe(model)
})
this.logger.verbose(`Recon: closing, unsubscribed to all`)
}
}

/**
* Recon API
*/
export class ReconApiHTTP implements ReconApi {
private readonly api: ReconClient.DefaultApi
private readonly subscriptions: ReconSubManager

constructor(
url: string,
readonly networkName: string,
private readonly repository: Repository,
private readonly dispatcher: Dispatcher,
private readonly logger: DiagnosticsLogger
) {
const baseServer = new ReconClient.ServerConfiguration(url, {})
const config = ReconClient.createConfiguration({ baseServer })
this.api = new ReconClient.DefaultApi(config)
this.subscriptions = new ReconSubManager(logger)
}

subscribe(model: string): Subscription {
if (this.subscriptions.get(model)) return this.subscriptions.get(model)

let offset = 0
const increaseOffset = (val: number): void => {
offset += val
}

const obv$ = from(
this.api.ceramicSubscribeSortKeySortValueGet(
'model',
model,
undefined,
undefined,
offset,
1000
)
).pipe(
tap((arr) => increaseOffset(arr.length)),
switchMap(from),
repeat({ delay: 200 }),
retry({
delay: (error, count) => {
this.logger.warn(`Recon: subscription failed for model ${model}, attempting to retry`)
// exp backoff, max 3 minutes
return timer(count > 11 ? 3 * 60 * 1000 : 2 ^ (count * 100))
},
resetOnSuccess: true,
})
)

// in future could return observable, handler added here to keep recon code together for now
const sub = obv$.subscribe(this._eventHandler)
this.subscriptions.add(model, sub)
return sub
}

unsubscribe(model: string): void {
this.subscriptions.unsubscribe(model)
}

close(): void {
this.subscriptions.close()
}

// messy here, so that recon changes are minimized for now and uses existing apis,
// model/streamids used for lots of caching, but could later be implemented w/o or recon based
async _eventHandler(event: string): Promise<void> {
const eventId = EventID.fromString(event)
const commit = await this.dispatcher.retrieveCommit(eventId.event)

let header, gcid
if (commit.proof) {
gcid = commit.id
} else if (commit.id) {
const genesis = await this.dispatcher.retrieveCommit(commit.id)
header = genesis.header
gcid = commit.id
} else {
header = commit.header
gcid = eventId.event
}

const model = header ? StreamID.fromBytes(header.model) : undefined
// assumes model instance
const streamid = new StreamID(ModelInstanceDocument.STREAM_TYPE_ID, gcid)

this.logger.verbose(`Recon: received eventID ${eventId.toString()} for streamId ${streamid}`)
await this.repository.stateManager.handleUpdate(streamid, eventId.event, model)
}

async addEvent(event: Event): Promise<void> {
try {
await this.api.ceramicEventsPost(event)
this.logger.verbose(`Recon: added event ${event.eventId}`)
} catch (err) {
this.logger.err(`Recon: failed to add event ${event.eventId}`)
}
}
}
Loading

0 comments on commit 7cf0883

Please sign in to comment.