Skip to content

Commit

Permalink
fix: make the demo clean up after itself
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Dec 6, 2024
1 parent a4ff638 commit 66e026e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
1 change: 1 addition & 0 deletions firefly-cardanoconnect/src/persistence/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl Persistence for MockPersistence {
return Ok(());
};
listeners.retain(|it| it.id != *listener_id);
self.all_blocks.remove(listener_id);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions firefly-cardanoconnect/src/persistence/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ impl Persistence for SqlitePersistence {
.call_unwrap(move |c| {
c.prepare_cached("DELETE FROM listeners WHERE id = ?1 AND stream_id = ?2")?
.execute([listener_id.to_string(), stream_id.to_string()])?;
c.prepare_cached("DELETE FROM block_records WHERE listener_id = ?1")?
.execute([listener_id.to_string()])?;
Ok(())
})
.await
Expand Down
21 changes: 13 additions & 8 deletions scripts/demo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ async fn create_or_get_stream(firefly: &FireflyCardanoClient, name: &str) -> Res
}
}

async fn recreate_listener(
async fn clear_listeners(firefly: &FireflyCardanoClient, stream_id: &str) -> Result<()> {
let existing_listeners = firefly.list_listeners(stream_id).await?;
for listener in existing_listeners {
println!("deleting existing listener {}", listener.name);
firefly.delete_listener(stream_id, &listener.id).await?;
}
Ok(())
}

async fn create_listener(
firefly: &FireflyCardanoClient,
stream_id: &str,
name: &str,
from_block: &str,
filters: Vec<ListenerFilter>,
) -> Result<String> {
let existing_listeners = firefly.list_listeners(stream_id).await?;
if let Some(listener) = existing_listeners.iter().find(|l| l.name == name) {
println!("deleting existing listener {name}");
firefly.delete_listener(stream_id, &listener.id).await?;
}
println!("creating listener {name}");
firefly
.create_listener(
Expand Down Expand Up @@ -98,6 +102,7 @@ async fn main() -> Result<()> {
// A "stream" is a communication channel for events.
// Each application will create one and reuse it indefinitely.
let stream_id = create_or_get_stream(&firefly, stream_topic).await?;
clear_listeners(&firefly, &stream_id).await?;

// from_block tells the stream which block to start listening from.
// It can be "earliest", "latest", or a kupo-style "slot.hash" string.
Expand All @@ -106,7 +111,7 @@ async fn main() -> Result<()> {
// A "listener" represents a logical process consuming events from this stream.
// Listeners use filters to listen for specific events.
// Here we create one listener just for our new transaction.
let tx_listener_id = recreate_listener(
let tx_listener_id = create_listener(
&firefly,
&stream_id,
&format!("listener-{txid}"),
Expand All @@ -116,7 +121,7 @@ async fn main() -> Result<()> {
.await?;

// Here we create another that listens to every transaction, so we can track the block height
let all_txs_listener_id = recreate_listener(
let all_txs_listener_id = create_listener(
&firefly,
&stream_id,
"listener-all",
Expand Down

0 comments on commit 66e026e

Please sign in to comment.