Skip to content

Commit

Permalink
feat(server): event emitter (#134)
Browse files Browse the repository at this point in the history
* feat(server): use event emitter instead of kafka

* fix formatting

* bump version to 0.3.3
  • Loading branch information
arielweinberger authored Jul 11, 2023
1 parent 12f6e52 commit 2a20dd9
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 49 deletions.
49 changes: 8 additions & 41 deletions apps/server/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { ApolloDriver, ApolloDriverConfig } from "@nestjs/apollo";
import { ConfigModule, ConfigService } from "@nestjs/config";
import Joi from "joi";
import { randomUUID } from "crypto";

import { ApolloServerPluginLandingPageLocalDefault } from "@apollo/server/plugin/landingPage/default";
import { EventEmitterModule } from "@nestjs/event-emitter";

import { PromptsModule } from "./prompts/prompts.module";
import { HealthController } from "./health.controller";
import { formatError } from "../lib/gql-format-error";
Expand All @@ -20,8 +21,10 @@ import { MetricsModule } from "./metrics/metrics.module";
import { LoggerModule } from "./logger/logger.module";
import { PinoLogger } from "./logger/pino-logger";
import { AnalyticsModule } from "./analytics/analytics.module";
import { KafkaModule } from "@pezzo/kafka";
import { NotificationsModule } from "./notifications/notifications.module";
import { getConfigSchema } from "./config/common-config-schema";

const isCloud = process.env.PEZZO_CLOUD === "true";
const GQL_SCHEMA_PATH = join(process.cwd(), "apps/server/src/schema.graphql");

@Module({
Expand All @@ -30,50 +33,13 @@ const GQL_SCHEMA_PATH = join(process.cwd(), "apps/server/src/schema.graphql");
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ".env",
validationSchema: Joi.object({
PINO_PRETTIFY: Joi.boolean().default(false),
SEGMENT_KEY: Joi.string().optional().default(null),
DATABASE_URL: Joi.string().required(),
PORT: Joi.number().default(3000),
SUPERTOKENS_CONNECTION_URI: Joi.string().required(),
SUPERTOKENS_API_KEY: Joi.string().optional(),
SUPERTOKENS_API_DOMAIN: Joi.string().default("http://localhost:3000"),
SUPERTOKENS_WEBSITE_DOMAIN: Joi.string().default(
"http://localhost:4200"
),
GOOGLE_OAUTH_CLIENT_ID: Joi.string().optional().default(null),
GOOGLE_OAUTH_CLIENT_SECRET: Joi.string().optional().default(null),
INFLUXDB_URL: Joi.string().required(),
INFLUXDB_TOKEN: Joi.string().required(),
CONSOLE_HOST: Joi.string().required(),
KAFKA_BROKERS: Joi.string().required(),
KAFKA_GROUP_ID: Joi.string().default("pezzo"),
KAFKA_REBALANCE_TIMEOUT: Joi.number().default(10000),
KAFKA_HEARTBEAT_INTERVAL: Joi.number().default(3000),
KAFKA_SESSION_TIMEOUT: Joi.number().default(10000),
}),
validationSchema: getConfigSchema(),
// In CI, we need to skip validation because we don't have a .env file
// This is consumed by the graphql:schema-generate Nx target
validate:
process.env.SKIP_CONFIG_VALIDATION === "true" ? () => ({}) : undefined,
}),
KafkaModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
client: {
brokers: config.get("KAFKA_BROKERS").split(","),
},
consumer: {
groupId: config.get("KAFKA_GROUP_ID"),
rebalanceTimeout: config.get("KAFKA_REBALANCE_TIMEOUT"),
heartbeatInterval: config.get("KAFKA_HEARTBEAT_INTERVAL"),
sessionTimeout: config.get("KAFKA_SESSION_TIMEOUT"),
},
producer: {},
}),
isGlobal: true,
inject: [ConfigService],
}),
EventEmitterModule.forRoot(),
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
playground: false,
Expand Down Expand Up @@ -113,6 +79,7 @@ const GQL_SCHEMA_PATH = join(process.cwd(), "apps/server/src/schema.graphql");
CredentialsModule,
IdentityModule,
MetricsModule,
...(isCloud ? [NotificationsModule] : []),
],
controllers: [HealthController],
})
Expand Down
29 changes: 29 additions & 0 deletions apps/server/src/app/config/common-config-schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import Joi from "joi";

const commonConfigSchema = {
PORT: Joi.number().default(3000),
PEZZO_CLOUD: Joi.boolean().default(false),
PINO_PRETTIFY: Joi.boolean().default(false),
DATABASE_URL: Joi.string().required(),
SUPERTOKENS_CONNECTION_URI: Joi.string().required(),
SUPERTOKENS_API_KEY: Joi.string().optional(),
SUPERTOKENS_API_DOMAIN: Joi.string().default("http://localhost:3000"),
SUPERTOKENS_WEBSITE_DOMAIN: Joi.string().default("http://localhost:4200"),
INFLUXDB_URL: Joi.string().required(),
INFLUXDB_TOKEN: Joi.string().required(),
};

