diff --git a/firefly-cardanoconnect/src/persistence/mocks.rs b/firefly-cardanoconnect/src/persistence/mocks.rs index 3bfe6c4..0ee2e53 100644 --- a/firefly-cardanoconnect/src/persistence/mocks.rs +++ b/firefly-cardanoconnect/src/persistence/mocks.rs @@ -110,6 +110,7 @@ impl Persistence for MockPersistence { return Ok(()); }; listeners.retain(|it| it.id != *listener_id); + self.all_blocks.remove(listener_id); Ok(()) } diff --git a/firefly-cardanoconnect/src/persistence/sqlite.rs b/firefly-cardanoconnect/src/persistence/sqlite.rs index 7d9b601..8051d22 100644 --- a/firefly-cardanoconnect/src/persistence/sqlite.rs +++ b/firefly-cardanoconnect/src/persistence/sqlite.rs @@ -190,6 +190,8 @@ impl Persistence for SqlitePersistence { .call_unwrap(move |c| { c.prepare_cached("DELETE FROM listeners WHERE id = ?1 AND stream_id = ?2")? .execute([listener_id.to_string(), stream_id.to_string()])?; + c.prepare_cached("DELETE FROM block_records WHERE listener_id = ?1")? + .execute([listener_id.to_string()])?; Ok(()) }) .await diff --git a/scripts/demo/src/main.rs b/scripts/demo/src/main.rs index 8b0725b..03cc848 100644 --- a/scripts/demo/src/main.rs +++ b/scripts/demo/src/main.rs @@ -39,18 +39,22 @@ async fn create_or_get_stream(firefly: &FireflyCardanoClient, name: &str) -> Res } } -async fn recreate_listener( +async fn clear_listeners(firefly: &FireflyCardanoClient, stream_id: &str) -> Result<()> { + let existing_listeners = firefly.list_listeners(stream_id).await?; + for listener in existing_listeners { + println!("deleting existing listener {}", listener.name); + firefly.delete_listener(stream_id, &listener.id).await?; + } + Ok(()) +} + +async fn create_listener( firefly: &FireflyCardanoClient, stream_id: &str, name: &str, from_block: &str, filters: Vec, ) -> Result { - let existing_listeners = firefly.list_listeners(stream_id).await?; - if let Some(listener) = existing_listeners.iter().find(|l| l.name == name) { - println!("deleting existing listener {name}"); - firefly.delete_listener(stream_id, &listener.id).await?; - } println!("creating listener {name}"); firefly .create_listener( @@ -98,6 +102,7 @@ async fn main() -> Result<()> { // A "stream" is a communication channel for events. // Each application will create one and reuse it indefinitely. let stream_id = create_or_get_stream(&firefly, stream_topic).await?; + clear_listeners(&firefly, &stream_id).await?; // from_block tells the stream which block to start listening from. // It can be "earliest", "latest", or a kupo-style "slot.hash" string. @@ -106,7 +111,7 @@ async fn main() -> Result<()> { // A "listener" represents a logical process consuming events from this stream. // Listeners use filters to listen for specific events. // Here we create one listener just for our new transaction. - let tx_listener_id = recreate_listener( + let tx_listener_id = create_listener( &firefly, &stream_id, &format!("listener-{txid}"), @@ -116,7 +121,7 @@ async fn main() -> Result<()> { .await?; // Here we create another that listens to every transaction, so we can track the block height - let all_txs_listener_id = recreate_listener( + let all_txs_listener_id = create_listener( &firefly, &stream_id, "listener-all",