diff --git a/libs/adapters/src/rabbitmq/configuration/rascalConfig.ts b/libs/adapters/src/rabbitmq/configuration/rascalConfig.ts index e7cda2dad77..0fe7ce6a5c8 100644 --- a/libs/adapters/src/rabbitmq/configuration/rascalConfig.ts +++ b/libs/adapters/src/rabbitmq/configuration/rascalConfig.ts @@ -134,6 +134,12 @@ export function getAllRascalConfigs( arguments: queueOptions, }, }, + [RascalQueues.XpProjection]: { + ...queueConfig, + options: { + arguments: queueOptions, + }, + }, [RascalQueues.FarcasterWorkerPolicy]: { ...queueConfig, options: { @@ -203,6 +209,21 @@ export function getAllRascalConfigs( RascalRoutingKeys.ContestProjectionContestContentUpvoted, ], }, + [RascalBindings.XpProjection]: { + source: RascalExchanges.MessageRelayer, + destination: RascalQueues.XpProjection, + destinationType: 'queue', + bindingKeys: [ + RascalRoutingKeys.XpProjectionSignUpFlowCompleted, + RascalRoutingKeys.XpProjectionCommunityCreated, + RascalRoutingKeys.XpProjectionCommunityJoined, + RascalRoutingKeys.XpProjectionThreadCreated, + RascalRoutingKeys.XpProjectionThreadUpvoted, + RascalRoutingKeys.XpProjectionCommentCreated, + RascalRoutingKeys.XpProjectionCommentUpvoted, + RascalRoutingKeys.XpProjectionUserMentioned, + ], + }, [RascalBindings.FarcasterWorkerPolicy]: { source: RascalExchanges.MessageRelayer, destination: RascalQueues.FarcasterWorkerPolicy, @@ -239,6 +260,10 @@ export function getAllRascalConfigs( queue: RascalQueues.ContestProjection, ...subscriptionConfig, }, + [RascalSubscriptions.XpProjection]: { + queue: RascalQueues.XpProjection, + ...subscriptionConfig, + }, [RascalSubscriptions.FarcasterWorkerPolicy]: { queue: RascalQueues.FarcasterWorkerPolicy, ...subscriptionConfig, diff --git a/libs/adapters/src/rabbitmq/rabbitMQConfig.ts b/libs/adapters/src/rabbitmq/rabbitMQConfig.ts index db3ac959f5c..91cd3db7106 100644 --- a/libs/adapters/src/rabbitmq/rabbitMQConfig.ts +++ b/libs/adapters/src/rabbitmq/rabbitMQConfig.ts @@ -74,6 +74,7 @@ export function getRabbitMQConfig( RascalQueues.NotificationsSettings, RascalQueues.ContestWorkerPolicy, RascalQueues.ContestProjection, + RascalQueues.XpProjection, RascalQueues.FarcasterWorkerPolicy, RascalQueues.DiscordBotPolicy, ]); @@ -83,6 +84,7 @@ export function getRabbitMQConfig( RascalBindings.NotificationsSettings, RascalBindings.ContestWorkerPolicy, RascalBindings.ContestProjection, + RascalBindings.XpProjection, RascalBindings.FarcasterWorkerPolicy, RascalBindings.DiscordBotPolicy, ]); @@ -95,6 +97,7 @@ export function getRabbitMQConfig( RascalSubscriptions.NotificationsSettings, RascalSubscriptions.ContestWorkerPolicy, RascalSubscriptions.ContestProjection, + RascalSubscriptions.XpProjection, RascalSubscriptions.FarcasterWorkerPolicy, RascalSubscriptions.DiscordBotPolicy, ]); diff --git a/libs/adapters/src/rabbitmq/types.ts b/libs/adapters/src/rabbitmq/types.ts index 66da47238a9..f3089cd0173 100644 --- a/libs/adapters/src/rabbitmq/types.ts +++ b/libs/adapters/src/rabbitmq/types.ts @@ -17,6 +17,7 @@ export enum RascalSubscriptions { NotificationsSettings = BrokerSubscriptions.NotificationsSettings, ContestWorkerPolicy = BrokerSubscriptions.ContestWorkerPolicy, ContestProjection = BrokerSubscriptions.ContestProjection, + XpProjection = BrokerSubscriptions.XpProjection, FarcasterWorkerPolicy = BrokerSubscriptions.FarcasterWorkerPolicy, } @@ -33,6 +34,7 @@ export enum RascalQueues { NotificationsSettings = 'NotificationsSettingsQueue', ContestWorkerPolicy = 'ContestWorkerPolicyQueue', ContestProjection = 'ContestProjection', + XpProjection = 'XpProjection', FarcasterWorkerPolicy = 'FarcasterWorkerPolicyQueue', } @@ -44,6 +46,7 @@ export enum RascalBindings { ChainEvent = 'ChainEventBinding', ContestWorkerPolicy = 'ContestWorkerPolicy', ContestProjection = 'ContestProjection', + XpProjection = 'XpProjection', FarcasterWorkerPolicy = 'FarcasterWorkerPolicy', } @@ -77,6 +80,15 @@ export enum RascalRoutingKeys { ContestProjectionContestContentAdded = EventNames.ContestContentAdded, ContestProjectionContestContentUpvoted = EventNames.ContestContentUpvoted, + XpProjectionSignUpFlowCompleted = EventNames.SignUpFlowCompleted, + XpProjectionCommunityCreated = EventNames.CommunityCreated, + XpProjectionCommunityJoined = EventNames.CommunityJoined, + XpProjectionThreadCreated = `${EventNames.ThreadCreated}.${RoutingKeyTags.Contest}.#`, + XpProjectionThreadUpvoted = `${EventNames.ThreadUpvoted}.${RoutingKeyTags.Contest}.#`, + XpProjectionCommentCreated = EventNames.CommentCreated, + XpProjectionCommentUpvoted = EventNames.CommentUpvoted, + XpProjectionUserMentioned = EventNames.UserMentioned, + FarcasterWorkerPolicyCastCreated = EventNames.FarcasterCastCreated, FarcasterWorkerPolicyReplyCastCreated = EventNames.FarcasterReplyCastCreated, FarcasterWorkerPolicyVoteCreated = EventNames.FarcasterVoteCreated, diff --git a/libs/core/src/ports/interfaces.ts b/libs/core/src/ports/interfaces.ts index 2cbde8dd7a4..76e36508e55 100644 --- a/libs/core/src/ports/interfaces.ts +++ b/libs/core/src/ports/interfaces.ts @@ -209,6 +209,7 @@ export enum BrokerSubscriptions { ContestWorkerPolicy = 'ContestWorkerPolicy', ContestProjection = 'ContestProjection', FarcasterWorkerPolicy = 'FarcasterWorkerPolicy', + XpProjection = 'XpProjection', } /** diff --git a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/chainEventCreatedPolicy.ts b/libs/model/src/policies/chainEventCreatedPolicy.ts similarity index 89% rename from packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/chainEventCreatedPolicy.ts rename to libs/model/src/policies/chainEventCreatedPolicy.ts index 6be7763e12c..f421b5321ee 100644 --- a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/chainEventCreatedPolicy.ts +++ b/libs/model/src/policies/chainEventCreatedPolicy.ts @@ -1,8 +1,10 @@ import { EventHandler, Policy, command, logger } from '@hicommonwealth/core'; import { EvmEventSignatures } from '@hicommonwealth/evm-protocols'; -import { Token, middleware, models } from '@hicommonwealth/model'; import { events } from '@hicommonwealth/schemas'; import { ZodUndefined } from 'zod'; +import { models } from '../database'; +import { systemActor } from '../middleware'; +import { CreateToken } from '../token/CreateToken.command'; import { handleCommunityStakeTrades } from './handleCommunityStakeTrades'; import { handleLaunchpadTrade } from './handleLaunchpadTrade'; @@ -21,8 +23,8 @@ export const processChainEventCreated: EventHandler< payload.eventSource.eventSignature === EvmEventSignatures.Launchpad.TokenLaunched ) { - await command(Token.CreateToken(), { - actor: middleware.systemActor({}), + await command(CreateToken(), { + actor: systemActor({}), payload: { chain_node_id: payload.eventSource.chainNodeId, community_id: '', // not required for system actors diff --git a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleCommunityStakeTrades.ts b/libs/model/src/policies/handleCommunityStakeTrades.ts similarity index 98% rename from packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleCommunityStakeTrades.ts rename to libs/model/src/policies/handleCommunityStakeTrades.ts index 521ffa08470..11a9dfd0678 100644 --- a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleCommunityStakeTrades.ts +++ b/libs/model/src/policies/handleCommunityStakeTrades.ts @@ -1,9 +1,9 @@ import { logger } from '@hicommonwealth/core'; -import { DB } from '@hicommonwealth/model'; import { chainEvents, events } from '@hicommonwealth/schemas'; import { BigNumber } from 'ethers'; import Web3 from 'web3'; import { z } from 'zod'; +import { DB } from '../models'; const log = logger(import.meta); diff --git a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleLaunchpadTrade.ts b/libs/model/src/policies/handleLaunchpadTrade.ts similarity index 97% rename from packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleLaunchpadTrade.ts rename to libs/model/src/policies/handleLaunchpadTrade.ts index 02627d52c43..31f70fe96fd 100644 --- a/packages/commonwealth/server/workers/commonwealthConsumer/policies/chainEventCreated/handleLaunchpadTrade.ts +++ b/libs/model/src/policies/handleLaunchpadTrade.ts @@ -1,10 +1,11 @@ import { logger } from '@hicommonwealth/core'; import { commonProtocol as cp } from '@hicommonwealth/evm-protocols'; -import { commonProtocol, models } from '@hicommonwealth/model'; import { chainEvents, events } from '@hicommonwealth/schemas'; import { BigNumber } from 'ethers'; import Web3 from 'web3'; import { z } from 'zod'; +import { models } from '../database'; +import { commonProtocol } from '../services'; const log = logger(import.meta); diff --git a/libs/model/src/policies/index.ts b/libs/model/src/policies/index.ts index 5dc647d7a7d..b7d8e07b52b 100644 --- a/libs/model/src/policies/index.ts +++ b/libs/model/src/policies/index.ts @@ -1,3 +1,6 @@ export * from './ContestWorker.policy'; export * from './DiscordBot.policy'; export * from './FarcasterWorker.policy'; +export * from './chainEventCreatedPolicy'; +export * from './handleCommunityStakeTrades'; +export * from './handleLaunchpadTrade'; diff --git a/packages/commonwealth/server.ts b/packages/commonwealth/server.ts index 3725bc1f620..b2f4e9bf482 100644 --- a/packages/commonwealth/server.ts +++ b/packages/commonwealth/server.ts @@ -23,7 +23,6 @@ import { DatabaseCleaner } from './server/util/databaseCleaner'; // handle exceptions thrown in express routes import 'express-async-errors'; -import { dispatchSDKPublishWorkflow } from './server/util/dispatchSDKPublishWorkflow'; // bootstrap adapters stats({ @@ -75,12 +74,12 @@ const start = async () => { const { main } = await import('./main'); - main(app, models, { + await main(app, models, { port: config.PORT, withLoggingMiddleware: true, withPrerender: config.APP_ENV === 'production' && !config.NO_PRERENDER, }) - .then(() => { + .then(async () => { isServiceHealthy = true; // database clean-up jobs (should be run after the API so, we don't affect start-up time // TODO: evaluate other options for maintenance jobs @@ -91,12 +90,24 @@ const start = async () => { // checking the DYNO env var ensures this only runs on one dyno if (config.APP_ENV === 'production' && process.env.DYNO === 'web.1') { + const { dispatchSDKPublishWorkflow } = await import( + './server/util/dispatchSDKPublishWorkflow' + ); dispatchSDKPublishWorkflow().catch((e) => log.error( `Failed to dispatch publishing workflow ${JSON.stringify(e)}`, ), ); } + + // bootstrap bindings when in dev mode and DEV_MODULITH is true + if (config.NODE_ENV === 'development' && config.DEV_MODULITH) { + const { bootstrapBindings, bootstrapRelayer } = await import( + './server/bindings/bootstrap' + ); + await bootstrapBindings(); + await bootstrapRelayer(); + } }) .catch((e) => log.error(e.message, e)); }; diff --git a/packages/commonwealth/server/bindings/bootstrap.ts b/packages/commonwealth/server/bindings/bootstrap.ts new file mode 100644 index 00000000000..e60ee4ccf41 --- /dev/null +++ b/packages/commonwealth/server/bindings/bootstrap.ts @@ -0,0 +1,144 @@ +import { + RabbitMQAdapter, + RascalConfigServices, + buildRetryStrategy, + getRabbitMQConfig, +} from '@hicommonwealth/adapters'; +import { + Broker, + BrokerSubscriptions, + broker, + logger, + stats, +} from '@hicommonwealth/core'; +import { + ChainEventPolicy, + Contest, + ContestWorker, + DiscordBotPolicy, + FarcasterWorker, + User, + models, +} from '@hicommonwealth/model'; +import { Client } from 'pg'; +import { config } from 'server/config'; +import { setupListener } from './pgListener'; +import { incrementNumUnrelayedEvents, relayForever } from './relayForever'; + +const log = logger(import.meta); + +function checkSubscriptionResponse( + subRes: boolean, + topic: BrokerSubscriptions, +) { + if (!subRes) { + log.fatal(`Failed to subscribe to ${topic}. Requires restart!`, undefined, { + topic, + }); + } +} + +export async function bootstrapBindings(): Promise { + let brokerInstance: Broker; + try { + const rmqAdapter = new RabbitMQAdapter( + getRabbitMQConfig( + config.BROKER.RABBITMQ_URI, + RascalConfigServices.CommonwealthService, + ), + ); + await rmqAdapter.init(); + broker({ + adapter: rmqAdapter, + }); + brokerInstance = rmqAdapter; + } catch (e) { + log.error( + 'Rascal consumer setup failed. Please check the Rascal configuration', + ); + throw e; + } + + const chainEventSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.ChainEvent, + ChainEventPolicy(), + ); + checkSubscriptionResponse(chainEventSubRes, BrokerSubscriptions.ChainEvent); + + const contestWorkerSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.ContestWorkerPolicy, + ContestWorker(), + buildRetryStrategy(undefined, 20_000), + { + beforeHandleEvent: (topic, event, context) => { + context.start = Date.now(); + }, + afterHandleEvent: (topic, event, context) => { + const duration = Date.now() - context.start; + const handler = `${topic}.${event.name}`; + stats().histogram(`cw.handlerExecutionTime`, duration, { handler }); + }, + }, + ); + checkSubscriptionResponse( + contestWorkerSubRes, + BrokerSubscriptions.ContestWorkerPolicy, + ); + + const contestProjectionsSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.ContestProjection, + Contest.Contests(), + ); + checkSubscriptionResponse( + contestProjectionsSubRes, + BrokerSubscriptions.ContestProjection, + ); + + const xpProjectionSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.XpProjection, + User.Xp(), + ); + checkSubscriptionResponse( + xpProjectionSubRes, + BrokerSubscriptions.XpProjection, + ); + + const farcasterWorkerSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.FarcasterWorkerPolicy, + FarcasterWorker(), + buildRetryStrategy(undefined, 20_000), + ); + checkSubscriptionResponse( + farcasterWorkerSubRes, + BrokerSubscriptions.FarcasterWorkerPolicy, + ); + + const discordBotSubRes = await brokerInstance.subscribe( + BrokerSubscriptions.DiscordBotPolicy, + DiscordBotPolicy(), + ); + checkSubscriptionResponse( + discordBotSubRes, + BrokerSubscriptions.DiscordBotPolicy, + ); +} + +export async function bootstrapRelayer( + maxRelayIterations?: number, +): Promise { + const count = await models.Outbox.count({ + where: { relayed: false }, + }); + incrementNumUnrelayedEvents(count); + + const pgClient = await setupListener(); + + relayForever(maxRelayIterations).catch((err) => { + log.fatal( + 'Unknown fatal error requires immediate attention. Restart REQUIRED!', + err, + ); + }); + + return pgClient; +} diff --git a/packages/commonwealth/server/workers/messageRelayer/pgListener.ts b/packages/commonwealth/server/bindings/pgListener.ts similarity index 100% rename from packages/commonwealth/server/workers/messageRelayer/pgListener.ts rename to packages/commonwealth/server/bindings/pgListener.ts diff --git a/packages/commonwealth/server/workers/messageRelayer/relay.ts b/packages/commonwealth/server/bindings/relay.ts similarity index 97% rename from packages/commonwealth/server/workers/messageRelayer/relay.ts rename to packages/commonwealth/server/bindings/relay.ts index fbf54cda314..f43c57eee8b 100644 --- a/packages/commonwealth/server/workers/messageRelayer/relay.ts +++ b/packages/commonwealth/server/bindings/relay.ts @@ -8,7 +8,7 @@ import { import type { DB } from '@hicommonwealth/model'; import { QueryTypes } from 'sequelize'; import { z } from 'zod'; -import { config } from '../../config'; +import { config } from '../config'; const log = logger(import.meta); diff --git a/packages/commonwealth/server/workers/messageRelayer/relayForever.ts b/packages/commonwealth/server/bindings/relayForever.ts similarity index 97% rename from packages/commonwealth/server/workers/messageRelayer/relayForever.ts rename to packages/commonwealth/server/bindings/relayForever.ts index 536d6ea1895..26e6fef1f0a 100644 --- a/packages/commonwealth/server/workers/messageRelayer/relayForever.ts +++ b/packages/commonwealth/server/bindings/relayForever.ts @@ -1,5 +1,5 @@ import { broker, logger, stats } from '@hicommonwealth/core'; -import { config } from '../../config'; +import { config } from '../config'; import { relay } from './relay'; const INITIAL_ERROR_TIMEOUT = 2_000; diff --git a/packages/commonwealth/server/config.ts b/packages/commonwealth/server/config.ts index 71de7fae3a3..44b9cd792d3 100644 --- a/packages/commonwealth/server/config.ts +++ b/packages/commonwealth/server/config.ts @@ -25,6 +25,7 @@ const { LIBP2P_PRIVATE_KEY, DISPATCHER_APP_ID, DISPATCHER_APP_PRIVATE_KEY, + DEV_MODULITH, } = process.env; const NO_PRERENDER = _NO_PRERENDER; @@ -105,6 +106,7 @@ export const config = configure( : undefined, DISPATCHER_APP_PRIVATE_KEY, }, + DEV_MODULITH: DEV_MODULITH === 'true', }, z.object({ NO_PRERENDER: z.boolean(), @@ -187,5 +189,6 @@ export const config = configure( 'The private key of the Common Workflow Dispatcher GitHub app', ), }), + DEV_MODULITH: z.boolean(), }), ); diff --git a/packages/commonwealth/server/workers/commonwealthConsumer/commonwealthConsumer.ts b/packages/commonwealth/server/workers/commonwealthConsumer/commonwealthConsumer.ts index 1b5be79a3c6..846b4969f80 100644 --- a/packages/commonwealth/server/workers/commonwealthConsumer/commonwealthConsumer.ts +++ b/packages/commonwealth/server/workers/commonwealthConsumer/commonwealthConsumer.ts @@ -1,36 +1,17 @@ import { HotShotsStats, - RabbitMQAdapter, - RascalConfigServices, ServiceKey, - buildRetryStrategy, - getRabbitMQConfig, startHealthCheckLoop, } from '@hicommonwealth/adapters'; -import { - Broker, - BrokerSubscriptions, - broker, - handleEvent, - logger, - stats, -} from '@hicommonwealth/core'; -import { - Contest, - ContestWorker, - DiscordBotPolicy, - FarcasterWorker, -} from '@hicommonwealth/model'; +import { handleEvent, logger, stats } from '@hicommonwealth/core'; +import { ContestWorker } from '@hicommonwealth/model'; import { EventNames } from '@hicommonwealth/schemas'; +import { bootstrapBindings } from 'server/bindings/bootstrap'; import { fileURLToPath } from 'url'; -import { config } from '../../config'; -import { ChainEventPolicy } from './policies/chainEventCreated/chainEventCreatedPolicy'; const log = logger(import.meta); -stats({ - adapter: HotShotsStats(), -}); +stats({ adapter: HotShotsStats() }); let isServiceHealthy = false; @@ -44,17 +25,6 @@ startHealthCheckLoop({ }, }); -function checkSubscriptionResponse( - subRes: boolean, - topic: BrokerSubscriptions, -) { - if (!subRes) { - log.fatal(`Failed to subscribe to ${topic}. Requires restart!`, undefined, { - topic, - }); - } -} - // CommonwealthConsumer is a server that consumes (and processes) RabbitMQ messages // from external apps or services (like the Snapshot Service). It exists because we // don't want to modify the Commonwealth database directly from external apps/services. @@ -63,82 +33,6 @@ function checkSubscriptionResponse( // properly handling/processing those messages. Using the script is rarely necessary in // local development. -export async function setupCommonwealthConsumer(): Promise { - let brokerInstance: Broker; - try { - const rmqAdapter = new RabbitMQAdapter( - getRabbitMQConfig( - config.BROKER.RABBITMQ_URI, - RascalConfigServices.CommonwealthService, - ), - ); - await rmqAdapter.init(); - broker({ - adapter: rmqAdapter, - }); - brokerInstance = rmqAdapter; - } catch (e) { - log.error( - 'Rascal consumer setup failed. Please check the Rascal configuration', - ); - throw e; - } - - const chainEventSubRes = await brokerInstance.subscribe( - BrokerSubscriptions.ChainEvent, - ChainEventPolicy(), - ); - checkSubscriptionResponse(chainEventSubRes, BrokerSubscriptions.ChainEvent); - - const contestWorkerSubRes = await brokerInstance.subscribe( - BrokerSubscriptions.ContestWorkerPolicy, - ContestWorker(), - buildRetryStrategy(undefined, 20_000), - { - beforeHandleEvent: (topic, event, context) => { - context.start = Date.now(); - }, - afterHandleEvent: (topic, event, context) => { - const duration = Date.now() - context.start; - const handler = `${topic}.${event.name}`; - stats().histogram(`cw.handlerExecutionTime`, duration, { handler }); - }, - }, - ); - checkSubscriptionResponse( - contestWorkerSubRes, - BrokerSubscriptions.ContestWorkerPolicy, - ); - - const contestProjectionsSubRes = await brokerInstance.subscribe( - BrokerSubscriptions.ContestProjection, - Contest.Contests(), - ); - checkSubscriptionResponse( - contestProjectionsSubRes, - BrokerSubscriptions.ContestProjection, - ); - - const farcasterWorkerSubRes = await brokerInstance.subscribe( - BrokerSubscriptions.FarcasterWorkerPolicy, - FarcasterWorker(), - buildRetryStrategy(undefined, 20_000), - ); - checkSubscriptionResponse( - farcasterWorkerSubRes, - BrokerSubscriptions.FarcasterWorkerPolicy, - ); - - const discordBotSubRes = await brokerInstance.subscribe( - BrokerSubscriptions.DiscordBotPolicy, - DiscordBotPolicy(), - ); - checkSubscriptionResponse( - discordBotSubRes, - BrokerSubscriptions.DiscordBotPolicy, - ); -} - function startRolloverLoop() { log.info('Starting rollover loop'); @@ -162,7 +56,7 @@ function startRolloverLoop() { async function main() { try { log.info('Starting main consumer'); - await setupCommonwealthConsumer(); + await bootstrapBindings(); isServiceHealthy = true; startRolloverLoop(); } catch (error) { diff --git a/packages/commonwealth/server/workers/messageRelayer/messageRelayer.ts b/packages/commonwealth/server/workers/messageRelayer/messageRelayer.ts index 759ea1c41ff..0215f55a939 100644 --- a/packages/commonwealth/server/workers/messageRelayer/messageRelayer.ts +++ b/packages/commonwealth/server/workers/messageRelayer/messageRelayer.ts @@ -6,9 +6,8 @@ import { startHealthCheckLoop, } from '@hicommonwealth/adapters'; import { broker, logger } from '@hicommonwealth/core'; +import { bootstrapRelayer } from 'server/bindings/bootstrap'; import { config } from '../../config'; -import { setupListener } from './pgListener'; -import { incrementNumUnrelayedEvents, relayForever } from './relayForever'; const log = logger(import.meta); @@ -26,43 +25,16 @@ startHealthCheckLoop({ }); export async function startMessageRelayer(maxRelayIterations?: number) { - const { models } = await import('@hicommonwealth/model'); - - try { - const rmqAdapter = new RabbitMQAdapter( - getRabbitMQConfig( - config.BROKER.RABBITMQ_URI, - RascalConfigServices.CommonwealthService, - ), - ); - await rmqAdapter.init(); - broker({ - adapter: rmqAdapter, - }); - } catch (e) { - log.error( - 'Rascal consumer setup failed. Please check the Rascal configuration', - ); - throw e; - } - - const count = await models.Outbox.count({ - where: { - relayed: false, - }, - }); - incrementNumUnrelayedEvents(count); - - const pgClient = await setupListener(); - + const rmqAdapter = new RabbitMQAdapter( + getRabbitMQConfig( + config.BROKER.RABBITMQ_URI, + RascalConfigServices.CommonwealthService, + ), + ); + await rmqAdapter.init(); + broker({ adapter: rmqAdapter }); + const pgClient = await bootstrapRelayer(maxRelayIterations); isServiceHealthy = true; - relayForever(maxRelayIterations).catch((err) => { - log.fatal( - 'Unknown error fatal requires immediate attention. Restart REQUIRED!', - err, - ); - }); - return pgClient; } diff --git a/packages/commonwealth/test/integration/commonwealthConsumer/chainEventCreatedPolicy.spec.ts b/packages/commonwealth/test/integration/commonwealthConsumer/chainEventCreatedPolicy.spec.ts index b873d824cac..e3c7e096119 100644 --- a/packages/commonwealth/test/integration/commonwealthConsumer/chainEventCreatedPolicy.spec.ts +++ b/packages/commonwealth/test/integration/commonwealthConsumer/chainEventCreatedPolicy.spec.ts @@ -1,11 +1,9 @@ import { EventContext, dispose } from '@hicommonwealth/core'; -import { DB, tester } from '@hicommonwealth/model'; +import { DB, processChainEventCreated, tester } from '@hicommonwealth/model'; import { BalanceType } from '@hicommonwealth/shared'; import { expect } from 'chai'; import { BigNumber } from 'ethers'; import { afterAll, afterEach, beforeAll, describe, test } from 'vitest'; -// eslint-disable-next-line max-len -import { processChainEventCreated } from '../../../server/workers/commonwealthConsumer/policies/chainEventCreated/chainEventCreatedPolicy'; // These are all values for a real txn on the Ethereum Sepolia Testnet const transactionHash = diff --git a/packages/commonwealth/test/integration/messageRelayer/messageRelayer.spec.ts b/packages/commonwealth/test/integration/messageRelayer/messageRelayer.spec.ts index cd21234219c..f7bd1d8964a 100644 --- a/packages/commonwealth/test/integration/messageRelayer/messageRelayer.spec.ts +++ b/packages/commonwealth/test/integration/messageRelayer/messageRelayer.spec.ts @@ -3,12 +3,12 @@ import { models } from '@hicommonwealth/model'; import { EventNames } from '@hicommonwealth/schemas'; import { delay } from '@hicommonwealth/shared'; import { expect } from 'chai'; -import { afterEach, describe, test } from 'vitest'; -import { startMessageRelayer } from '../../../server/workers/messageRelayer/messageRelayer'; import { numUnrelayedEvents, resetNumUnrelayedEvents, -} from '../../../server/workers/messageRelayer/relayForever'; +} from 'server/bindings/relayForever'; +import { afterEach, describe, test } from 'vitest'; +import { startMessageRelayer } from '../../../server/workers/messageRelayer/messageRelayer'; import { testOutboxEvents } from './util'; describe('messageRelayer', { timeout: 20_000 }, () => { diff --git a/packages/commonwealth/test/integration/messageRelayer/pgListener.spec.ts b/packages/commonwealth/test/integration/messageRelayer/pgListener.spec.ts index 2492f923e81..bd0139141b6 100644 --- a/packages/commonwealth/test/integration/messageRelayer/pgListener.spec.ts +++ b/packages/commonwealth/test/integration/messageRelayer/pgListener.spec.ts @@ -2,12 +2,12 @@ import { models } from '@hicommonwealth/model'; import { delay } from '@hicommonwealth/shared'; import { expect } from 'chai'; import { Client } from 'pg'; -import { afterAll, afterEach, beforeAll, describe, test } from 'vitest'; -import { setupListener } from '../../../server/workers/messageRelayer/pgListener'; +import { setupListener } from 'server/bindings/pgListener'; import { numUnrelayedEvents, resetNumUnrelayedEvents, -} from '../../../server/workers/messageRelayer/relayForever'; +} from 'server/bindings/relayForever'; +import { afterAll, afterEach, beforeAll, describe, test } from 'vitest'; describe.skip('pgListener', { timeout: 10_000 }, () => { let client: Client; diff --git a/packages/commonwealth/test/integration/messageRelayer/relay.spec.ts b/packages/commonwealth/test/integration/messageRelayer/relay.spec.ts index e14cd88c41b..b04c64d11f1 100644 --- a/packages/commonwealth/test/integration/messageRelayer/relay.spec.ts +++ b/packages/commonwealth/test/integration/messageRelayer/relay.spec.ts @@ -1,8 +1,8 @@ import { Broker, successfulInMemoryBroker } from '@hicommonwealth/core'; import { models } from '@hicommonwealth/model'; import { expect } from 'chai'; +import { relay } from 'server/bindings/relay'; import { afterEach, describe, test } from 'vitest'; -import { relay } from '../../../server/workers/messageRelayer/relay'; import { testOutboxEvents } from './util'; describe('relay', () => {