Skip to content

Commit

Permalink
refactor(wadm): better visibility controls
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Jan 17, 2025
1 parent 41e6e35 commit 64e3d93
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
33 changes: 20 additions & 13 deletions crates/wadm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_nats::jetstream::{stream::Stream, Context};
use config::WadmConfig;
use tokio::{sync::Semaphore, task::JoinSet};
Expand Down Expand Up @@ -73,12 +74,18 @@ pub const APP_SPEC_ANNOTATION: &str = "wasmcloud.dev/appspec";
pub const SCALER_KEY: &str = "wasmcloud.dev/scaler";
/// The default link name. In the future, this will likely be pulled in from another crate
pub const DEFAULT_LINK_NAME: &str = "default";
pub(crate) const WADM_EVENT_STREAM_NAME: &str = "wadm_events";
pub(crate) const WADM_EVENT_CONSUMER_STREAM_NAME: &str = "wadm_event_consumer";
pub(crate) const COMMAND_STREAM_NAME: &str = "wadm_commands";
pub(crate) const STATUS_STREAM_NAME: &str = "wadm_status";
pub(crate) const NOTIFY_STREAM_NAME: &str = "wadm_notify";
pub(crate) const WASMBUS_EVENT_STREAM_NAME: &str = "wasmbus_events";
/// Default stream name for wadm events
pub const DEFAULT_WADM_EVENT_STREAM_NAME: &str = "wadm_events";
/// Default stream name for wadm event consumer
pub const DEFAULT_WADM_EVENT_CONSUMER_STREAM_NAME: &str = "wadm_event_consumer";
/// Default stream name for wadm commands
pub const DEFAULT_COMMAND_STREAM_NAME: &str = "wadm_commands";
/// Default stream name for wadm status
pub const DEFAULT_STATUS_STREAM_NAME: &str = "wadm_status";
/// Default stream name for wadm notifications
pub const DEFAULT_NOTIFY_STREAM_NAME: &str = "wadm_notify";
/// Default stream name for wasmbus events
pub const DEFAULT_WASMBUS_EVENT_STREAM_NAME: &str = "wasmbus_events";

/// Start wadm with the provided [WadmConfig], returning [JoinSet] with two tasks:
/// 1. The server task that listens for API requests
Expand Down Expand Up @@ -117,7 +124,7 @@ pub(crate) const WASMBUS_EVENT_STREAM_NAME: &str = "wasmbus_events";
/// }
/// };
/// ```
pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Result<()>>> {
pub async fn start_wadm(config: WadmConfig) -> Result<JoinSet<Result<()>>> {
// Build storage adapter for lattice state (on by default)
let (client, context) = nats::get_client_and_context(
config.nats_server.clone(),
Expand Down Expand Up @@ -171,7 +178,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let event_stream = nats::ensure_limits_stream(
&context,
internal_stream_name(WADM_EVENT_STREAM_NAME),
internal_stream_name(DEFAULT_WADM_EVENT_STREAM_NAME),
vec![DEFAULT_WADM_EVENTS_TOPIC.to_owned()],
Some(
"A stream that stores all events coming in on the wadm.evt subject in a cluster"
Expand All @@ -186,7 +193,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let command_stream = nats::ensure_stream(
&context,
internal_stream_name(COMMAND_STREAM_NAME),
internal_stream_name(DEFAULT_COMMAND_STREAM_NAME),
vec![DEFAULT_COMMANDS_TOPIC.to_owned()],
Some("A stream that stores all commands for wadm".to_string()),
config.max_command_stream_bytes,
Expand All @@ -196,7 +203,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let status_stream = nats::ensure_status_stream(
&context,
internal_stream_name(STATUS_STREAM_NAME),
internal_stream_name(DEFAULT_STATUS_STREAM_NAME),
vec![DEFAULT_STATUS_TOPIC.to_owned()],
config.max_status_stream_bytes,
config.stream_persistence.into(),
Expand All @@ -222,7 +229,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let wasmbus_event_stream = nats::ensure_limits_stream(
&context,
WASMBUS_EVENT_STREAM_NAME.to_string(),
DEFAULT_WASMBUS_EVENT_STREAM_NAME.to_string(),
wasmbus_event_subjects.clone(),
Some(
"A stream that stores all events coming in on the wasmbus.evt subject in a cluster"
Expand All @@ -237,7 +244,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let notify_stream = nats::ensure_notify_stream(
&context,
NOTIFY_STREAM_NAME.to_owned(),
DEFAULT_NOTIFY_STREAM_NAME.to_owned(),
vec![format!("{WADM_NOTIFY_PREFIX}.*")],
config.max_notify_stream_bytes,
config.stream_persistence.into(),
Expand All @@ -248,7 +255,7 @@ pub async fn start_wadm(config: WadmConfig) -> anyhow::Result<JoinSet<anyhow::Re

let event_consumer_stream = nats::ensure_event_consumer_stream(
&context,
WADM_EVENT_CONSUMER_STREAM_NAME.to_owned(),
DEFAULT_WADM_EVENT_CONSUMER_STREAM_NAME.to_owned(),
DEFAULT_WADM_EVENT_CONSUMER_TOPIC.to_owned(),
vec![&wasmbus_event_stream, &event_stream],
Some(
Expand Down
2 changes: 1 addition & 1 deletion crates/wadm/src/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl From<&str> for StreamPersistence {
}

/// Creates a NATS client from the given options
pub async fn get_client_and_context(
pub(crate) async fn get_client_and_context(
url: String,
js_domain: Option<String>,
seed: Option<String>,
Expand Down

0 comments on commit 64e3d93

Please sign in to comment.