Skip to content

Commit

Permalink
Add logging to event bus and fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Jun 14, 2024
1 parent 3457e98 commit 9a27e5c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions src/app/cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ pub fn configure_cli_catalog(

#[tracing::instrument(level = "info", skip_all)]
async fn initialize_components(cli_catalog: &Catalog) -> Result<(), CLIError> {
// TODO: For some reason, we if we do in a single transaction, there is a hangup
// TODO: Generalize on-startup initialization into a trait
DatabaseTransactionRunner::new(cli_catalog.clone())
.transactional(|transactional_catalog| async move {
let registrator = transactional_catalog
Expand All @@ -467,11 +467,8 @@ async fn initialize_components(cli_catalog: &Catalog) -> Result<(), CLIError> {
registrator
.ensure_predefined_accounts_are_registered()
.await
.map_err(CLIError::critical)
})
.await?;
DatabaseTransactionRunner::new(cli_catalog.clone())
.transactional(|transactional_catalog| async move {
.map_err(CLIError::critical)?;

let initializer = transactional_catalog
.get_one::<DatasetOwnershipServiceInMemoryStateInitializer>()
.map_err(CLIError::critical)?;
Expand Down
2 changes: 1 addition & 1 deletion src/infra/core/tests/tests/test_query_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async fn test_dataset_tail_empty_dataset() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(
tempdir.path(),
MockDatasetActionAuthorizer::new().expect_check_read_a_dataset(2),
MockDatasetActionAuthorizer::new().expect_check_read_a_dataset(1),
);

let dataset_repo = catalog.get_one::<dyn DatasetRepository>().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/infra/core/tests/tests/test_sync_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn do_test_sync(

let dataset_authorizer = construct_authorizer(
&AuthorizationExpectations {
d1_reads: 8,
d1_reads: 7,
d2_reads: 2,
d1_writes: 1,
d2_writes: 4,
Expand Down
3 changes: 2 additions & 1 deletion src/utils/event-bus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ internal-error = { workspace = true }
async-trait = "0.1"
dill = "0.8"
futures = "0.3"
tracing = "0.1"

[dev-dependencies]
test-log = { version = "0.2", features = ["trace"] }
tokio = { version = "1", default-features = false, features=["rt", "macros"] }
tokio = { version = "1", default-features = false, features = ["rt", "macros"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
thiserror = { version = "1", default-features = false }
39 changes: 25 additions & 14 deletions src/utils/event-bus/src/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::sync::Arc;

use dill::{component, Catalog};
use dill::{Builder, Catalog};
use internal_error::InternalError;

use crate::{AsyncEventHandler, EventHandler};
Expand All @@ -20,7 +20,7 @@ pub struct EventBus {
catalog: Arc<Catalog>,
}

#[component(pub)]
#[dill::component(pub)]
impl EventBus {
pub fn new(catalog: Arc<Catalog>) -> EventBus {
Self { catalog }
Expand All @@ -37,13 +37,16 @@ impl EventBus {
}

fn sync_dispatch<TEvent: 'static + Clone>(&self, event: &TEvent) -> Result<(), InternalError> {
let sync_handlers = self
.catalog
.get::<dill::AllOf<dyn EventHandler<TEvent>>>()
.unwrap();
let builders = self.catalog.builders_for::<dyn EventHandler<TEvent>>();

for sync_handler in sync_handlers {
sync_handler.handle(event)?;
for b in builders {
tracing::debug!(
handler = b.instance_type_name(),
event = std::any::type_name::<TEvent>(),
"Dispatching event to sync handler"
);
let inst = b.get(&self.catalog).unwrap();
inst.handle(event)?;
}

Ok(())
Expand All @@ -53,17 +56,25 @@ impl EventBus {
&self,
event: &TEvent,
) -> Result<(), InternalError> {
let async_handlers = self
.catalog
.get::<dill::AllOf<dyn AsyncEventHandler<TEvent>>>()
.unwrap();
let builders = self.catalog.builders_for::<dyn AsyncEventHandler<TEvent>>();

let async_handler_futures: Vec<_> = async_handlers
let mut handlers = Vec::new();
for b in builders {
tracing::debug!(
handler = b.instance_type_name(),
event = std::any::type_name::<TEvent>(),
"Dispatching event to async handler"
);
let handler = b.get(&self.catalog).unwrap();
handlers.push(handler);
}

let futures: Vec<_> = handlers
.iter()
.map(|handler| handler.handle(event))
.collect();

let results = futures::future::join_all(async_handler_futures).await;
let results = futures::future::join_all(futures).await;
results.into_iter().try_for_each(|res| res)?;

Ok(())
Expand Down

0 comments on commit 9a27e5c

Please sign in to comment.