Skip to content

Commit

Permalink
feat(*)!: add wire transfer process manager
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Dec 14, 2023
1 parent ca8ca40 commit e823ac9
Show file tree
Hide file tree
Showing 42 changed files with 1,124 additions and 11 deletions.
3 changes: 2 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ build:
(cd $dir && wash build); \
done

version := "0.2.0"
version := "0.3.0"
push:
# Push to GHCR
wash push ghcr.io/cosmonic/cosmonic-gitops/wiretransfer_processmanager:{{version}} process_manager/build/wiretransfer_processmanager_s.wasm
wash push ghcr.io/cosmonic/cosmonic-gitops/bankaccount_projector:{{version}} projector/build/bankaccount_projector_s.wasm
wash push ghcr.io/cosmonic/cosmonic-gitops/bankaccount_aggregate:{{version}} aggregate/build/bankaccount_aggregate_s.wasm
wash push ghcr.io/cosmonic/cosmonic-gitops/bankaccount_catalog:{{version}} eventcatalog/actor/build/bankaccountcatalog_s.wasm
2 changes: 1 addition & 1 deletion aggregate/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bankaccount-aggregate"
version = "0.2.0"
version = "0.3.0"
authors = ["Cosmonic Team"]
edition = "2021"

Expand Down
131 changes: 131 additions & 0 deletions aggregate/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,102 @@
use crate::*;

pub(crate) fn handle_reserve_funds(
input: ReserveFunds,
state: Option<BankAccountAggregateState>,
) -> Result<EventList> {
let Some(old_state) = state else {
return Err(anyhow::anyhow!(
"Rejected command to reserve funds. Account {} does not exist.",
input.account_number
));
};
let avail_balance = old_state.available_balance();
if input.amount as u32 > avail_balance {
error!(
"Rejecting command to reserve funds, account {} does not have sufficient funds. Available {}",
&input.account_number, avail_balance
);
Ok(vec![])
} else {
Ok(vec![Event::new(
FundsReserved::TYPE,
STREAM,
&FundsReserved {
account_number: input.account_number.to_string(),
wire_transfer_id: input.wire_transfer_id,
customer_id: old_state.customer_id.to_string(),
amount: input.amount,
},
)])
}
}

pub(crate) fn handle_release_funds(
input: ReleaseFunds,
state: Option<BankAccountAggregateState>,
) -> Result<EventList> {
let Some(old_state) = state else {
return Err(anyhow::anyhow!(
"Rejected command to release funds. Account {} does not exist.",
input.account_number
));
};

if old_state
.reserved_funds
.contains_key(&input.wire_transfer_id)
{
Ok(vec![Event::new(
FundsReleased::TYPE,
STREAM,
&FundsReleased {
customer_id: input.customer_id,
account_number: input.account_number.to_string(),
wire_transfer_id: input.wire_transfer_id.to_string(),
},
)])
} else {
error!(
"Rejecting command to release funds, account {} does not have a wire transfer hold for {}",
&input.account_number, input.wire_transfer_id
);
Ok(vec![])
}
}

pub(crate) fn handle_commit_funds(
input: CommitFunds,
state: Option<BankAccountAggregateState>,
) -> Result<EventList> {
let Some(old_state) = state else {
return Err(anyhow::anyhow!(
"Rejected command to commit funds. Account {} does not exist.",
input.account_number
));
};

if old_state
.reserved_funds
.contains_key(&input.wire_transfer_id)
{
Ok(vec![Event::new(
FundsCommitted::TYPE,
STREAM,
&FundsCommitted {
customer_id: input.customer_id,
account_number: input.account_number.to_string(),
wire_transfer_id: input.wire_transfer_id.to_string(),
},
)])
} else {
error!(
"Rejecting command to commit funds, account {} does not have a wire transfer hold for {}",
&input.account_number, input.wire_transfer_id
);
Ok(vec![])
}
}

