Injects eventstore connector modules, components, bus and eventstore config into a nestjs application. An example is provided in the examples folder.
npm i --save nestjs-eventstore
EventStoreCqrsModule
uses @nestjs/cqrs
module under the hood. It overrides the default eventbus of @nestjs/cqrs
and pushes the event to the eventstore rather than the internal eventBus.
Therefore the eventBus.publish(event, streamName)
method takes two arguments instead of one. The first one is the event itself, and the second one is the stream name.
Once the event is pushed to the eventStore all the subscriptions listening to that event are pushed that event from the event store. Event handlers can then be triggered to cater for those events.
app.module.ts
import {
EventStoreBusConfig,
EventStoreSubscriptionType,
} from 'nestjs-eventstore';
//linking of events from EventStore to local events
const EventInstantiators = [
SomeEvent: (_id: any, data: any, loggedInUserId: any) => new SomeEvent(_id, data, loggedInUserId);
];
export const eventStoreBusConfig: EventStoreBusConfig = {
subscriptions: [
{ // persistanct subscription
type: EventStoreSubscriptionType.Persistent,
stream: '$ce-persons',
persistentSubscriptionName: 'contacts',
},
{ // Catchup subscription
type: EventStoreSubscriptionType.CatchUp,
stream: '$ce-users',
},
],
eventInstantiators: {
...EventInstantiators
},
};
@Module({
imports: [
ConfigModule.load(path.resolve(__dirname, 'config', '**/!(*.d).{ts,js}')),
EventStoreCqrsModule.forRootAsync(
{
useFactory: async (config: ConfigService) => {
return {
connectionSettings: config.get('eventstore.connectionSettings'),
endpoint: config.get('eventstore.tcpEndpoint'),
};
},
inject: [ConfigService],
},
eventStoreBusConfig,
),
],
})
export class AppModule {}
custom.command.handler.ts
This following is a way to use the command handlers that push to the custom eventBus to the eventstore using aggregate root.
import { ICommandHandler, CommandHandler } from '@nestjs/cqrs';
import { SomeCommand } from '../impl/some.command';
import { EventPublisher } from 'nestjs-eventstore'; //this is necessary as it overrides the default publisher
import { ObjectAggregate } from '../../models/object.aggregate';
@CommandHandler(SomeCommand)
export class SomeCommandHandler
implements ICommandHandler<SomeCommand> {
constructor(private readonly publisher: EventPublisher) {}
async execute(command: SomeCommand) {
const { object, loggedInUserId } = command;
const objectAggregate = this.publisher.mergeObjectContext(
new ObjectAggregate(object._id, object),
);
objectAggregate.add(loggedInUserId);
objectAggregate.commit();
}
}
EventStoreModule
connects directly to the event store without cqrs implementation.
app.module.ts
@Module({
imports: [
ConfigModule.load(path.resolve(__dirname, 'config', '**/!(*.d).{ts,js}')),
EventStoreCqrsModule.forRootAsync(
{
useFactory: async (config: ConfigService) => {
return {
connectionSettings: config.get('eventstore.connectionSettings'),
endpoint: config.get('eventstore.tcpEndpoint'),
};
},
inject: [ConfigService],
},
),
],
})
export class AppModule {}