Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binds Xp projector, refactors binding utils to make them reusable when running locally in dev mode #10190

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions libs/adapters/src/rabbitmq/configuration/rascalConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ export function getAllRascalConfigs(
arguments: queueOptions,
},
},
[RascalQueues.XpProjection]: {
...queueConfig,
options: {
arguments: queueOptions,
},
},
[RascalQueues.FarcasterWorkerPolicy]: {
...queueConfig,
options: {
Expand Down Expand Up @@ -203,6 +209,21 @@ export function getAllRascalConfigs(
RascalRoutingKeys.ContestProjectionContestContentUpvoted,
],
},
[RascalBindings.XpProjection]: {
source: RascalExchanges.MessageRelayer,
destination: RascalQueues.XpProjection,
destinationType: 'queue',
bindingKeys: [
RascalRoutingKeys.XpProjectionSignUpFlowCompleted,
RascalRoutingKeys.XpProjectionCommunityCreated,
RascalRoutingKeys.XpProjectionCommunityJoined,
RascalRoutingKeys.XpProjectionThreadCreated,
RascalRoutingKeys.XpProjectionThreadUpvoted,
RascalRoutingKeys.XpProjectionCommentCreated,
RascalRoutingKeys.XpProjectionCommentUpvoted,
RascalRoutingKeys.XpProjectionUserMentioned,
],
},
[RascalBindings.FarcasterWorkerPolicy]: {
source: RascalExchanges.MessageRelayer,
destination: RascalQueues.FarcasterWorkerPolicy,
Expand Down Expand Up @@ -239,6 +260,10 @@ export function getAllRascalConfigs(
queue: RascalQueues.ContestProjection,
...subscriptionConfig,
},
[RascalSubscriptions.XpProjection]: {
queue: RascalQueues.XpProjection,
...subscriptionConfig,
},
[RascalSubscriptions.FarcasterWorkerPolicy]: {
queue: RascalQueues.FarcasterWorkerPolicy,
...subscriptionConfig,
Expand Down
3 changes: 3 additions & 0 deletions libs/adapters/src/rabbitmq/rabbitMQConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export function getRabbitMQConfig(
RascalQueues.NotificationsSettings,
RascalQueues.ContestWorkerPolicy,
RascalQueues.ContestProjection,
RascalQueues.XpProjection,
RascalQueues.FarcasterWorkerPolicy,
RascalQueues.DiscordBotPolicy,
]);
Expand All @@ -83,6 +84,7 @@ export function getRabbitMQConfig(
RascalBindings.NotificationsSettings,
RascalBindings.ContestWorkerPolicy,
RascalBindings.ContestProjection,
RascalBindings.XpProjection,
RascalBindings.FarcasterWorkerPolicy,
RascalBindings.DiscordBotPolicy,
]);
Expand All @@ -95,6 +97,7 @@ export function getRabbitMQConfig(
RascalSubscriptions.NotificationsSettings,
RascalSubscriptions.ContestWorkerPolicy,
RascalSubscriptions.ContestProjection,
RascalSubscriptions.XpProjection,
RascalSubscriptions.FarcasterWorkerPolicy,
RascalSubscriptions.DiscordBotPolicy,
]);
Expand Down
12 changes: 12 additions & 0 deletions libs/adapters/src/rabbitmq/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export enum RascalSubscriptions {
NotificationsSettings = BrokerSubscriptions.NotificationsSettings,
ContestWorkerPolicy = BrokerSubscriptions.ContestWorkerPolicy,
ContestProjection = BrokerSubscriptions.ContestProjection,
XpProjection = BrokerSubscriptions.XpProjection,
FarcasterWorkerPolicy = BrokerSubscriptions.FarcasterWorkerPolicy,
}

Expand All @@ -33,6 +34,7 @@ export enum RascalQueues {
NotificationsSettings = 'NotificationsSettingsQueue',
ContestWorkerPolicy = 'ContestWorkerPolicyQueue',
ContestProjection = 'ContestProjection',
XpProjection = 'XpProjection',
FarcasterWorkerPolicy = 'FarcasterWorkerPolicyQueue',
}

Expand All @@ -44,6 +46,7 @@ export enum RascalBindings {
ChainEvent = 'ChainEventBinding',
ContestWorkerPolicy = 'ContestWorkerPolicy',
ContestProjection = 'ContestProjection',
XpProjection = 'XpProjection',
FarcasterWorkerPolicy = 'FarcasterWorkerPolicy',
}

