This gem works in several scenarios:
- As middleware for shoryuken: it saves all incoming messages to the database and catches and re-throws all exceptions.
- As middleware, it can also log all incoming messages.
- As a client that can send messages to SNS topics and SQS queues.
- As a CI/CD helper that manages topics, queues, and subscriptions, much like database migrations.
Sequel
Edit Gemfile:
# Gemfile
gem 'sequel'
gem 'cyclone_lariat'
And run in console:
$ bundle install
$ bundle exec cyclone_lariat install
ActiveRecord
Edit Gemfile:
# Gemfile
gem 'active_record'
gem 'cyclone_lariat'
And run in console:
$ bundle install
$ bundle exec cyclone_lariat install --adapter=active_record
The last install command will create two files:
- ./lib/tasks/cyclone_lariat.rake - Rake tasks for managing migrations (see the loading sketch below)
- ./config/initializers/cyclone_lariat.rb - Default configuration values for cyclone_lariat
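If your project does not pick up rake tasks from ./lib/tasks automatically (Rails does), a minimal sketch of wiring the generated tasks into your Rakefile might look like this:
# Rakefile (sketch; Rails applications load lib/tasks/*.rake automatically)
load 'lib/tasks/cyclone_lariat.rake'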
Sequel
# frozen_string_literal: true
CycloneLariat.configure do |c|
c.version = 1 # api version
c.aws_key = ENV['AWS_KEY'] # aws key
c.aws_secret_key = ENV['AWS_SECRET_KEY'] # aws secret
c.aws_account_id = ENV['AWS_ACCOUNT_ID'] # aws account id
c.aws_region = ENV['AWS_REGION'] # aws region
c.publisher = ENV['APP_NAME'] # name of your publisher, usually the name of your application
c.instance = ENV['INSTANCE'] # stage, production, test
c.driver = :sequel # driver Sequel
c.inbox_dataset = DB[:inbox_messages] # Sequel dataset for storing incoming messages (on the receiver)
c.versions_dataset = DB[:lariat_versions] # Sequel dataset for versions of publisher migrations
c.fake_publish = ENV['INSTANCE'] == 'test' # when true, prevents messages from being published
end
Before using the event store, add and apply these migrations:
Sequel.migration do
up do
run <<-SQL
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
SQL
end
down do
run <<-SQL
DROP EXTENSION IF EXISTS "uuid-ossp";
SQL
end
end
Sequel.migration do
change do
create_table :inbox_messages do
column :uuid, :uuid, primary_key: true
String :type, null: false
Integer :version, null: false
String :publisher, null: false
column :data, :json, null: false
String :client_error_message, null: true, default: nil
column :client_error_details, :json, null: true, default: nil
DateTime :sent_at, null: true, default: nil
DateTime :received_at, null: false, default: Sequel::CURRENT_TIMESTAMP
DateTime :processed_at, null: true, default: nil
end
end
end
Sequel.migration do
change do
create_table :lariat_versions do
Integer :version, null: false, unique: true
end
end
end
ActiveRecord
# frozen_string_literal: true
CycloneLariat.configure do |c|
c.version = 1 # api version
c.aws_key = ENV['AWS_KEY'] # aws key
c.aws_secret_key = ENV['AWS_SECRET_KEY'] # aws secret
c.aws_account_id = ENV['AWS_ACCOUNT_ID'] # aws account id
c.aws_region = ENV['AWS_REGION'] # aws region
c.publisher = ENV['APP_NAME'] # name of your publisher, usually the name of your application
c.instance = ENV['INSTANCE'] # stage, production, test
c.driver = :active_record # driver ActiveRecord
c.inbox_dataset = CycloneLariatMessage # ActiveRecord model for storing incoming messages (on the receiver)
c.versions_dataset = CycloneLariatVersion # ActiveRecord model for versions of publisher migrations
c.fake_publish = ENV['INSTANCE'] == 'test' # when true, prevents messages from being published
end
Before using the event store, add and apply these migrations:
# migrations
execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
create_table :cyclone_lariat_messages, id: :uuid, primary_key: :uuid, default: -> { 'public.uuid_generate_v4()' } do |t|
t.string :kind, null: false
t.string :type, null: false
t.integer :version, null: false
t.string :publisher, null: false
t.jsonb :data, null: false
t.string :client_error_message, null: true, default: nil
t.jsonb :client_error_details, null: true, default: nil
t.datetime :sent_at, null: true, default: nil
t.datetime :received_at, null: false, default: -> { 'CURRENT_TIMESTAMP' }
t.datetime :processed_at, null: true, default: nil
end
create_table :cyclone_lariat_versions do |t|
t.integer :version, null: false, index: { unique: true }
end
# models
class CycloneLariatMessage < ActiveRecord::Base
self.inheritance_column = :_type_disabled
self.primary_key = 'uuid'
end
class CycloneLariatVersion < ActiveRecord::Base
end
If your application is used only as a publisher, you may not need to set the inbox_dataset parameter.
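For illustration, a publisher-only configuration sketch (Sequel, same options as above, with the inbox dataset omitted) might look like this:
# config/initializers/cyclone_lariat.rb - sketch for a publisher-only application
CycloneLariat.configure do |c|
  c.version          = 1
  c.aws_key          = ENV['AWS_KEY']
  c.aws_secret_key   = ENV['AWS_SECRET_KEY']
  c.aws_account_id   = ENV['AWS_ACCOUNT_ID']
  c.aws_region       = ENV['AWS_REGION']
  c.publisher        = ENV['APP_NAME']
  c.instance         = ENV['INSTANCE']
  c.driver           = :sequel
  c.versions_dataset = DB[:lariat_versions] # still needed for SNS/SQS migration versions
  c.fake_publish     = ENV['INSTANCE'] == 'test'
end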
First, let's understand the difference between SQS and SNS:
- Amazon Simple Queue Service (SQS) lets you send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.
- Amazon Simple Notification Service (SNS) delivers notifications in two ways: Application-to-Person (for example, SMS) and Application-to-Application, which is the one that matters here. In this mode you can use SNS as a fanout (see the sketch after this list).
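As a rough sketch (using the client classes described later in this README), the same event can be fanned out through SNS or sent point-to-point through SQS:
# Sketch only - see the Publisher section below for the full client API
sns = CycloneLariat::Clients::Sns.new(publisher: 'sample_app', version: 1)
sns.publish_event('note_created', data: { id: 12 }, fifo: false) # fanout: delivered to every subscribed queue

sqs = CycloneLariat::Clients::Sqs.new(publisher: 'sample_app', version: 1)
sqs.publish_event('note_created', data: { id: 12 }, dest: :notifier, fifo: false) # point-to-point: one specific queue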
To use cyclone_lariat as a publisher, install CycloneLariat as described above.
Before creating the first migration, let's explain what CycloneLariat::Messages are.
A message in the Amazon SQS/SNS services is an object with several attributes. The main attribute is the body, which contains the published data. The body is a String, but we can use it as a JSON object. By default, cyclone_lariat uses scheme version 1:
// Scheme: version 1
{
"uuid": "f2ce3813-0905-4d81-a60e-f289f2431f50", // Uniq message identificator
"publisher": "sample_app", // Publisher application name
"request_id": "51285005-8a06-4181-b5fd-bf29f3b1a45a", // Optional: X-Request-Id
"type": "event_note_created", // Type of Event or Command
"version": 1, // Version of data structure
"data": {
"id": 12,
"text": "Sample of published data",
"attributes": ["one", "two", "three"]
},
"sent_at": "2022-11-09T11:42:18.203+01:00" // Time when message was sended in ISO8601 Standard
}
You can read about the idea behind X-Request-Id on StackOverflow.
As you can see, the type has the prefix 'event_'. In cyclone_lariat there are two kinds of messages - Messages::V1::Event and Messages::V1::Command.
If you want to log all your messages, you can use the extended scheme - version 2:
// Scheme: version 2
{
"uuid": "f2ce3813-0905-4d81-a60e-f289f2431f50", // Uniq message identificator
"publisher": "sample_app", // Publisher application name
"request_id": "51285005-8a06-4181-b5fd-bf29f3b1a45a", // Optional: X-Request-Id
"type": "event_note_created", // Type of Event or Command
"version": 2, // Version of data structure
"subject": {
"type": "user", // Subject type
"uuid": "a27c29e2-bbd3-490a-8f1b-caa4f8d902ef" // Subject uuid
},
"object": {
"type": "note", // Object type
"uuid": "f46e74db-3335-4c5e-b476-c2a87660a942" // Object uuid
},
"data": {
"id": 12,
"text": "Sample of published data",
"attributes": ["one", "two", "three"]
},
"sent_at": "2022-11-09T11:42:18.203+01:00" // Time when message was sended in ISO8601 Standard
}
The difference between the first and second scheme versions is the subject and object. These values help with building an action log. For example, user #42 writes to support asking why he could not sign in. The message log is:
Subject | Action | Object |
---|---|---|
user #42 | sign_up | user #42 |
user #42 | sign_in | user #42 |
user #42 | create_note | note #769 |
user #1 | ban | user #42 |
It is important to understand that user #42 can be both a subject and an object. And you should save both of these fields to keep track of the entire history of this user.
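For example, the ban action from the log above could be published like this (a sketch using the version-2 client API shown below; names and uuids are illustrative):
client = CycloneLariat::Clients::Sns.new(publisher: 'management_panel', version: 2)
client.publish_event(
  'ban',
  data: { reason: 'spam' },
  subject: { type: 'user', uuid: '26397a5e-a040-4298-8e61-6b336b7c6e62' }, # user #1 - who performed the action
  object:  { type: 'user', uuid: 'a27c29e2-bbd3-490a-8f1b-caa4f8d902ef' }, # user #42 - whom the action was performed on
  fifo: false
)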
Commands and events are both simple domain structures that contain solely data for reading. That means they contain no behaviour or business logic.
A command is an object that is sent to the domain for a state change, which is handled by a command handler. Commands should be named with a verb in the imperative mood plus the aggregate name they operate on. Such a request can be rejected because the data the command holds is invalid or inconsistent. There should be exactly one handler for each command. Once the command has been executed, the consumer can carry out whatever task is required, depending on the output of the command.
An event is a statement of fact about what change has been made to the domain state. They are named with the aggregate name where the change took place plus the verb past-participle. An event happens off the back of a command. A command can emit any number of events. The sender of the event does not care who receives it or whether it has been received at all.
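A small naming sketch (the names are illustrative): a command is an imperative verb plus the aggregate, an event is the aggregate plus a past participle, and cyclone_lariat prefixes the resulting message type accordingly:
client  = CycloneLariat::Clients::Sns.new(publisher: 'auth', version: 1)
payload = { mail: '[email protected]' }

client.publish_command('register_user', data: payload, fifo: false)  # type: "command_register_user"
client.publish_event('user_registered', data: payload, fifo: false)  # type: "event_user_registered"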
To publish a Messages::V1::Event or Messages::V1::Command, you have two options. The first is to send the message directly:
CycloneLariat.configure do |config|
# Application configuration options here
end
client = CycloneLariat::Clients::Sns.new(publisher: 'auth', version: 1)
payload = {
first_name: 'John',
last_name: 'Doe',
mail: '[email protected]'
}
client.publish_command('register_user', data: payload, fifo: false)
That call will generate the following message body:
{
"uuid": "f2ce3813-0905-4d81-a60e-f289f2431f50",
"publisher": "auth",
"type": "command_register_user",
"version": 1,
"data": {
"first_name": "John",
"last_name": "Doe",
"mail": "[email protected]"
},
"sent_at": "2022-11-09T11:42:18.203+01:00" // The time the message was sent. ISO8601 standard.
}
Or, for the second schema version:
CycloneLariat.configure do |config|
# Application configuration options here
end
client = CycloneLariat::Clients::Sns.new(publisher: 'auth', version: 2)
client.publish_event(
'sign_up',
data: {
first_name: 'John',
last_name: 'Doe',
mail: '[email protected]'
},
subject: { type: 'user', uuid: '40250522-21c8-4fc7-9b0b-47d9666a4430'},
object: { type: 'user', uuid: '40250522-21c8-4fc7-9b0b-47d9666a4430'},
fifo: false
)
The second option, which is often better, is to create your own publisher class, similar to the Repository pattern:
require 'cyclone_lariat/publisher' # If require: false in Gemfile
class Publisher < CycloneLariat::Publisher
def email_is_created(mail)
sns.publish event('email_is_created', data: { mail: mail }), fifo: false
end
def email_is_removed(mail)
sns.publish event('email_is_removed', data: { mail: mail }), fifo: false
end
def delete_user(mail)
sns.publish command('delete_user', data: { mail: mail }), fifo: false
end
def welcome_message(mail, text)
sqs.publish command('welcome', data: {mail: mail, txt: text}), fifo: false
end
end
# Init publisher
publisher = Publisher.new
# And publish messages
publisher.email_is_created '[email protected]'
publisher.email_is_removed '[email protected]'
publisher.delete_user '[email protected]'
publisher.welcome_message '[email protected]', 'You are welcome'
An Amazon SNS topic and an SQS queue are logical access points that act as communication channels. Each of them has a specific address - an ARN.
# Topic example
arn:aws:sns:eu-west-1:247602342345:test-event-fanout-cyclone_lariat-note_added.fifo
# Queue example
arn:aws:sqs:eu-west-1:247602342345:test-event-queue-cyclone_lariat-note_added-notifier.fifo
Let's split the ARN:
- arn:aws:sns - prefix for SNS topics
- arn:aws:sqs - prefix for SQS queues
- eu-west-1 - AWS region
- 247602342345 - AWS account id
- test-event-fanout-cyclone_lariat-note_added - topic / queue name
- .fifo - if a topic or queue is FIFO, its name must have this suffix
The region and account id are usually set via the cyclone_lariat configuration.
cyclone_lariat has a naming convention for topic and queue names, which helps keep resources organized.
CycloneLariat.configure do |config|
config.instance = 'test'
config.publisher = 'cyclone_lariat'
# ...
end
CycloneLariat::Clients::Sns.new.publish_command(
'register_user',
data: {
first_name: 'John',
last_name: 'Doe',
mail: '[email protected]'
},
fifo: false
)
# or in repository-like style:
class Publisher < CycloneLariat::Publisher
def register_user(first:, last:, mail:)
sns.publish command(
'register_user',
data: {
mail: mail,
name: {
first: first,
last: last
}
}
), fifo: false
end
end
This call publishes a message to the topic test-command-fanout-cyclone_lariat-register_user.
Let's split the topic name:
- test - instance
- command - kind (event or command)
- fanout - resource type (fanout for SNS topics)
- cyclone_lariat - publisher name
- register_user - message type
For queues, you can also define a destination.
CycloneLariat::Clients::Sqs.new.publish_event(
'register_user',
data: { mail: '[email protected]' },
dest: :mailer,
fifo: false
)
# or in repository-like style:
class YourClient < CycloneLariat::Clients::Sqs
# ...
def register_user(first:, last:, mail:)
publish event('register_user', data: { mail: mail }), fifo: false
end
end
This call publishes a message to the queue test-event-queue-cyclone_lariat-register_user-mailer.
Let's split the queue name:
- test - instance
- event - kind (event or command)
- queue - resource type (queue for SQS)
- cyclone_lariat - publisher name
- register_user - message type
- mailer - destination
You can also send a message to a queue with a custom name, although this approach is not recommended.
# Directly
CycloneLariat::Clients::Sqs.new.publish_event(
'register_user', data: { mail: '[email protected]' },
dest: :mailer, topic: 'custom_topic_name.fifo', fifo: false
)
# Repository
class Publisher < CycloneLariat::Publisher
# ...
def register_user(first:, last:, mail:)
publish event('register_user', data: { mail: mail }),
topic: 'custom_topic_name.fifo', fifo: false
end
end
This will publish the message to the queue: custom_topic_name
You can read about the main idea in the AWS Docs.
A FIFO message must contain two fields:
- group_id - within each topic, FIFO ordering is guaranteed only within one group (see the AWS Docs)
- deduplication_id - within the same group, each message must have a unique identifier (see the AWS Docs)
The entire message can also serve as the unique identifier. In this case, you do not need to pass the deduplication_id parameter, but you must create the queue with the content_based_deduplication parameter in the migration.
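For contrast with the explicit deduplication_id example below, here is a sketch of publishing to a FIFO topic that was created with content_based_deduplication: true (see the migration section further down):
class Publisher < CycloneLariat::Publisher
  # No deduplication_id: the message body itself is used for deduplication,
  # because the topic was created with content_based_deduplication enabled.
  def user_created(mail:, uuid:)
    sns.publish event('user_created', data: { user: { uuid: uuid, mail: mail } }, group_id: uuid),
                fifo: true
  end
end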
class Publisher < CycloneLariat::Publisher
  def user_created(mail:, uuid:)
    sns.publish event('user_created',
                      data: { user: { uuid: uuid, mail: mail } },
                      deduplication_id: uuid,
                      group_id: uuid),
                fifo: true
  end

  def user_mail_changed(mail:, uuid:)
    sns.publish event('user_mail_changed',
                      data: { user: { uuid: uuid, mail: mail } },
                      deduplication_id: mail,
                      group_id: uuid),
                fifo: true
  end
end
Instead of stubbing all requests to AWS services, you can configure cyclone_lariat to fake publishing.
CycloneLariat.configure do |c|
# ...
c.fake_publish = ENV['INSTANCE'] == 'test' # when true, prevents messages from being published
end
With cyclone_lariat you can use migrations that create and delete your queues and topics and manage subscriptions, just like database migrations. Before using this feature, you must complete the cyclone_lariat configuration.
$ bundle exec cyclone_lariat generate migration user_created
This command creates a migration file; let's edit it:
# ./lariat/migrate/1668097991_user_created_queue.rb
# frozen_string_literal: true
class UserCreatedQueue < CycloneLariat::Migration
def up
create queue(:user_created, dest: :mailer, content_based_deduplication: true, fifo: true)
end
def down
delete queue(:user_created, dest: :mailer, content_based_deduplication: true, fifo: true)
end
end
The content_based_deduplication parameter can only be specified for FIFO resources. When true, the whole message is used as the unique message identifier instead of the deduplication_id key.
To apply migrations, use:
$ rake cyclone_lariat:migrate
To roll back migrations, use:
$ rake cyclone_lariat:rollback
Since SNS/SQS management does not support ACID transactions (in the database sense), I highly recommend keeping each migration atomic:
# BAD:
class UserCreated < CycloneLariat::Migration
def up
create queue(:user_created, dest: :mailer, fifo: true)
create topic(:user_created, fifo: true)
subscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :mailer, fifo: true)
end
def down
unsubscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :mailer, fifo: true)
delete topic(:user_created, fifo: true)
delete queue(:user_created, dest: :mailer, fifo: true)
end
end
# GOOD:
class UserCreatedQueue < CycloneLariat::Migration
def up
create queue(:user_created, dest: :mailer, fifo: true)
end
def down
delete queue(:user_created, dest: :mailer, fifo: true)
end
end
class UserCreatedTopic < CycloneLariat::Migration
def up
create topic(:user_created, fifo: true)
end
def down
delete topic(:user_created, fifo: true)
end
end
class UserCreatedSubscription < CycloneLariat::Migration
def up
subscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :mailer, fifo: true)
end
def down
unsubscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :mailer, fifo: true)
end
end
The first example: your registration service creates a new user, and you have two other services - a mailer that sends a welcome email, and a statistics service.
create topic(:user_created, fifo: true)
create queue(:user_created, dest: :mailer, fifo: true)
create queue(:user_created, dest: :stat, fifo: true)
subscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :mailer, fifo: true)
subscribe topic: topic(:user_created, fifo: true),
endpoint: queue(:user_created, dest: :stat, fifo: true)
The second example: you have three services - a registration service that creates new users, an order service that creates new orders, and a statistics service that collects all statistics.
create topic(:user_created, fifo: false)
create topic(:order_created, fifo: false)
create queue(publisher: :any, dest: :statistic, fifo: false)
subscribe topic: topic(:user_created, fifo: false),
endpoint: queue(publisher: :any, dest: :statistic, fifo: false)
subscribe topic: topic(:order_created, fifo: false),
endpoint: queue(publisher: :any, dest: :statistic, fifo: false)
If a queue receives messages from multiple sources, you must specify the publisher as :any. If the subscriber receives messages of different types, cyclone_lariat uses the special keyword all.
For better organisation, you can subscribe a topic to another topic. For example, you have management_panel and client_panel services. Each of these services can register a user with predefined roles, and you want to send this information to the mailer and statistics services.
create topic(:client_created, fifo: false)
create topic(:manager_created, fifo: false)
create topic(:user_created, publisher: :any, fifo: false)
create queue(:user_created, publisher: :any, dest: :mailer, fifo: false)
create queue(:user_created, publisher: :any, dest: :stat, fifo: false)
subscribe topic: topic(:client_created, fifo: false),
endpoint: topic(:user_created, publisher: :any, fifo: false)
subscribe topic: topic(:manager_created, fifo: false),
endpoint: topic(:user_created, publisher: :any, fifo: false)
subscribe topic: topic(:user_created, publisher: :any, fifo: false),
endpoint: queue(:user_created, publisher: :any, dest: :mailer, fifo: false)
subscribe topic: topic(:user_created, publisher: :any, fifo: false),
endpoint: queue(:user_created, publisher: :any, dest: :stat, fifo: false)
You can create topics and queues with custom names. This is recommended only for:
- Removing old resources
- Receiving messages from external sources
create custom_topic('custom_topic_name')
delete custom_queue('custom_topic_name')
We recommend locating migrations as follows (see the sketch after this list):
- topic - on the publisher side;
- queue - on the subscriber side;
- subscription - on the subscriber side.
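A sketch of how the migration files from the atomic example above could be laid out across applications (paths and timestamps are illustrative):
# Publisher application (e.g. the registration service):
#   ./lariat/migrate/1668097991_user_created_topic.rb         # create topic(:user_created, ...)
#
# Subscriber application (e.g. the mailer service):
#   ./lariat/migrate/1668098100_user_created_queue.rb         # create queue(:user_created, dest: :mailer, ...)
#   ./lariat/migrate/1668098200_user_created_subscription.rb  # subscribe the queue to the topic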
$ bundle exec cyclone_lariat install - install cyclone_lariat
$ bundle exec cyclone_lariat generate migration - generate new migration
$ rake cyclone_lariat:list:queues # List all queues
$ rake cyclone_lariat:list:subscriptions # List all subscriptions
$ rake cyclone_lariat:list:topics # List all topics
$ rake cyclone_lariat:migrate # Migrate topics for SQS/SNS
$ rake cyclone_lariat:rollback[version] # Rollback topics for SQS/SNS
$ rake cyclone_lariat:graph # Make graph
The graph is generated in Graphviz format for the entire scheme, so Graphviz must be installed on your system. To convert the graph to PNG, use:
$ rake cyclone_lariat:graph | dot -Tpng -o foo.png
This gem works as middleware for shoryuken: it saves all incoming messages to the database and catches and reports all exceptions.
The logic of cyclone_lariat as a subscriber is similar to working with an HTTP server. The server returns various response codes, and you handle them as follows:
- 2xx - success, we process the page.
- 4xx - logic error: send the error to the developer and wait until it is fixed.
- 5xx - server error: report the error and try again.
If you use the middleware, it can:
- Store all incoming messages in a dataset
- Notify about every incoming SQS message
- Notify about every error
require 'sequel'
require 'cyclone_lariat/middleware' # If require: false in Gemfile
require 'luna_park/notifiers/log'
require_relative './config/initializers/cyclone_lariat'
Shoryuken::Logging.logger = Logger.new STDOUT
Shoryuken::Logging.logger.level = Logger::INFO
class Receiver
include Shoryuken::Worker
DB = Sequel.connect(host: 'localhost', user: 'ruby')
shoryuken_options auto_delete: true,
body_parser: ->(sqs_msg) {
JSON.parse(sqs_msg.body, symbolize_names: true)
},
queue: CycloneLariat.queue(:user_created, dest: :stat, fifo: true).name
server_middleware do |chain|
# The dataset, errors_notifier and message_notifier options are optional.
# If you don't define notifiers, the middleware does not notify.
# If you don't define a dataset, the middleware does not store messages in the db.
chain.add CycloneLariat::Middleware,
dataset: DB[:events],
errors_notifier: LunaPark::Notifiers::Sentry.new,
message_notifier: LunaPark::Notifiers::Log.new(min_lvl: :debug, format: :pretty_json),
before_save: ->(message) { message.data[:password] = nil }
end
class UserIsNotRegistered < LunaPark::Errors::Business
end
def perform(sqs_message, sqs_message_body)
# Your logic here
# If you want to raise business error
raise UserIsNotRegistered.new(first_name: 'John', last_name: 'Doe')
end
end
This extension allows you to save messages to the database inside a transaction. It prevents messages from being lost when publishing fails. After the transaction is completed, publishing is performed and successfully published messages are deleted from the database. For more information, see the Transactional outbox pattern.
OutboxErrorLogger = LunaPark::Notifiers::Log.new
CycloneLariat::Outbox.configure do |config|
config.dataset = DB[:outbox_messages] # Outbox messages dataset. Sequel dataset or ActiveRecord model
config.on_sending_error = lambda do |event, error|
OutboxErrorLogger.error(error, details: event.to_h)
end
end
CycloneLariat::Outbox.load
Before using the outbox, add and apply this migration:
# Sequel
DB.create_table :outbox_messages do
column :uuid, :uuid, primary_key: true
column :deduplication_id, String, null: true
column :group_id, String, null: true
column :serialized_message, :json, null: false
column :sending_error, String, null: true
DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP
end
# ActiveRecord
create_table(:outbox_messages, id: :uuid, primary_key: :uuid, default: -> { 'public.uuid_generate_v4()' }) do |t|
t.string :deduplication_id, null: true
t.string :group_id, null: true
t.string :sending_error, null: true
t.jsonb :serialized_message, null: false
t.datetime :created_at, null: false, default: -> { 'CURRENT_TIMESTAMP' }
end
# Sequel
DB.transaction(with_outbox: true) do |outbox|
some_action
outbox << CycloneLariat::Messages::V1::Event.new(...)
...
end
# ActiveRecord
ActiveRecord::Base.transaction(with_outbox: true) do |outbox|
some_action
outbox << CycloneLariat::Messages::V1::Event.new(...)
...
end
To resend messages you can use the following service:
CycloneLariat::Outbox::Services::Resend.call
This service tries to publish messages from the outbox table with sending_error != nil. Successfully published messages will be removed.
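For example, a periodic rake task (task and file names are illustrative) could wrap this service:
# lib/tasks/outbox.rake - sketch of a scheduled task that retries failed outbox messages
namespace :cyclone_lariat do
  namespace :outbox do
    desc 'Retry publishing outbox messages that previously failed'
    task :resend do
      CycloneLariat::Outbox::Services::Resend.call
    end
  end
end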
To simplify writing some Rake tasks, you can use CycloneLariat::Repo::InboxMessages.
# To retry all unprocessed messages
CycloneLariat::Repo::InboxMessages.new.each_unprocessed do |event|
# Your logic here
end
# To retry all messages that failed with client errors
CycloneLariat::Repo::InboxMessages.new.each_with_client_errors do |event|
# Your logic here
end
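For example, a rake task (names are illustrative) built on top of this repository could look like this:
# lib/tasks/inbox.rake - sketch: reprocess all unprocessed inbox messages with your own handler
namespace :cyclone_lariat do
  namespace :inbox do
    desc 'Reprocess unprocessed inbox messages'
    task :retry_unprocessed do
      CycloneLariat::Repo::InboxMessages.new.each_unprocessed do |event|
        # call your message handler here, e.g. the Receiver logic from above
      end
    end
  end
end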