pub(crate) fn handle_create_account(input: CreateAccount) -> Result<EventList> {
Ok(vec![Event::new(
AccountCreated::TYPE,
Expand Down Expand Up @@ -44,6 +141,40 @@ pub(crate) fn handle_withdraw_funds(
}
}

pub(crate) fn handle_wire_funds(
input: WireFunds,
state: Option<BankAccountAggregateState>,
) -> Result<EventList> {
let Some(state) = state else {
return Err(anyhow::anyhow!(
"Rejected command to wire funds. Account {} does not exist.",
input.account_number
));
};

if state.available_balance() < input.amount as u32 {
error!(
"Rejecting command to wire funds, account {} does not have sufficient funds. Available {}",
&input.account_number, state.available_balance()
);
Ok(vec![])
} else {
Ok(vec![Event::new(
WireTransferInitiated::TYPE,
STREAM,
&WireTransferInitiated {
note: input.note,
account_number: input.target_account_number,
target_routing_number: input.target_routing_number,
target_account_number: input.account_number,
amount: input.amount,
customer_id: input.customer_id,
wire_transfer_id: input.wire_transaction_id,
},
)])
}
}

pub(crate) fn handle_deposit_funds(
input: DepositFunds,
state: Option<BankAccountAggregateState>,
Expand Down
63 changes: 63 additions & 0 deletions aggregate/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,60 @@ pub(crate) fn apply_funds_deposited(
Ok(StateAck::ok(Some(state)))
}

pub(crate) fn apply_funds_released(
input: FundsReleased,
state: Option<BankAccountAggregateState>,
) -> Result<StateAck> {
let Some(state) = state else {
error!(
"Rejecting funds released event. Account {} does not exist.",
input.account_number
);
return Ok(StateAck::error(
"Account does not exist",
None::<BankAccountAggregateState>,
));
};
let state = state.release_funds(&input.wire_transfer_id);
Ok(StateAck::ok(Some(state)))
}

pub(crate) fn apply_funds_committed(
input: FundsCommitted,
state: Option<BankAccountAggregateState>,
) -> Result<StateAck> {
let Some(state) = state else {
error!(
"Rejecting funds committed event. Account {} does not exist.",
input.account_number
);
return Ok(StateAck::error(
"Account does not exist",
None::<BankAccountAggregateState>,
));
};
let state = state.commit_funds(&input.wire_transfer_id);
Ok(StateAck::ok(Some(state)))
}

pub(crate) fn apply_funds_reserved(
input: FundsReserved,
state: Option<BankAccountAggregateState>,
) -> Result<StateAck> {
let Some(state) = state else {
error!(
"Rejecting funds reserved event. Account {} does not exist.",
input.account_number
);
return Ok(StateAck::error(
"Account does not exist",
None::<BankAccountAggregateState>,
));
};
let state = state.reserve_funds(&input.wire_transfer_id, input.amount as u32);
Ok(StateAck::ok(Some(state)))
}

pub(crate) fn apply_funds_withdrawn(
input: FundsWithdrawn,
state: Option<BankAccountAggregateState>,
Expand All @@ -54,3 +108,12 @@ pub(crate) fn apply_funds_withdrawn(
let state = state.withdraw(input.amount as u32);
Ok(StateAck::ok(Some(state)))
}

pub(crate) fn apply_wire_transfer_initiated(
_input: WireTransferInitiated,
state: Option<BankAccountAggregateState>,
) -> Result<StateAck> {
// We don't currently change internal state because of this. The first time a wire transfer
// impacts the the account is when funds are reserved (by the process manager)
Ok(StateAck::ok(state))
}
64 changes: 64 additions & 0 deletions aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,30 @@ concordance_gen::generate!({

impl BankAccountAggregate for BankAccountAggregateImpl {
// -- Commands --
fn handle_reserve_funds(
&self,
input: ReserveFunds,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<EventList> {
commands::handle_reserve_funds(input, state)
}

fn handle_release_funds(
&self,
input: ReleaseFunds,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<EventList> {
commands::handle_release_funds(input, state)
}

fn handle_commit_funds(
&self,
input: CommitFunds,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<EventList> {
commands::handle_commit_funds(input, state)
}

fn handle_create_account(
&self,
input: CreateAccount,
Expand All @@ -34,6 +58,14 @@ impl BankAccountAggregate for BankAccountAggregateImpl {
commands::handle_withdraw_funds(input, state)
}

fn handle_wire_funds(
&self,
input: WireFunds,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<EventList> {
commands::handle_wire_funds(input, state)
}

fn handle_deposit_funds(
&self,
input: DepositFunds,
Expand All @@ -60,13 +92,45 @@ impl BankAccountAggregate for BankAccountAggregateImpl {
events::apply_funds_deposited(input, state)
}

fn apply_funds_released(
&self,
input: FundsReleased,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<StateAck> {
events::apply_funds_released(input, state)
}

fn apply_funds_committed(
&self,
input: FundsCommitted,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<StateAck> {
events::apply_funds_committed(input, state)
}

fn apply_funds_reserved(
&self,
input: FundsReserved,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<StateAck> {
events::apply_funds_reserved(input, state)
}

fn apply_funds_withdrawn(
&self,
input: FundsWithdrawn,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<StateAck> {
events::apply_funds_withdrawn(input, state)
}

fn apply_wire_transfer_initiated(
&self,
input: WireTransferInitiated,
state: Option<BankAccountAggregateState>,
) -> anyhow::Result<StateAck> {
events::apply_wire_transfer_initiated(input, state)
}
}

const STREAM: &str = "bankaccount";
2 changes: 1 addition & 1 deletion eventcatalog/actor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion eventcatalog/actor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bankaccountcatalog"
version = "0.2.0"
version = "0.3.0"
authors = ["Cosmonic Team"]
edition = "2021"

Expand Down
Binary file modified eventcatalog/all_events.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
19 changes: 19 additions & 0 deletions eventcatalog/events/CommitFunds/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
name: CommitFunds
summary: "A request to commit funds under hold to a wire transfer"
version: 0.0.1
consumers:
- 'Bank Account Aggregate'
producers:
- 'Wire Transfer Process Manager'
tags:
- label: 'command'
externalLinks: []
badges: []
---
A request to commit the funds currently on hold for a given wire transfer.

<Mermaid />

## Schema
<SchemaViewer />
22 changes: 22 additions & 0 deletions eventcatalog/events/CommitFunds/schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$id": "https://cosmonic.com/concordance/bankaccount/CommitFunds.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "CommitFunds",
"type": "object",
"properties": {
"accountNumber": {
"type": "string",
"description": "The account number"
},
"customerId": {
"type": "string",
"description": "The ID of the customer performing the withdrawal"
},
"wireTransferId": {
"type": "string",
"description": "A unique ID identifying the wire transfer transaction"
}
},
"required": ["accountNumber", "customerId", "wireTransferId"]
}

Loading

0 comments on commit e823ac9

Please sign in to comment.