From b160cf72d49d6d9cec51b96d421162b07abe51d8 Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Tue, 8 Aug 2023 14:43:48 -0700 Subject: [PATCH] Reduce scope --- .../core/src/__tests__/state-manager.test.ts | 3 +- .../src/state-management/state-manager.ts | 31 +++++-------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/packages/core/src/__tests__/state-manager.test.ts b/packages/core/src/__tests__/state-manager.test.ts index 7aef86751b..b607c887bd 100644 --- a/packages/core/src/__tests__/state-manager.test.ts +++ b/packages/core/src/__tests__/state-manager.test.ts @@ -1123,7 +1123,8 @@ describe('anchor', () => { expect(await anchorRequestStore.load(tile.id)).toBeNull() }) - test('Anchor failing for non tip should remove the request from the store if the tip has no requested anchor', async () => { + // TODO: CDB-2659 Make this test work + test.skip('Anchor failing for non tip should remove the request from the store if the tip has no requested anchor', async () => { // @ts-ignore anchorRequestStore is private const anchorRequestStore = ceramic.repository.stateManager.anchorRequestStore const requestAnchorSpy = jest.spyOn( diff --git a/packages/core/src/state-management/state-manager.ts b/packages/core/src/state-management/state-manager.ts index 8be9e3fc50..26d1d9a9ec 100644 --- a/packages/core/src/state-management/state-manager.ts +++ b/packages/core/src/state-management/state-manager.ts @@ -18,7 +18,7 @@ import { import { RunningState } from './running-state.js' import { CID } from 'multiformats/cid' import { catchError, concatMap, takeUntil } from 'rxjs/operators' -import { empty, Observable, Subject, Subscription, timer, lastValueFrom, merge, of } from 'rxjs' +import { EMPTY, Observable, Subject, Subscription, timer, lastValueFrom, merge, of } from 'rxjs' import { SnapshotState } from './snapshot-state.js' import { CommitID, StreamID } from '@ceramicnetwork/streamid' import { LocalIndexApi } from '../indexing/local-index-api.js' @@ -345,14 +345,6 @@ export class StateManager { const carFile = await this._buildAnchorRequestCARFile(state$.id, state$.tip) const genesisCID = state$.value.log[0].cid const genesisCommit = carFile.get(genesisCID) - - // Set the anchor status to PENDING before it is added to the anchor request store. - // We do this to prevent the stream from being removed from the anchor request store by previous anchor requests. - const next = { - ...state$.value, - anchorStatus: AnchorStatus.PENDING, - } - state$.next(next) await this._saveAnchorRequestForState(state$, genesisCommit) const anchorStatus$ = this.anchorService.requestAnchor(carFile) @@ -422,10 +414,6 @@ export class StateManager { }) } - private _tipHasProcessingAnchorRequest(state$: RunningState): boolean { - return [AnchorStatus.PROCESSING, AnchorStatus.PENDING].includes(state$.value.anchorStatus) - } - private _processAnchorResponse( state$: RunningState, anchorStatus$: Observable @@ -462,11 +450,12 @@ export class StateManager { return } case AnchorRequestStatusName.COMPLETED: { - await this._handleAnchorCommit(state$, asr.cid, asr.anchorCommit.cid, asr.witnessCar) - - if (!this._tipHasProcessingAnchorRequest(state$)) { + if (asr.cid.equals(state$.tip)) { await this.anchorRequestStore.remove(state$.id) } + + await this._handleAnchorCommit(state$, asr.cid, asr.anchorCommit.cid, asr.witnessCar) + stopSignal.next() return } @@ -478,12 +467,8 @@ export class StateManager { // if this is the anchor response for the tip update the state if (asr.cid.equals(state$.tip)) { state$.next({ ...state$.value, anchorStatus: AnchorStatus.FAILED }) - } - - if (!this._tipHasProcessingAnchorRequest(state$)) { await this.anchorRequestStore.remove(state$.id) } - // we stop the polling as this is a terminal state stopSignal.next() return @@ -495,9 +480,7 @@ export class StateManager { // If this is the tip and the node received a REPLACED response for it the node has gotten into a weird state. // Hopefully this should resolve through updates that will be received shortly or through syncing the stream. - // If this is not the tip, we will remove the stream from the anchor request store ONLY if there are no outstanding anchor requests for the stream. - const isTip = asr.cid.equals(state$.tip) - if (isTip || !this._tipHasProcessingAnchorRequest(state$)) { + if (asr.cid.equals(state$.tip)) { await this.anchorRequestStore.remove(state$.id) } @@ -517,7 +500,7 @@ export class StateManager { // TODO: This can leave a stream with AnchorStatus PENDING or PROCESSING indefinitely. // We should distinguish whether the error is transient or permanent, and either transition // to AnchorStatus FAILED or keep retrying. - return empty() + return EMPTY }) ) .subscribe()