Skip to content

Commit

Permalink
Reduce scope
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed Aug 8, 2023
1 parent 7ca2da4 commit b160cf7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 25 deletions.
3 changes: 2 additions & 1 deletion packages/core/src/__tests__/state-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,8 @@ describe('anchor', () => {
expect(await anchorRequestStore.load(tile.id)).toBeNull()
})

test('Anchor failing for non tip should remove the request from the store if the tip has no requested anchor', async () => {
// TODO: CDB-2659 Make this test work
test.skip('Anchor failing for non tip should remove the request from the store if the tip has no requested anchor', async () => {
// @ts-ignore anchorRequestStore is private
const anchorRequestStore = ceramic.repository.stateManager.anchorRequestStore
const requestAnchorSpy = jest.spyOn(
Expand Down
31 changes: 7 additions & 24 deletions packages/core/src/state-management/state-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
import { RunningState } from './running-state.js'
import { CID } from 'multiformats/cid'
import { catchError, concatMap, takeUntil } from 'rxjs/operators'
import { empty, Observable, Subject, Subscription, timer, lastValueFrom, merge, of } from 'rxjs'
import { EMPTY, Observable, Subject, Subscription, timer, lastValueFrom, merge, of } from 'rxjs'
import { SnapshotState } from './snapshot-state.js'
import { CommitID, StreamID } from '@ceramicnetwork/streamid'
import { LocalIndexApi } from '../indexing/local-index-api.js'
Expand Down Expand Up @@ -345,14 +345,6 @@ export class StateManager {
const carFile = await this._buildAnchorRequestCARFile(state$.id, state$.tip)
const genesisCID = state$.value.log[0].cid
const genesisCommit = carFile.get(genesisCID)

// Set the anchor status to PENDING before it is added to the anchor request store.
// We do this to prevent the stream from being removed from the anchor request store by previous anchor requests.
const next = {
...state$.value,
anchorStatus: AnchorStatus.PENDING,
}
state$.next(next)
await this._saveAnchorRequestForState(state$, genesisCommit)

const anchorStatus$ = this.anchorService.requestAnchor(carFile)
Expand Down Expand Up @@ -422,10 +414,6 @@ export class StateManager {
})
}

private _tipHasProcessingAnchorRequest(state$: RunningState): boolean {
return [AnchorStatus.PROCESSING, AnchorStatus.PENDING].includes(state$.value.anchorStatus)
}

private _processAnchorResponse(
state$: RunningState,
anchorStatus$: Observable<CASResponse>
Expand Down Expand Up @@ -462,11 +450,12 @@ export class StateManager {
return
}
case AnchorRequestStatusName.COMPLETED: {
await this._handleAnchorCommit(state$, asr.cid, asr.anchorCommit.cid, asr.witnessCar)

if (!this._tipHasProcessingAnchorRequest(state$)) {
if (asr.cid.equals(state$.tip)) {
await this.anchorRequestStore.remove(state$.id)
}

await this._handleAnchorCommit(state$, asr.cid, asr.anchorCommit.cid, asr.witnessCar)

stopSignal.next()
return
}
Expand All @@ -478,12 +467,8 @@ export class StateManager {
// if this is the anchor response for the tip update the state
if (asr.cid.equals(state$.tip)) {
state$.next({ ...state$.value, anchorStatus: AnchorStatus.FAILED })
}

if (!this._tipHasProcessingAnchorRequest(state$)) {
await this.anchorRequestStore.remove(state$.id)
}

// we stop the polling as this is a terminal state
stopSignal.next()
return
Expand All @@ -495,9 +480,7 @@ export class StateManager {

// If this is the tip and the node received a REPLACED response for it the node has gotten into a weird state.
// Hopefully this should resolve through updates that will be received shortly or through syncing the stream.
// If this is not the tip, we will remove the stream from the anchor request store ONLY if there are no outstanding anchor requests for the stream.
const isTip = asr.cid.equals(state$.tip)
if (isTip || !this._tipHasProcessingAnchorRequest(state$)) {
if (asr.cid.equals(state$.tip)) {
await this.anchorRequestStore.remove(state$.id)
}

Expand All @@ -517,7 +500,7 @@ export class StateManager {
// TODO: This can leave a stream with AnchorStatus PENDING or PROCESSING indefinitely.
// We should distinguish whether the error is transient or permanent, and either transition
// to AnchorStatus FAILED or keep retrying.
return empty()
return EMPTY
})
)
.subscribe()
Expand Down

0 comments on commit b160cf7

Please sign in to comment.