Expand Down Expand Up @@ -77,6 +80,15 @@ export enum RascalRoutingKeys {
ContestProjectionContestContentAdded = EventNames.ContestContentAdded,
ContestProjectionContestContentUpvoted = EventNames.ContestContentUpvoted,

XpProjectionSignUpFlowCompleted = EventNames.SignUpFlowCompleted,
XpProjectionCommunityCreated = EventNames.CommunityCreated,
XpProjectionCommunityJoined = EventNames.CommunityJoined,
XpProjectionThreadCreated = `${EventNames.ThreadCreated}.${RoutingKeyTags.Contest}.#`,
XpProjectionThreadUpvoted = `${EventNames.ThreadUpvoted}.${RoutingKeyTags.Contest}.#`,
XpProjectionCommentCreated = EventNames.CommentCreated,
XpProjectionCommentUpvoted = EventNames.CommentUpvoted,
XpProjectionUserMentioned = EventNames.UserMentioned,

FarcasterWorkerPolicyCastCreated = EventNames.FarcasterCastCreated,
FarcasterWorkerPolicyReplyCastCreated = EventNames.FarcasterReplyCastCreated,
FarcasterWorkerPolicyVoteCreated = EventNames.FarcasterVoteCreated,
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/ports/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ export enum BrokerSubscriptions {
ContestWorkerPolicy = 'ContestWorkerPolicy',
ContestProjection = 'ContestProjection',
FarcasterWorkerPolicy = 'FarcasterWorkerPolicy',
XpProjection = 'XpProjection',
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { EventHandler, Policy, command, logger } from '@hicommonwealth/core';
import { EvmEventSignatures } from '@hicommonwealth/evm-protocols';
import { Token, middleware, models } from '@hicommonwealth/model';
import { events } from '@hicommonwealth/schemas';
import { ZodUndefined } from 'zod';
import { models } from '../database';
import { systemActor } from '../middleware';
import { CreateToken } from '../token/CreateToken.command';
import { handleCommunityStakeTrades } from './handleCommunityStakeTrades';
import { handleLaunchpadTrade } from './handleLaunchpadTrade';

Expand All @@ -21,8 +23,8 @@ export const processChainEventCreated: EventHandler<
payload.eventSource.eventSignature ===
EvmEventSignatures.Launchpad.TokenLaunched
) {
await command(Token.CreateToken(), {
actor: middleware.systemActor({}),
await command(CreateToken(), {
actor: systemActor({}),
payload: {
chain_node_id: payload.eventSource.chainNodeId,
community_id: '', // not required for system actors
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { logger } from '@hicommonwealth/core';
import { DB } from '@hicommonwealth/model';
import { chainEvents, events } from '@hicommonwealth/schemas';
import { BigNumber } from 'ethers';
import Web3 from 'web3';
import { z } from 'zod';
import { DB } from '../models';

const log = logger(import.meta);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { logger } from '@hicommonwealth/core';
import { commonProtocol as cp } from '@hicommonwealth/evm-protocols';
import { commonProtocol, models } from '@hicommonwealth/model';
import { chainEvents, events } from '@hicommonwealth/schemas';
import { BigNumber } from 'ethers';
import Web3 from 'web3';
import { z } from 'zod';
import { models } from '../database';
import { commonProtocol } from '../services';

const log = logger(import.meta);

Expand Down
3 changes: 3 additions & 0 deletions libs/model/src/policies/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export * from './ContestWorker.policy';
export * from './DiscordBot.policy';
export * from './FarcasterWorker.policy';
export * from './chainEventCreatedPolicy';
export * from './handleCommunityStakeTrades';
export * from './handleLaunchpadTrade';
17 changes: 14 additions & 3 deletions packages/commonwealth/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { DatabaseCleaner } from './server/util/databaseCleaner';

// handle exceptions thrown in express routes
import 'express-async-errors';
import { dispatchSDKPublishWorkflow } from './server/util/dispatchSDKPublishWorkflow';

// bootstrap adapters
stats({
Expand Down Expand Up @@ -75,12 +74,12 @@ const start = async () => {

const { main } = await import('./main');

main(app, models, {
await main(app, models, {
port: config.PORT,
withLoggingMiddleware: true,
withPrerender: config.APP_ENV === 'production' && !config.NO_PRERENDER,
})
.then(() => {
.then(async () => {
isServiceHealthy = true;
// database clean-up jobs (should be run after the API so, we don't affect start-up time
// TODO: evaluate other options for maintenance jobs
Expand All @@ -91,12 +90,24 @@ const start = async () => {

// checking the DYNO env var ensures this only runs on one dyno
if (config.APP_ENV === 'production' && process.env.DYNO === 'web.1') {
const { dispatchSDKPublishWorkflow } = await import(
'./server/util/dispatchSDKPublishWorkflow'
);
dispatchSDKPublishWorkflow().catch((e) =>
log.error(
`Failed to dispatch publishing workflow ${JSON.stringify(e)}`,
),
);
}

// bootstrap bindings when in dev mode and DEV_MODULITH is true
if (config.NODE_ENV === 'development' && config.DEV_MODULITH) {
timolegros marked this conversation as resolved.
Show resolved Hide resolved
const { bootstrapBindings, bootstrapRelayer } = await import(
'./server/bindings/bootstrap'
);
await bootstrapBindings();
await bootstrapRelayer();
}
})
.catch((e) => log.error(e.message, e));
};
Expand Down
144 changes: 144 additions & 0 deletions packages/commonwealth/server/bindings/bootstrap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import {
RabbitMQAdapter,
RascalConfigServices,
buildRetryStrategy,
getRabbitMQConfig,
} from '@hicommonwealth/adapters';
import {
Broker,
BrokerSubscriptions,
broker,
logger,
stats,
} from '@hicommonwealth/core';
import {
ChainEventPolicy,
Contest,
ContestWorker,
DiscordBotPolicy,
FarcasterWorker,
User,
models,
} from '@hicommonwealth/model';
import { Client } from 'pg';
import { config } from 'server/config';
import { setupListener } from './pgListener';
import { incrementNumUnrelayedEvents, relayForever } from './relayForever';

const log = logger(import.meta);

function checkSubscriptionResponse(
subRes: boolean,
topic: BrokerSubscriptions,
) {
if (!subRes) {
log.fatal(`Failed to subscribe to ${topic}. Requires restart!`, undefined, {
topic,
});
}
}

export async function bootstrapBindings(): Promise<void> {
let brokerInstance: Broker;
try {
const rmqAdapter = new RabbitMQAdapter(
getRabbitMQConfig(
config.BROKER.RABBITMQ_URI,
RascalConfigServices.CommonwealthService,
),
);
await rmqAdapter.init();
broker({
adapter: rmqAdapter,
});
brokerInstance = rmqAdapter;
} catch (e) {
log.error(
'Rascal consumer setup failed. Please check the Rascal configuration',
);
throw e;
}

const chainEventSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.ChainEvent,
ChainEventPolicy(),
);
checkSubscriptionResponse(chainEventSubRes, BrokerSubscriptions.ChainEvent);

const contestWorkerSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.ContestWorkerPolicy,
ContestWorker(),
buildRetryStrategy(undefined, 20_000),
{
beforeHandleEvent: (topic, event, context) => {
context.start = Date.now();
},
afterHandleEvent: (topic, event, context) => {
const duration = Date.now() - context.start;
const handler = `${topic}.${event.name}`;
stats().histogram(`cw.handlerExecutionTime`, duration, { handler });
},
},
);
checkSubscriptionResponse(
contestWorkerSubRes,
BrokerSubscriptions.ContestWorkerPolicy,
);

const contestProjectionsSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.ContestProjection,
Contest.Contests(),
);
checkSubscriptionResponse(
contestProjectionsSubRes,
BrokerSubscriptions.ContestProjection,
);

const xpProjectionSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.XpProjection,
User.Xp(),
);
checkSubscriptionResponse(
xpProjectionSubRes,
BrokerSubscriptions.XpProjection,
);

const farcasterWorkerSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.FarcasterWorkerPolicy,
FarcasterWorker(),
buildRetryStrategy(undefined, 20_000),
);
checkSubscriptionResponse(
farcasterWorkerSubRes,
BrokerSubscriptions.FarcasterWorkerPolicy,
);

const discordBotSubRes = await brokerInstance.subscribe(
BrokerSubscriptions.DiscordBotPolicy,
DiscordBotPolicy(),
);
checkSubscriptionResponse(
discordBotSubRes,
BrokerSubscriptions.DiscordBotPolicy,
);
}

export async function bootstrapRelayer(
maxRelayIterations?: number,
): Promise<Client> {
const count = await models.Outbox.count({
where: { relayed: false },
});
incrementNumUnrelayedEvents(count);

const pgClient = await setupListener();

relayForever(maxRelayIterations).catch((err) => {
log.fatal(
'Unknown fatal error requires immediate attention. Restart REQUIRED!',
err,
);
});

return pgClient;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import type { DB } from '@hicommonwealth/model';
import { QueryTypes } from 'sequelize';
import { z } from 'zod';
import { config } from '../../config';
import { config } from '../config';

const log = logger(import.meta);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { broker, logger, stats } from '@hicommonwealth/core';
import { config } from '../../config';
import { config } from '../config';
import { relay } from './relay';

const INITIAL_ERROR_TIMEOUT = 2_000;
Expand Down
Loading
Loading