const cloudConfigSchema = {
SENDGRID_API_KEY: Joi.string().required(),
SEGMENT_KEY: Joi.string().required(),
GOOGLE_OAUTH_CLIENT_ID: Joi.string().required(),
GOOGLE_OAUTH_CLIENT_SECRET: Joi.string().required(),
};

const isCloud = process.env.PEZZO_CLOUD === "true";

export const getConfigSchema = () =>
Joi.object({
...commonConfigSchema,
...(isCloud ? cloudConfigSchema : {}),
});
11 changes: 7 additions & 4 deletions apps/server/src/app/identity/org-invitations.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import { UsersService } from "./users.service";
import { ExtendedUser } from "./models/extended-user.model";
import { InvitationWhereUniqueInput } from "../../@generated/invitation/invitation-where-unique.input";
import { Organization } from "../../@generated/organization/organization.model";
import { KafkaProducerService } from "@pezzo/kafka";
import { UpdateOrgInvitationInput } from "./inputs/update-org-invitation.input";
import { ConfigService } from "@nestjs/config";
import { PinoLogger } from "../logger/pino-logger";
import { OrganizationsService } from "./organizations.service";
import { InvitationsService } from "./invitations.service";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { KafkaSchemas } from "@pezzo/kafka";

@UseGuards(AuthGuard)
@Resolver(() => Invitation)
export class OrgInvitationsResolver {
constructor(
private eventEmitter: EventEmitter2,
private usersService: UsersService,
private kafkaProducer: KafkaProducerService,
private organizationService: OrganizationsService,
private invitationsService: InvitationsService,
private logger: PinoLogger,
Expand Down Expand Up @@ -114,15 +115,17 @@ export class OrgInvitationsResolver {
.assign({ topic })
.info("Sending kafka invitation created event");

await this.kafkaProducer.produce("org-invitation-created", {
const payload: KafkaSchemas["org-invitation-created"] = {
key: invitation.id,
invitationUrl: invitationUrl.toString(),
invitationId: invitation.id,
organizationId,
organizationName: organization.name,
email,
role: invitation.role,
});
};

this.eventEmitter.emit("org-invitation-created", payload);

return invitation;
}
Expand Down
7 changes: 7 additions & 0 deletions apps/server/src/app/notifications/notifications.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Module } from "@nestjs/common";
import { NotificationsService } from "./notifications.service";

@Module({
providers: [NotificationsService],
})
export class NotificationsModule {}
35 changes: 35 additions & 0 deletions apps/server/src/app/notifications/notifications.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Injectable } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { OnEvent } from "@nestjs/event-emitter";
import { KafkaSchemas } from "@pezzo/kafka";
import sgMail, { MailDataRequired } from "@sendgrid/mail";
import { PinoLogger } from "../logger/pino-logger";

export const emailTemplates: Record<keyof KafkaSchemas, string> = {
"org-invitation-created": "d-a36b6b8076b040ba89aff0dd5bf11936",
};

@Injectable()
export class NotificationsService {
constructor(private config: ConfigService, private logger: PinoLogger) {
sgMail.setApiKey(this.config.get("SENDGRID_API_KEY"));
}

@OnEvent("org-invitation-created")
async sendOrgInvitationEmail(data: KafkaSchemas["org-invitation-created"]) {
const templateId = emailTemplates["org-invitation-created"];

this.logger.info({ templateId, data }, "Sending org invitation email");

const mailData: MailDataRequired = {
to: data.email,
from: "Pezzo <[email protected]>",
templateId,
dynamicTemplateData: {
...data,
},
};

await sgMail.send(mailData);
}
}
2 changes: 1 addition & 1 deletion libs/common/src/version.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version":"0.3.1"}
{"version":"0.3.3"}
118 changes: 116 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pezzo",
"version": "0.3.1",
"version": "0.3.3",
"license": "MIT",
"scripts": {
"graphql:codegen": "nx graphql:codegen:offline server",
Expand All @@ -24,11 +24,13 @@
"@nestjs/common": "^9.0.0",
"@nestjs/config": "^2.3.1",
"@nestjs/core": "^9.0.0",
"@nestjs/event-emitter": "^2.0.0",
"@nestjs/graphql": "^11.0.5",
"@nestjs/platform-express": "^9.0.0",
"@nrwl/nest": "^15.9.2",
"@prisma/client": "^3.14.0",
"@segment/analytics-node": "^1.0.0-beta.26",
"@sendgrid/mail": "^7.7.0",
"@sentry/react": "^7.53.1",
"@swc/helpers": "^0.5.0",
"@tanstack/react-query": "^4.29.3",
Expand Down

0 comments on commit 2a20dd9

Please sign in to comment.