Skip to content

Commit

Permalink
Merge pull request #511 from paritytech/AndreiEres/fix-network-termin…
Browse files Browse the repository at this point in the history
…ation

Take down the tracer on the collector termination
  • Loading branch information
AndreiEres authored Aug 24, 2023
2 parents c476a41 + 8498efd commit 6613a13
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
45 changes: 38 additions & 7 deletions essentials/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,16 @@ pub enum CollectorUpdateEvent {
/// Occurs on a new session
NewSession(u32),
/// Occurs when collector is disconnected and is about to terminate
Termination,
Termination(TerminationReason),
}

/// Represents the reason for a collector termination
#[derive(Clone, Debug)]
pub enum TerminationReason {
/// Indicates a normal termination
Normal,
/// Indicates an abnormal termination with error code and additional information
Abnormal(i32, String),
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -243,10 +252,26 @@ impl Collector {
if let Err(error) = self.process_chain_event(event).await {
error!("collector service could not process event: {}", error);
match error {
CollectorError::ExecutorFatal(_) | CollectorError::SendFatal(_) => {
self.broadcast_event_priority(CollectorUpdateEvent::Termination)
.await
.unwrap();
CollectorError::ExecutorFatal(e) => {
self.broadcast_event_priority(CollectorUpdateEvent::Termination(
TerminationReason::Abnormal(
1,
format!("Collector's executor error: {}", e),
),
))
.await
.unwrap();
return
},
CollectorError::SendFatal(e) => {
self.broadcast_event_priority(CollectorUpdateEvent::Termination(
TerminationReason::Abnormal(
1,
format!("Collector's chanel error: {}", e),
),
))
.await
.unwrap();
return
},
_ => continue,
Expand All @@ -255,13 +280,19 @@ impl Collector {
},
Err(e) => {
error!("collector service could not process events: {}", e);
self.broadcast_event_priority(CollectorUpdateEvent::Termination).await.unwrap();
self.broadcast_event_priority(CollectorUpdateEvent::Termination(
TerminationReason::Abnormal(1, format!("Collector's service error: {}", e)),
))
.await
.unwrap();
return
},
},
None => {
error!("no more events from the consumer channel");
self.broadcast_event_priority(CollectorUpdateEvent::Termination).await.unwrap();
self.broadcast_event_priority(CollectorUpdateEvent::Termination(TerminationReason::Normal))
.await
.unwrap();
return
},
}
Expand Down
22 changes: 17 additions & 5 deletions parachain-tracer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use polkadot_introspector_essentials::{
chain_head_subscription::ChainHeadSubscription,
chain_subscription::ChainSubscriptionEvent,
collector,
collector::{Collector, CollectorOptions, CollectorStorageApi, CollectorUpdateEvent},
collector::{Collector, CollectorOptions, CollectorStorageApi, CollectorUpdateEvent, TerminationReason},
consumer::{EventConsumerInit, EventStream},
historical_subscription::HistoricalSubscription,
init,
Expand Down Expand Up @@ -231,9 +231,15 @@ impl ParachainTracer {
CollectorUpdateEvent::NewSession(idx) => {
tracker.new_session(idx).await;
},
CollectorUpdateEvent::Termination => {
CollectorUpdateEvent::Termination(reason) => {
info!("collector is terminating");
break
match reason {
TerminationReason::Normal => break,
TerminationReason::Abnormal(code, info) => {
error!("Shutting down, {}", info);
std::process::exit(code)
},
}
},
},
Err(_) => {
Expand Down Expand Up @@ -298,10 +304,16 @@ impl ParachainTracer {
for to_tracker in trackers.values_mut() {
to_tracker.send(CollectorUpdateEvent::NewSession(idx)).await.unwrap();
},
CollectorUpdateEvent::Termination => {
CollectorUpdateEvent::Termination(reason) => {
info!("Received termination event, {} trackers will be terminated, {} futures are pending",
trackers.len(), futures.len());
break;
match reason {
TerminationReason::Normal => break,
TerminationReason::Abnormal(code, info) => {
error!("Shutting down, {}", info);
std::process::exit(code)
},
}
},
},
None => {
Expand Down

0 comments on commit 6613a13

Please sign in to comment.