diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 0f9a82d5..e26768d0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -41,8 +41,8 @@ jobs: - name: Lint run: pnpm run prettier:check && pnpm run eslint:check - name: Test - run: pnpm run test - + run: pnpm run test --bail + test-e2e: name: Test e2e runs-on: ubuntu-latest @@ -62,7 +62,7 @@ jobs: - name: Start Hardhat run: pnpm dev:eth-node& - name: Test E2E - run: pnpm test:e2e + run: pnpm test:e2e --bail required-checks-passed: name: All required checks passed diff --git a/README.md b/README.md index 76e3134c..b87270db 100644 --- a/README.md +++ b/README.md @@ -191,7 +191,7 @@ The number of minutes used to calculate the scaling multiplier if a pending tran The maximum scaling multiplier used when the pending transaction lag exceeds the `scalingWindow`. -#### `deviationThresholdCoefficient` +#### `deviationThresholdCoefficient` _(optional)_ The global coefficient applied to all deviation checks. Used to differentiate alternate deployments. For example: @@ -199,6 +199,16 @@ The global coefficient applied to all deviation checks. Used to differentiate al "deviationThresholdCoefficient": 1, ``` +Defaults to `1`. + +#### `dataFeedUpdateInterval` + +The interval specifying how often to run the data feed update loop. In seconds. + +#### `dataFeedBatchSize` + +The batch size of active dAPIs that are to be fetched in a single RPC call. + ## Docker ### Build diff --git a/config/airseeker.example.json b/config/airseeker.example.json index 8fb7bece..322657f0 100644 --- a/config/airseeker.example.json +++ b/config/airseeker.example.json @@ -30,7 +30,9 @@ "sanitizationPercentile": 80, "scalingWindow": 2, "maxScalingMultiplier": 2 - } + }, + "dataFeedUpdateInterval": 60, + "dataFeedBatchSize": 10 } }, "deviationThresholdCoefficient": 1, diff --git a/jest-e2e.config.js b/jest-e2e.config.js index 1d48d668..f641ca21 100644 --- a/jest-e2e.config.js +++ b/jest-e2e.config.js @@ -1,7 +1,19 @@ -const config = require('./jest.config'); +const { join } = require('node:path'); +/** + * For a detailed explanation regarding each configuration property and type check, visit: + * https://jestjs.io/docs/configuration + * @type {import('jest').Config} + */ module.exports = { - ...config, - displayName: 'e2e', + collectCoverage: false, // It doesn't make sense to collect coverage for e2e tests because they target high level features and interaction with other services. + maxWorkers: 1, // We don't want to run tests in parallel because they might interfere with each other. This option is the same as --runInBand. See: https://stackoverflow.com/a/46489246. + + preset: 'ts-jest', + restoreMocks: true, + setupFiles: [join(__dirname, './jest.setup.js')], + testEnvironment: 'jest-environment-node', testMatch: ['**/?(*.)+(feature).[t]s?(x)'], + testPathIgnorePatterns: ['/.build', '/dist/', '/build/'], + verbose: true, }; diff --git a/jest-unit.config.js b/jest-unit.config.js index 4fb90338..774a7b34 100644 --- a/jest-unit.config.js +++ b/jest-unit.config.js @@ -1,7 +1,21 @@ -const config = require('./jest.config'); +const { join } = require('node:path'); +/** + * For a detailed explanation regarding each configuration property and type check, visit: + * https://jestjs.io/docs/configuration + * @type {import('jest').Config} + */ module.exports = { - ...config, - displayName: 'unit', + collectCoverage: true, + coverageDirectory: 'coverage', + coveragePathIgnorePatterns: ['node_modules', '/typechain-types'], // Coverage is collected for all files imported by the tests. We want to exclude files generated by Typechain. + coverageProvider: 'v8', + + preset: 'ts-jest', + restoreMocks: true, + setupFiles: [join(__dirname, './jest.setup.js')], + testEnvironment: 'jest-environment-node', testMatch: ['**/?(*.)+(spec|test).[t]s?(x)'], + testPathIgnorePatterns: ['/.build', '/dist/', '/build/'], + verbose: true, }; diff --git a/jest.config.js b/jest.config.js deleted file mode 100644 index 3c44659b..00000000 --- a/jest.config.js +++ /dev/null @@ -1,17 +0,0 @@ -const { join } = require('node:path'); - -/* - * For a detailed explanation regarding each configuration property and type check, visit: - * https://jestjs.io/docs/configuration - */ -module.exports = { - projects: ['/jest-e2e.config.js', '/jest-unit.config.js'], - collectCoverage: true, - coverageDirectory: 'coverage', - coverageProvider: 'v8', - modulePathIgnorePatterns: ['/.build', '/dist/', '/build/'], - preset: 'ts-jest', - restoreMocks: true, - setupFiles: [join(__dirname, './jest.setup.js')], - testEnvironment: 'jest-environment-node', -}; diff --git a/package.json b/package.json index 9b9ccdf7..ee238ec0 100644 --- a/package.json +++ b/package.json @@ -19,8 +19,8 @@ "eslint:fix": "pnpm run eslint:check --fix", "prettier:check": "prettier --check \"./**/*.{js,ts,md,json}\"", "prettier:fix": "prettier --write \"./**/*.{js,ts,md,json}\"", - "test": "jest --selectProjects unit --verbose --runInBand --bail --detectOpenHandles --silent", - "test:e2e": "jest --selectProjects e2e --runInBand", + "test": "jest --config=jest-unit.config.js", + "test:e2e": "jest ---config=jest-e2e.config.js", "tsc": "tsc --project .", "docker:build": "docker build -t api3/airseekerv2:latest -f docker/Dockerfile .", "docker:run": "docker run -it --rm api3/airseekerv2:latest", diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 58d1226a..eb19a27e 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -59,6 +59,8 @@ describe('chains schema', () => { activeDapiNames: [], }, gasSettings, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }; @@ -83,6 +85,8 @@ describe('chains schema', () => { activeDapiNames: [], }, gasSettings, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }; @@ -107,6 +111,8 @@ describe('chains schema', () => { activeDapiNames: [], }, gasSettings, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }; @@ -138,6 +144,8 @@ describe('chains schema', () => { activeDapiNames: [], }, gasSettings, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }; @@ -180,6 +188,8 @@ describe('chains schema', () => { activeDapiNames: [], }, gasSettings, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }; diff --git a/src/config/schema.ts b/src/config/schema.ts index f9f34bcb..dccfdbeb 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -66,6 +66,8 @@ export const optionalChainSchema = z __Temporary__DapiDataRegistry: temporaryDapiDataRegistrySchema, contracts: optionalContractsSchema.optional(), gasSettings: gasSettingsSchema, + dataFeedUpdateInterval: z.number().positive(), + dataFeedBatchSize: z.number().positive(), }) .strict(); @@ -133,8 +135,8 @@ export const configSchema = z .object({ sponsorWalletMnemonic: z.string().refine((mnemonic) => ethers.utils.isValidMnemonic(mnemonic), 'Invalid mnemonic'), chains: chainsSchema, - deviationThresholdCoefficient: z.number().optional().default(1), - fetchInterval: z.number().positive(), + fetchInterval: z.number().positive(), // TODO: Rename to signedDataFetchInterval + deviationThresholdCoefficient: z.number().positive().optional().default(1), // Explicitly agreed to make this optional. See: https://github.com/api3dao/airseeker-v2/pull/20#issuecomment-1750856113. }) .strict(); diff --git a/src/signed-data-store/signed-data-store.test.ts b/src/signed-data-store/signed-data-store.test.ts index 0766a6ab..e021e336 100644 --- a/src/signed-data-store/signed-data-store.test.ts +++ b/src/signed-data-store/signed-data-store.test.ts @@ -1,10 +1,10 @@ import { BigNumber, ethers } from 'ethers'; -import { generateRandomBytes32, signData } from '../../test/utils/evm'; +import { generateRandomBytes32, signData } from '../../test/utils'; import type { SignedData } from '../types'; -import * as localDataStore from './signed-data-store'; import { verifySignedDataIntegrity } from './signed-data-store'; +import * as localDataStore from './signed-data-store'; describe('datastore', () => { let testDataPoint: SignedData; diff --git a/src/update-feeds/index.ts b/src/update-feeds/index.ts new file mode 100644 index 00000000..5bbe0c4e --- /dev/null +++ b/src/update-feeds/index.ts @@ -0,0 +1 @@ +export * from './update-feeds'; diff --git a/src/update-feeds/temporary-contract-mock.ts b/src/update-feeds/temporary-contract-mock.ts new file mode 100644 index 00000000..da727539 --- /dev/null +++ b/src/update-feeds/temporary-contract-mock.ts @@ -0,0 +1,18 @@ +// NOTE: The function is currently returning static data, because the contract is not yet finalized, but we mark it as +// async in advance. +// +// eslint-disable-next-line @typescript-eslint/require-await +export const getStaticActiveDapis = async (_offset: number, _limit: number) => { + return { + totalCount: 1, + dapiNames: ['MOCK_FEED'], + dataFeedIds: ['0xebba8507d616ed80766292d200a3598fdba656d9938cecc392765d4a284a69a4'], + updateParameters: [{ deviationThresholdInPercentage: 0.5, deviationReference: 0.5, heartbeatInterval: 100 }], + // NOTE: We will need to decode this from the contract, because it will store the template IDs as encoded bytes. + dataFeedTemplateIds: [['0xcc35bd1800c06c12856a87311dd95bfcbb3add875844021d59a929d79f3c99bd']], + signedApiUrls: [['http://localhost:8080']], + airnodeAddresses: ['0xbF3137b0a7574563a23a8fC8badC6537F98197CC'], + }; +}; + +export type ActiveDapisBatch = Awaited>; diff --git a/src/update-feeds/update-feeds.test.ts b/src/update-feeds/update-feeds.test.ts new file mode 100644 index 00000000..36c049f0 --- /dev/null +++ b/src/update-feeds/update-feeds.test.ts @@ -0,0 +1,188 @@ +import { allowPartial } from '../../test/utils'; +import type { Chain } from '../config/schema'; +import { logger } from '../logger'; +import * as stateModule from '../state'; +import * as utilsModule from '../utils'; + +import * as contractMockModule from './temporary-contract-mock'; +import { runUpdateFeed, startUpdateFeedLoops } from './update-feeds'; + +describe(startUpdateFeedLoops.name, () => { + it('starts staggered update loops for a chain', async () => { + jest.spyOn(stateModule, 'getState').mockReturnValue( + allowPartial({ + config: { + chains: { + '123': { + dataFeedUpdateInterval: 0.1, // Have just 100 ms update interval to make the test run quicker. + providers: { + 'first-provider': { url: 'first-provider-url' }, + 'second-provider': { url: 'second-provider-url' }, + }, + }, + }, + }, + }) + ); + const intervalCalls = [] as number[]; + jest.spyOn(global, 'setInterval').mockImplementation((() => { + intervalCalls.push(Date.now()); + }) as any); + jest.spyOn(logger, 'debug'); + + await startUpdateFeedLoops(); + + // Expect the intervals to be called with the correct stagger time. + expect(setInterval).toHaveBeenCalledTimes(2); + expect(intervalCalls[1]! - intervalCalls[0]!).toBeGreaterThanOrEqual(50); + + // Expect the logs to be called with the correct context. + expect(logger.debug).toHaveBeenCalledTimes(3); + expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', { + chainId: '123', + staggerTime: 50, + providerNames: ['first-provider', 'second-provider'], + }); + expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', { + chainId: '123', + providerName: 'first-provider', + }); + expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', { + chainId: '123', + providerName: 'second-provider', + }); + }); + + it('starts the update loops in parallel for each chain', async () => { + jest.spyOn(stateModule, 'getState').mockReturnValue( + allowPartial({ + config: { + chains: { + '123': { + dataFeedUpdateInterval: 0.1, + providers: { + 'first-provider': { url: 'first-provider-url' }, + }, + }, + '456': { + dataFeedUpdateInterval: 0.1, + providers: { + 'another-provider': { url: 'another-provider-url' }, + }, + }, + }, + }, + }) + ); + const intervalCalls = [] as number[]; + jest.spyOn(global, 'setInterval').mockImplementation((() => { + intervalCalls.push(Date.now()); + }) as any); + jest.spyOn(logger, 'debug'); + + await startUpdateFeedLoops(); + + // Expect the intervals to be called with the correct stagger time. + expect(setInterval).toHaveBeenCalledTimes(2); + expect(intervalCalls[1]! - intervalCalls[0]!).toBeLessThan(50); // Ensures that the loops are run in parallel. + + // Expect the logs to be called with the correct context. + expect(logger.debug).toHaveBeenCalledTimes(4); + expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', { + chainId: '123', + staggerTime: 100, + providerNames: ['first-provider'], + }); + expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', { + chainId: '456', + staggerTime: 100, + providerNames: ['another-provider'], + }); + expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', { + chainId: '123', + providerName: 'first-provider', + }); + expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', { + chainId: '456', + providerName: 'another-provider', + }); + }); +}); + +describe(runUpdateFeed.name, () => { + it('aborts when fetching first dAPIs batch fails', async () => { + jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockRejectedValue(new Error('provider-error')); + jest.spyOn(logger, 'error'); + + await runUpdateFeed( + 'provider-name', + allowPartial({ dataFeedBatchSize: 2, dataFeedUpdateInterval: 10 }), + '123' + ); + + // Expect the logs to be called with the correct context. + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith('Failed to get first active dAPIs batch', new Error('provider-error'), { + chainId: '123', + providerName: 'provider-name', + }); + }); + + it('fetches other batches in a staggered way and logs errors', async () => { + // Prepare the mocked contract so it returns three batches (of size 1) of dAPIs and the second batch fails to load. + const mockedFeed = await contractMockModule.getStaticActiveDapis(0, 0); + const firstBatch = { ...mockedFeed, totalCount: 3 }; + const thirdBatch = { ...mockedFeed, totalCount: 3 }; + const sleepCalls = [] as number[]; + const originalSleep = utilsModule.sleep; + jest.spyOn(utilsModule, 'sleep').mockImplementation(async (ms) => { + sleepCalls.push(ms); + return originalSleep(ms); + }); + jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockResolvedValueOnce(firstBatch); + jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockRejectedValueOnce(new Error('provider-error')); + jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockResolvedValueOnce(thirdBatch); + jest.spyOn(logger, 'debug'); + jest.spyOn(logger, 'error'); + + await runUpdateFeed( + 'provider-name', + allowPartial({ dataFeedBatchSize: 1, dataFeedUpdateInterval: 0.15 }), + '123' + ); + + // Expect the contract to fetch the batches to be called with the correct stagger time. + expect(utilsModule.sleep).toHaveBeenCalledTimes(3); + expect(sleepCalls[0]).toBeGreaterThanOrEqual(40); // Reserving 10s as the buffer for computing stagger time. + expect(sleepCalls[1]).toBeGreaterThanOrEqual(0); + expect(sleepCalls[2]).toBe(49.999_999_999_999_99); // Stagger time is actually 150 / 3 = 50, but there is an rounding error. + + // Expect the logs to be called with the correct context. + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith('Failed to get active dAPIs batch', new Error('provider-error'), { + chainId: '123', + providerName: 'provider-name', + }); + expect(logger.debug).toHaveBeenCalledTimes(4); + expect(logger.debug).toHaveBeenCalledWith('Fetching first batch of dAPIs batches', { + chainId: '123', + providerName: 'provider-name', + }); + expect(logger.debug).toHaveBeenCalledWith('Fetching batches of active dAPIs', { + batchesCount: 3, + staggerTime: 49.999_999_999_999_99, + chainId: '123', + providerName: 'provider-name', + }); + expect(logger.debug).toHaveBeenCalledWith('Fetching batch of active dAPIs', { + batchIndex: 1, + chainId: '123', + providerName: 'provider-name', + }); + expect(logger.debug).toHaveBeenCalledWith('Fetching batch of active dAPIs', { + batchIndex: 2, + chainId: '123', + providerName: 'provider-name', + }); + }); +}); diff --git a/src/update-feeds/update-feeds.ts b/src/update-feeds/update-feeds.ts new file mode 100644 index 00000000..ae90a93a --- /dev/null +++ b/src/update-feeds/update-feeds.ts @@ -0,0 +1,91 @@ +import { go } from '@api3/promise-utils'; +import { range, size } from 'lodash'; + +import type { Chain } from '../config/schema'; +import { logger } from '../logger'; +import { getState } from '../state'; +import { isFulfilled, sleep } from '../utils'; + +import { type ActiveDapisBatch, getStaticActiveDapis } from './temporary-contract-mock'; + +export const startUpdateFeedLoops = async () => { + const state = getState(); + const { + config: { chains }, + } = state; + + // Start update loops for each chain in parallel. + await Promise.all( + Object.entries(chains).map(async ([chainId, chain]) => { + const { dataFeedUpdateInterval, providers } = chain; + + // Calculate the stagger time for each provider on the same chain to maximize transaction throughput and update + // frequency. + const staggerTime = (dataFeedUpdateInterval / size(providers)) * 1000; + logger.debug(`Starting update loops for chain`, { chainId, staggerTime, providerNames: Object.keys(providers) }); + + for (const providerName of Object.keys(providers)) { + logger.debug(`Starting update feed loop`, { chainId, providerName }); + setInterval(async () => runUpdateFeed(providerName, chain, chainId), dataFeedUpdateInterval * 1000); + + await sleep(staggerTime); + } + }) + ); +}; + +export const runUpdateFeed = async (providerName: string, chain: Chain, chainId: string) => { + const { dataFeedBatchSize, dataFeedUpdateInterval } = chain; + // TODO: Consider adding a start timestamp (as ID) to the logs to identify batches from this runUpdateFeed tick. + const baseLogContext = { chainId, providerName }; + + logger.debug(`Fetching first batch of dAPIs batches`, baseLogContext); + const firstBatchStartTime = Date.now(); + const goFirstBatch = await go(async () => getActiveDapiBatch(chain)); + if (!goFirstBatch.success) { + logger.error(`Failed to get first active dAPIs batch`, goFirstBatch.error, baseLogContext); + return; + } + const processFirstBatchPromise = processBatch(goFirstBatch.data); + + // Calculate the stagger time between the rest of the batches. + const batchesCount = goFirstBatch.data.totalCount / dataFeedBatchSize; + const staggerTime = batchesCount <= 1 ? 0 : (dataFeedUpdateInterval / batchesCount) * 1000; + + // Wait the remaining stagger time required after fetching the first batch. + const firstBatchDuration = Date.now() - firstBatchStartTime; + await sleep(Math.max(0, staggerTime - firstBatchDuration)); + + // Fetch the rest of the batches in parallel in a staggered way. + logger.debug('Fetching batches of active dAPIs', { batchesCount, staggerTime, ...baseLogContext }); + const otherBatches = await Promise.allSettled( + range(1, batchesCount).map(async (batchIndex) => { + await sleep((batchIndex - 1) * staggerTime); + + logger.debug(`Fetching batch of active dAPIs`, { batchIndex, ...baseLogContext }); + return getActiveDapiBatch(chain, batchIndex * dataFeedBatchSize); + }) + ); + for (const batch of otherBatches.filter((batch) => !isFulfilled(batch))) { + logger.error(`Failed to get active dAPIs batch`, (batch as PromiseRejectedResult).reason, baseLogContext); + } + const processOtherBatchesPromises = otherBatches + .filter((result) => isFulfilled(result)) + .map(async (result) => processBatch((result as PromiseFulfilledResult).value)); + + // Wait for all the batches to be processed. + // + // TODO: Consider returning some information (success/error) and log some statistics (e.g. how many dAPIs were + // updated, etc...). + await Promise.all([processFirstBatchPromise, ...processOtherBatchesPromises]); +}; + +export const processBatch = async (_batch: ActiveDapisBatch) => { + // TODO: Implement. +}; + +export const getActiveDapiBatch = async (chain: Chain, offset = 0) => { + const { dataFeedBatchSize } = chain; + + return getStaticActiveDapis(offset, dataFeedBatchSize); +}; diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 00000000..e7bbee67 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,16 @@ +import { goSync } from '@api3/promise-utils'; +import { ethers } from 'ethers'; + +export const sleep = async (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +export function isFulfilled(item: PromiseSettledResult): item is PromiseFulfilledResult { + return item.status === 'fulfilled'; +} + +export function deriveBeaconId(airnodeAddress: string, templateId: string) { + return goSync(() => ethers.utils.solidityKeccak256(['address', 'bytes32'], [airnodeAddress, templateId])).data; +} + +export function deriveBeaconSetId(beaconIds: string[]) { + return goSync(() => ethers.utils.keccak256(ethers.utils.defaultAbiCoder.encode(['bytes32[]'], [beaconIds]))).data; +} diff --git a/test/fixtures/mock-config.ts b/test/fixtures/mock-config.ts index 6b44fd65..e78f7f3c 100644 --- a/test/fixtures/mock-config.ts +++ b/test/fixtures/mock-config.ts @@ -46,6 +46,8 @@ export const generateTestConfig = (): Config => ({ maxScalingMultiplier: 2, scalingWindow: 5, }, + dataFeedBatchSize: 10, + dataFeedUpdateInterval: 60, }, }, fetchInterval: 10, diff --git a/test/utils.ts b/test/utils.ts new file mode 100644 index 00000000..7dfd1e4b --- /dev/null +++ b/test/utils.ts @@ -0,0 +1,22 @@ +import { ethers } from 'ethers'; + +export const signData = async (signer: ethers.Signer, templateId: string, timestamp: string, data: string) => + signer.signMessage( + ethers.utils.arrayify( + ethers.utils.solidityKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, data]) + ) + ); + +export const generateRandomBytes32 = () => ethers.utils.hexlify(ethers.utils.randomBytes(32)); + +type DeepPartial = T extends object + ? { + [P in keyof T]?: DeepPartial; + } + : T; + +/** + * A helper functions which accepts a deeply partial object and casts it to a given (non-partial) type. This makes it + * convenient to create a mocked data object with properties that are only used for the given test. + */ +export const allowPartial = (obj: DeepPartial): T => obj as T; diff --git a/test/utils/evm.ts b/test/utils/evm.ts deleted file mode 100644 index c7f2d6a2..00000000 --- a/test/utils/evm.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { ethers } from 'ethers'; - -export const signData = async (signer: ethers.Signer, templateId: string, timestamp: string, data: string) => - signer.signMessage( - ethers.utils.arrayify( - ethers.utils.solidityKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, data]) - ) - ); - -export const generateRandomBytes32 = () => ethers.utils.hexlify(ethers.utils.randomBytes(32));