diff --git a/jest.config.js b/jest.config.js
index e746ad68..11cdbdd2 100644
--- a/jest.config.js
+++ b/jest.config.js
@@ -1,10 +1,12 @@
 const { join } = require('node:path');

-/*
+/**
  * For a detailed explanation regarding each configuration property and type check, visit:
  * https://jestjs.io/docs/configuration
+ * @type {import('jest').Config}
  */
 module.exports = {
+  bail: true,
   collectCoverage: true,
   coverageDirectory: 'coverage',
   coverageProvider: 'v8',
@@ -13,4 +15,5 @@ module.exports = {
   restoreMocks: true,
   setupFiles: [join(__dirname, './jest.setup.js')],
   testEnvironment: 'jest-environment-node',
+  verbose: true,
 };
diff --git a/package.json b/package.json
index 0232ccea..10cbb909 100644
--- a/package.json
+++ b/package.json
@@ -19,8 +19,8 @@
     "eslint:fix": "pnpm run eslint:check --fix",
     "prettier:check": "prettier --check \"./**/*.{js,ts,md,json}\"",
     "prettier:fix": "prettier --write \"./**/*.{js,ts,md,json}\"",
-    "test": "jest --verbose --runInBand --bail --detectOpenHandles --silent",
-    "test:e2e": "jest --selectProjects e2e --runInBand",
+    "test": "jest",
+    "test:e2e": "jest --runInBand",
     "tsc": "tsc --project .",
     "docker:build": "docker build -t api3/airseekerv2:latest -f docker/Dockerfile .",
     "docker:run": "docker run -it --rm api3/airseekerv2:latest",
diff --git a/src/signed-data-store/signed-data-store.test.ts b/src/signed-data-store/signed-data-store.test.ts
index 086a0253..124ada26 100644
--- a/src/signed-data-store/signed-data-store.test.ts
+++ b/src/signed-data-store/signed-data-store.test.ts
@@ -1,7 +1,7 @@
 import { BigNumber, ethers } from 'ethers';
 import * as localDataStore from './signed-data-store';
 import { verifySignedDataIntegrity } from './signed-data-store';
-import { generateRandomBytes32, signData } from '../../test/utils/evm';
+import { generateRandomBytes32, signData } from '../../test/utils';
 import type { SignedData } from '../types';

 describe('datastore', () => {
diff --git a/src/state/state.ts b/src/state/state.ts
index 23be35d6..27a61d9f 100644
--- a/src/state/state.ts
+++ b/src/state/state.ts
@@ -1,6 +1,6 @@
 import type { Config } from '../config/schema';

-interface State {
+export interface State {
   config: Config;
   dataFetcherInterval?: NodeJS.Timeout;
 }
diff --git a/src/update-feeds/temporary-contract-mock.ts b/src/update-feeds/temporary-contract-mock.ts
new file mode 100644
index 00000000..da727539
--- /dev/null
+++ b/src/update-feeds/temporary-contract-mock.ts
@@ -0,0 +1,18 @@
+// NOTE: The function is currently returning static data, because the contract is not yet finalized, but we mark it as
+// async in advance.
+//
+// eslint-disable-next-line @typescript-eslint/require-await
+export const getStaticActiveDapis = async (_offset: number, _limit: number) => {
+  return {
+    totalCount: 1,
+    dapiNames: ['MOCK_FEED'],
+    dataFeedIds: ['0xebba8507d616ed80766292d200a3598fdba656d9938cecc392765d4a284a69a4'],
+    updateParameters: [{ deviationThresholdInPercentage: 0.5, deviationReference: 0.5, heartbeatInterval: 100 }],
+    // NOTE: We will need to decode this from the contract, because it will store the template IDs as encoded bytes.
+    dataFeedTemplateIds: [['0xcc35bd1800c06c12856a87311dd95bfcbb3add875844021d59a929d79f3c99bd']],
+    signedApiUrls: [['http://localhost:8080']],
+    airnodeAddresses: ['0xbF3137b0a7574563a23a8fC8badC6537F98197CC'],
+  };
+};
+
+export type ActiveDapisBatch = Awaited<ReturnType<typeof getStaticActiveDapis>>;
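Note: the NOTE above says the finalized contract will return the template IDs as ABI-encoded bytes rather than ready-made arrays. A minimal sketch of the decoding step that would eventually replace the static `dataFeedTemplateIds` values, assuming the contract encodes each dAPI's template IDs as a `bytes32[]` (the actual encoding is not yet confirmed):

```ts
import { ethers } from 'ethers';

// Hypothetical helper; the contract-side encoding is not finalized, so the
// `bytes32[]` layout assumed below may change.
export const decodeDataFeedTemplateIds = (encodedTemplateIds: string): string[] =>
  ethers.utils.defaultAbiCoder.decode(['bytes32[]'], encodedTemplateIds)[0];
```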
diff --git a/src/update-feeds/update-feeds.test.ts b/src/update-feeds/update-feeds.test.ts
new file mode 100644
index 00000000..49d7a145
--- /dev/null
+++ b/src/update-feeds/update-feeds.test.ts
@@ -0,0 +1,192 @@
+import { runUpdateFeed, startUpdateFeedsLoops } from './update-feeds';
+import * as contractMockModule from './temporary-contract-mock';
+import * as stateModule from '../state';
+import { logger } from '../logger';
+import { allowPartial } from '../../test/utils';
+import type { Chain } from '../config/schema';
+
+describe(startUpdateFeedsLoops.name, () => {
+  it('starts staggered update loops for a chain', async () => {
+    jest.spyOn(stateModule, 'getState').mockReturnValue(
+      allowPartial<stateModule.State>({
+        config: {
+          chains: {
+            '123': {
+              dataFeedUpdateInterval: 1, // Have just 1 second update interval to make the test run quicker.
+              providers: {
+                'first-provider': { url: 'first-provider-url' },
+                'second-provider': { url: 'second-provider-url' },
+              },
+            },
+          },
+        },
+      })
+    );
+    const intervalCalls = [] as number[];
+    jest.spyOn(global, 'setInterval').mockImplementation((() => {
+      intervalCalls.push(Date.now());
+    }) as any);
+    jest.spyOn(logger, 'debug');
+
+    await startUpdateFeedsLoops();
+
+    // Expect the intervals to be called with the correct stagger time.
+    expect(setInterval).toHaveBeenCalledTimes(2);
+    expect(intervalCalls[1]! - intervalCalls[0]!).toBeGreaterThanOrEqual(500);
+
+    // Expect the logs to be called with the correct context.
+    expect(logger.debug).toHaveBeenCalledTimes(3);
+    expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', {
+      chainId: '123',
+      staggerTime: 500,
+      providerNames: ['first-provider', 'second-provider'],
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', {
+      chainId: '123',
+      providerName: 'first-provider',
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', {
+      chainId: '123',
+      providerName: 'second-provider',
+    });
+  });
+
+  it('starts the update loops in parallel for each chain', async () => {
+    jest.spyOn(stateModule, 'getState').mockReturnValue(
+      allowPartial<stateModule.State>({
+        config: {
+          chains: {
+            '123': {
+              dataFeedUpdateInterval: 1,
+              providers: {
+                'first-provider': { url: 'first-provider-url' },
+              },
+            },
+            '456': {
+              dataFeedUpdateInterval: 1,
+              providers: {
+                'another-provider': { url: 'another-provider-url' },
+              },
+            },
+          },
+        },
+      })
+    );
+    const intervalCalls = [] as number[];
+    jest.spyOn(global, 'setInterval').mockImplementation((() => {
+      intervalCalls.push(Date.now());
+    }) as any);
+    jest.spyOn(logger, 'debug');
+
+    await startUpdateFeedsLoops();
+
+    // Expect the intervals to be called with the correct stagger time.
+    expect(setInterval).toHaveBeenCalledTimes(2);
+    expect(intervalCalls[1]! - intervalCalls[0]!).toBeLessThan(50); // Ensures that the loops are run in parallel.
+
+    // Expect the logs to be called with the correct context.
+    expect(logger.debug).toHaveBeenCalledTimes(4);
+    expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', {
+      chainId: '123',
+      staggerTime: 1000,
+      providerNames: ['first-provider'],
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Starting update loops for chain', {
+      chainId: '456',
+      staggerTime: 1000,
+      providerNames: ['another-provider'],
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', {
+      chainId: '123',
+      providerName: 'first-provider',
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Starting update feed loop', {
+      chainId: '456',
+      providerName: 'another-provider',
+    });
+  });
+});
+
+describe(runUpdateFeed.name, () => {
+  it('aborts when fetching first dAPIs batch fails', async () => {
+    jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockRejectedValue(new Error('provider-error'));
+    jest.spyOn(logger, 'error');
+
+    await runUpdateFeed(
+      'provider-name',
+      allowPartial<Chain>({ dataFeedBatchSize: 2, dataFeedUpdateInterval: 10 }),
+      '123'
+    );
+
+    // Expect the logs to be called with the correct context.
+    expect(logger.error).toHaveBeenCalledTimes(1);
+    expect(logger.error).toHaveBeenCalledWith('Failed to get first active dAPIs batch', new Error('provider-error'), {
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+  });
+
+  it('fetches other batches in a staggered way and logs errors', async () => {
+    // Prepare the mocked contract so it returns three batches (of size 1) of dAPIs and the second batch fails to load.
+    const mockedFeed = await contractMockModule.getStaticActiveDapis(0, 0);
+    const firstBatch = { ...mockedFeed, totalCount: 3 };
+    const thirdBatch = { ...mockedFeed, totalCount: 3 };
+    const getStaticActiveDapisCalls = [] as number[];
+    // eslint-disable-next-line @typescript-eslint/require-await
+    jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockImplementationOnce(async () => {
+      getStaticActiveDapisCalls.push(Date.now());
+      return firstBatch;
+    });
+    // eslint-disable-next-line @typescript-eslint/require-await
+    jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockImplementationOnce(async () => {
+      getStaticActiveDapisCalls.push(Date.now());
+      throw new Error('provider-error');
+    });
+    // eslint-disable-next-line @typescript-eslint/require-await
+    jest.spyOn(contractMockModule, 'getStaticActiveDapis').mockImplementationOnce(async () => {
+      getStaticActiveDapisCalls.push(Date.now());
+      return thirdBatch;
+    });
+    jest.spyOn(logger, 'debug');
+    jest.spyOn(logger, 'error');
+
+    await runUpdateFeed(
+      'provider-name',
+      allowPartial<Chain>({ dataFeedBatchSize: 1, dataFeedUpdateInterval: 1.5 }),
+      '123'
+    );
+
+    // Expect the contract to fetch the batches to be called with the correct stagger time.
+    expect(getStaticActiveDapisCalls).toHaveLength(3);
+    expect(getStaticActiveDapisCalls[1]! - getStaticActiveDapisCalls[0]!).toBeGreaterThanOrEqual(500);
+    expect(getStaticActiveDapisCalls[2]! - getStaticActiveDapisCalls[1]!).toBeGreaterThanOrEqual(500);
+
+    // Expect the logs to be called with the correct context.
+    expect(logger.error).toHaveBeenCalledTimes(1);
+    expect(logger.error).toHaveBeenCalledWith('Failed to get active dAPIs batch', new Error('provider-error'), {
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+    expect(logger.debug).toHaveBeenCalledTimes(4);
+    expect(logger.debug).toHaveBeenCalledWith('Fetching first batch of dAPIs batches', {
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Fetching batches of active dAPIs', {
+      batchesCount: 3,
+      staggerTime: 500,
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Fetching batch of active dAPIs', {
+      batchIndex: 1,
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+    expect(logger.debug).toHaveBeenCalledWith('Fetching batch of active dAPIs', {
+      batchIndex: 2,
+      chainId: '123',
+      providerName: 'provider-name',
+    });
+  });
+});
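Note: both suites lean on the `allowPartial` helper (added in `test/utils.ts` further down) so a test only has to mock the config fields it actually reads. A hypothetical standalone illustration of the pattern, reusing the test file's own imports:

```ts
import { allowPartial } from '../../test/utils';
import type { Chain } from '../config/schema';

// runUpdateFeed only reads dataFeedBatchSize and dataFeedUpdateInterval, so the
// rest of the Chain object can be omitted while the call site stays type-safe.
const chain = allowPartial<Chain>({ dataFeedBatchSize: 1, dataFeedUpdateInterval: 1.5 });
```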
diff --git a/src/update-feeds/update-feeds.ts b/src/update-feeds/update-feeds.ts
index 88c66be4..38219df9 100644
--- a/src/update-feeds/update-feeds.ts
+++ b/src/update-feeds/update-feeds.ts
@@ -1,75 +1,86 @@
 import { range, size } from 'lodash';
-import { go, goSync } from '@api3/promise-utils';
-import { ethers } from 'ethers';
+import { go } from '@api3/promise-utils';
+import { type ActiveDapisBatch, getStaticActiveDapis } from './temporary-contract-mock';
 import { getState } from '../state';
 import { isFulfilled, sleep } from '../utils';
 import { logger } from '../logger';
-import type { LogContext } from '@api3/commons';
 import type { Chain } from '../config/schema';

-export const startUpdateFeedsLoops = () => {
+export const startUpdateFeedsLoops = async () => {
   const state = getState();
   const {
     config: { chains },
   } = state;

   // Start update loops for each chain in parallel.
-  // eslint-disable-next-line unicorn/no-array-for-each
-  Object.entries(chains).forEach(async ([chainId, chain]) => {
-    const { dataFeedUpdateInterval, providers } = chain;
+  await Promise.all(
+    Object.entries(chains).map(async ([chainId, chain]) => {
+      const { dataFeedUpdateInterval, providers } = chain;

-    // Calculate the stagger time for each provider on the same chain to maximize transaction throughput and update
-    // frequency.
-    const staggerTime = (dataFeedUpdateInterval / size(providers)) * 1000;
-    logger.debug(`Starting update loops for chain`, { chainId, staggerTime, providerNames: Object.keys(providers) });
+      // Calculate the stagger time for each provider on the same chain to maximize transaction throughput and update
+      // frequency.
+      const staggerTime = (dataFeedUpdateInterval / size(providers)) * 1000;
+      logger.debug(`Starting update loops for chain`, { chainId, staggerTime, providerNames: Object.keys(providers) });

-    for (const providerName of Object.keys(providers)) {
-      logger.debug(`Starting update feed loop`, { chainId, providerName });
-      setInterval(async () => runUpdateFeed(providerName, chain, chainId), dataFeedUpdateInterval * 1000);
+      for (const providerName of Object.keys(providers)) {
+        logger.debug(`Starting update feed loop`, { chainId, providerName });
+        setInterval(async () => runUpdateFeed(providerName, chain, chainId), dataFeedUpdateInterval * 1000);

-      await sleep(staggerTime);
-    }
-  });
+        await sleep(staggerTime);
+      }
+    })
+  );
 };

 export const runUpdateFeed = async (providerName: string, chain: Chain, chainId: string) => {
   const { dataFeedBatchSize, dataFeedUpdateInterval } = chain;
+  // TODO: Consider adding a start timestamp (as ID) to the logs to identify batches from this runUpdateFeed tick.
   const baseLogContext = { chainId, providerName };

   logger.debug(`Fetching first batch of dAPIs batches`, baseLogContext);
+  const firstBatchStartTime = Date.now();
   const goFirstBatch = await go(async () => getActiveDapiBatch(chain));
   if (!goFirstBatch.success) {
-    logger.error(`Failed to get active dAPIs batch`, goFirstBatch.error, baseLogContext);
+    logger.error(`Failed to get first active dAPIs batch`, goFirstBatch.error, baseLogContext);
     return;
   }
+  const processFirstBatchPromise = processBatch(goFirstBatch.data);

-  // Fetch the rest of the batches in parallel in a staggered way.
+  // Calculate the stagger time between the rest of the batches.
   const batchesCount = goFirstBatch.data.totalCount / dataFeedBatchSize;
-  // TODO: It's not a good idea to have this run periodically in a setInterval because the update feed loops will
-  // overlap. And just this data fetching part will take up all of the interval time.
-  const staggerTime = batchesCount <= 2 ? 0 : (dataFeedUpdateInterval / (batchesCount - 1)) * 1000;
+  const staggerTime = batchesCount <= 1 ? 0 : (dataFeedUpdateInterval / batchesCount) * 1000;
+
+  // Wait the remaining stagger time required after fetching the first batch.
+  const firstBatchDuration = Date.now() - firstBatchStartTime;
+  await sleep(Math.max(0, staggerTime - firstBatchDuration));
+
+  // Fetch the rest of the batches in parallel in a staggered way.
   logger.debug('Fetching batches of active dAPIs', { batchesCount, staggerTime, ...baseLogContext });
   const otherBatches = await Promise.allSettled(
     range(1, batchesCount).map(async (batchIndex) => {
-      await sleep(batchIndex * staggerTime);
+      await sleep((batchIndex - 1) * staggerTime);
+
+      logger.debug(`Fetching batch of active dAPIs`, { batchIndex, ...baseLogContext });
       return getActiveDapiBatch(chain, batchIndex * dataFeedBatchSize);
     })
   );
   for (const batch of otherBatches.filter((batch) => !isFulfilled(batch))) {
     logger.error(`Failed to get active dAPIs batch`, (batch as PromiseRejectedResult).reason, baseLogContext);
   }
-  const batches = [
-    goFirstBatch.data,
-    ...otherBatches
-      .filter((batch) => isFulfilled(batch))
-      .map((batch) => (batch as PromiseFulfilledResult<ActiveDapisBatch>).value),
-  ];
+  const processOtherBatchesPromises = otherBatches
+    .filter((result) => isFulfilled(result))
+    .map(async (result) => processBatch((result as PromiseFulfilledResult<ActiveDapisBatch>).value));
+
+  // Wait for all the batches to be processed.
+  //
+  // TODO: Consider returning some information (success/error) and log some statistics (e.g. how many dAPIs were
+  // updated, etc...).
+  await Promise.all([processFirstBatchPromise, ...processOtherBatchesPromises]);
+};

-  // Verify the batches returned by the contract.
-  const _validBatches = batches.filter((batch, batchIndex) =>
-    verifyBatch(batch, { chainId, providerName, batchIndex })
-  );
+// eslint-disable-next-line lodash/prefer-noop
+export const processBatch = async (_batch: ActiveDapisBatch) => {
+  // TODO: Implement.
 };

 export const getActiveDapiBatch = async (chain: Chain, offset = 0) => {
@@ -77,64 +88,3 @@ export const getActiveDapiBatch = async (chain: Chain, offset = 0) => {
   const { dataFeedBatchSize } = chain;

   return getStaticActiveDapis(offset, dataFeedBatchSize);
 };
-
-// NOTE: Temporary type of the data returned by the contract.
-type ActiveDapisBatch = Awaited<ReturnType<typeof getStaticActiveDapis>>;
-
-// NOTE: The function is currently returning static data, because the contract is not yet finalized.
-// eslint-disable-next-line @typescript-eslint/require-await
-export const getStaticActiveDapis = async (_offset: number, _limit: number) => {
-  return {
-    totalCount: 1,
-    dapiNames: ['MOCK_FEED'],
-    dataFeedIds: ['0xebba8507d616ed80766292d200a3598fdba656d9938cecc392765d4a284a69a4'],
-    updateParameters: [{ deviationThresholdInPercentage: 0.5, deviationReference: 0.5, heartbeatInterval: 100 }],
-    // NOTE: We will need to decode this from the contract, because it will store the template IDs as encoded bytes.
-    dataFeedTemplateIds: [['0xcc35bd1800c06c12856a87311dd95bfcbb3add875844021d59a929d79f3c99bd']],
-    signedApiUrls: [['http://localhost:8080']],
-    airnodeAddresses: ['0xbF3137b0a7574563a23a8fC8badC6537F98197CC'],
-  };
-};
-
-export function deriveBeaconId(airnodeAddress: string, templateId: string) {
-  return goSync(() => ethers.utils.solidityKeccak256(['address', 'bytes32'], [airnodeAddress, templateId])).data;
-}
-
-export function deriveBeaconSetId(beaconIds: string[]) {
-  return goSync(() => ethers.utils.keccak256(ethers.utils.defaultAbiCoder.encode(['bytes32[]'], [beaconIds]))).data;
-}
-
-export const verifyBatch = (batch: ActiveDapisBatch, logContext: LogContext) => {
-  const { dapiNames, dataFeedIds, updateParameters, dataFeedTemplateIds, signedApiUrls, airnodeAddresses } = batch;
-  if (
-    dapiNames.length !== dataFeedIds.length ||
-    dapiNames.length !== updateParameters.length ||
-    dapiNames.length !== dataFeedTemplateIds.length ||
-    dapiNames.length !== signedApiUrls.length ||
-    dapiNames.length !== airnodeAddresses.length
-  ) {
-    logger.error(`Invalid active dAPIs batch length`, logContext);
-    return false;
-  }
-
-  for (const [index, dataFeedId] of dataFeedIds.entries()) {
-    const templateIds = dataFeedTemplateIds[index]!;
-    const airnodeAddress = airnodeAddresses[index]!;
-
-    if (templateIds.length === 1) {
-      const derivedDataFeedId = deriveBeaconId(airnodeAddress, templateIds[0]!);
-      if (dataFeedId !== derivedDataFeedId) {
-        logger.error(`Invalid beacon ID`, { dataFeedId, derivedDataFeedId, ...logContext });
-        return false;
-      }
-    } else {
-      const derivedBeaconSetId = deriveBeaconSetId(templateIds);
-      if (dataFeedId !== derivedBeaconSetId) {
-        logger.error(`Invalid beacon set ID`, { dataFeedId, derivedBeaconSetId, ...logContext });
-        return false;
-      }
-    }
-  }
-
-  return true;
-};
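Note: to make the revised stagger computation in runUpdateFeed concrete, here is a worked example with illustrative numbers (not taken from any real config): a 10-second update interval and a batch size of 10 against 40 active dAPIs yields four batches spaced 2.5 seconds apart, with the first batch's own fetch duration deducted from the first wait.

```ts
// Illustrative numbers only; mirrors the staggerTime logic in runUpdateFeed above.
const dataFeedUpdateInterval = 10; // seconds
const dataFeedBatchSize = 10;
const totalCount = 40; // reported by the first batch

const batchesCount = totalCount / dataFeedBatchSize; // 4
const staggerTime = batchesCount <= 1 ? 0 : (dataFeedUpdateInterval / batchesCount) * 1000; // 2500 ms

// Each remaining batch waits (batchIndex - 1) * staggerTime after the first one,
// so fetches start at roughly 0 ms, 2500 ms, 5000 ms and 7500 ms, spread evenly
// across the 10-second update interval.
```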
diff --git a/src/utils.ts b/src/utils.ts
index fe68ad59..e7bbee67 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -1,5 +1,16 @@
+import { goSync } from '@api3/promise-utils';
+import { ethers } from 'ethers';
+
 export const sleep = async (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

 export function isFulfilled<T>(item: PromiseSettledResult<T>): item is PromiseFulfilledResult<T> {
   return item.status === 'fulfilled';
 }
+
+export function deriveBeaconId(airnodeAddress: string, templateId: string) {
+  return goSync(() => ethers.utils.solidityKeccak256(['address', 'bytes32'], [airnodeAddress, templateId])).data;
+}
+
+export function deriveBeaconSetId(beaconIds: string[]) {
+  return goSync(() => ethers.utils.keccak256(ethers.utils.defaultAbiCoder.encode(['bytes32[]'], [beaconIds]))).data;
+}
diff --git a/test/utils.ts b/test/utils.ts
new file mode 100644
index 00000000..7dfd1e4b
--- /dev/null
+++ b/test/utils.ts
@@ -0,0 +1,22 @@
+import { ethers } from 'ethers';
+
+export const signData = async (signer: ethers.Signer, templateId: string, timestamp: string, data: string) =>
+  signer.signMessage(
+    ethers.utils.arrayify(
+      ethers.utils.solidityKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, data])
+    )
+  );
+
+export const generateRandomBytes32 = () => ethers.utils.hexlify(ethers.utils.randomBytes(32));
+
+type DeepPartial<T> = T extends object
+  ? {
+      [P in keyof T]?: DeepPartial<T[P]>;
+    }
+  : T;
+
+/**
+ * A helper function which accepts a deeply partial object and casts it to a given (non-partial) type. This makes it
+ * convenient to create a mocked data object with properties that are only used for the given test.
+ */
+export const allowPartial = <T>(obj: DeepPartial<T>): T => obj as T;
diff --git a/test/utils/evm.ts b/test/utils/evm.ts
deleted file mode 100644
index c7f2d6a2..00000000
--- a/test/utils/evm.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import { ethers } from 'ethers';
-
-export const signData = async (signer: ethers.Signer, templateId: string, timestamp: string, data: string) =>
-  signer.signMessage(
-    ethers.utils.arrayify(
-      ethers.utils.solidityKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, data])
-    )
-  );
-
-export const generateRandomBytes32 = () => ethers.utils.hexlify(ethers.utils.randomBytes(32));
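Note: the moved `deriveBeaconId` helper is what ties the mock data together: deriving a beacon ID from the mock's airnode address and its single template ID should reproduce the mock's `dataFeedIds[0]`, which is the same check the removed `verifyBatch` performed for single-beacon feeds. A sketch, assuming the mock values are internally consistent:

```ts
import { deriveBeaconId } from '../src/utils';

// Values copied from temporary-contract-mock.ts; the equality below holds only
// if the mock data is internally consistent, as verifyBatch used to assert.
const beaconId = deriveBeaconId(
  '0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
  '0xcc35bd1800c06c12856a87311dd95bfcbb3add875844021d59a929d79f3c99bd'
);
// Expected: '0xebba8507d616ed80766292d200a3598fdba656d9938cecc392765d4a284a69a4'
```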