
Add round buffer. Closes #3 (#342)

Draft: wants to merge 4 commits into main
Conversation

juliangruber (Member) commented on Aug 31, 2024:

Closes #3.

This PR implements self-healing by buffering the current round's measurements on disk.

  • Write tests
  • Write round index to first line of buffer file
  • Handle when the recovered round isn't the right one (wrong index)
  • Append or create the buffer file (see test failure)
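The checklist above can be sketched as a small on-disk buffer. This is a minimal, hypothetical sketch, not the actual spark-evaluate implementation: the names `RoundBuffer`, `startRound`, `recordMeasurement`, and `recover` are all assumptions. The first line of the file holds the round index; each subsequent line is one JSON-encoded measurement, which covers "write round index to first line" and "handle when the recovered round isn't the right one".

```javascript
import { appendFile, readFile, writeFile } from 'node:fs/promises'

// Hypothetical sketch of the round buffer described in the task list above.
class RoundBuffer {
  constructor (filePath) {
    this.filePath = filePath
  }

  // Start a fresh buffer file for the given round, discarding any stale one.
  async startRound (roundIndex) {
    await writeFile(this.filePath, `${roundIndex}\n`)
  }

  // Append one measurement to the current round's buffer file.
  async recordMeasurement (measurement) {
    await appendFile(this.filePath, JSON.stringify(measurement) + '\n')
  }

  // On startup, recover buffered measurements. Returns null when no buffer
  // exists or when the buffered round index does not match the expected one.
  async recover (expectedRoundIndex) {
    let content
    try {
      content = await readFile(this.filePath, 'utf8')
    } catch (err) {
      if (err.code === 'ENOENT') return null
      throw err
    }
    const [head, ...lines] = content.split('\n')
    if (Number(head) !== expectedRoundIndex) return null
    return lines.filter(Boolean).map(line => JSON.parse(line))
  }
}
```

Appending line-delimited JSON keeps each write cheap (a single `appendFile`) and makes partial-line corruption after a crash easy to skip during recovery.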

bajtos (Member) commented on Sep 2, 2024:

I am not opposed to this solution, but I'd like to propose an alternative to consider:

  • At spark-evaluate startup, make an RPC API call to fetch recent MeasurementsAdded events, reaching back past the beginning of the current round. See how dry-run does that:

    async function fetchMeasurementsAddedFromChain (contractAddress, roundIndex) {
      const { ieContract, provider } = await createMeridianContract(contractAddress)
      console.log('Fetching MeasurementsAdded events from the ledger')
      const blockNumber = await provider.getBlockNumber()
      // console.log('Current block number', blockNumber)
      // TODO: filter only measurements for the given `roundIndex`
      // See https://github.com/Meridian-IE/impact-evaluator/issues/57
      // max look-back period allowed by Glif.io is 2000 blocks (approx 16h40m)
      // SPARK round is ~60 minutes, i.e. ~120 blocks
      const rawEvents = await ieContract.queryFilter('MeasurementsAdded', blockNumber - 1800, 'latest')
      /** @type {Array<{ cid: string, roundIndex: bigint, sender: string }>} */
      const events = rawEvents
        .filter(isEventLog)
        .map(({ args: [cid, roundIndex, sender] }) => ({ cid, roundIndex, sender }))
      // console.log('events', events)
      const prev = roundIndex - 1n
      const prevFound = events.some(e => e.roundIndex === prev)
      if (!prevFound) {
        console.error(
          'Incomplete round data. No measurements from the previous round %s were found.',
          prev.toString()
        )
        process.exit(1)
      }
      const next = roundIndex + 1n
      const nextFound = events.some(e => e.roundIndex === next)
      if (!nextFound) {
        console.error(
          'Incomplete round data. No measurements from the next round %s were found.',
          next.toString()
        )
        process.exit(1)
      }
      return events.filter(e => e.roundIndex === roundIndex).map(e => e.cid)
    }

  • For each event found, call the preprocess step.
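The startup wiring for this alternative could look like the sketch below. Both `fetchMeasurementsAddedFromChain` and `preprocess` are stand-ins here; their signatures (and the `{ roundIndex, cid }` argument shape) are assumptions, not the real spark-evaluate API.

```javascript
// Hypothetical startup recovery: fetch the round's measurement CIDs from
// chain events, then re-run the preprocess step for each one. The two
// injected functions stand in for the real spark-evaluate implementations.
async function recoverCurrentRound ({
  contractAddress,
  roundIndex,
  fetchMeasurementsAddedFromChain,
  preprocess
}) {
  const cids = await fetchMeasurementsAddedFromChain(contractAddress, roundIndex)
  for (const cid of cids) {
    // Preprocess sequentially; the order matches the on-chain event order.
    await preprocess({ roundIndex, cid })
  }
  return cids.length
}
```

Injecting the two functions keeps the recovery step trivially testable with stubs, without touching the RPC API.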

Upsides:

  • The service will not need any persistent volume.
  • We don't have to write & maintain complex code to manage a ringbuffer.

Downsides:

  • When there is an outage of the RPC API, or if the eth_getLogs call takes a long time to complete, spark-evaluate will need a lot of time before it starts and can handle events. (This can be mitigated, though; e.g. startup does not have to wait until all events are fetched.)
  • On restart, we have to re-fetch measurements from older round events.

Successfully merging this pull request may close these issues: Self-healing