Skip to content

Commit

Permalink
custom parallization
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Oct 2, 2024
1 parent 205d703 commit 3cb1cee
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 4 deletions.
156 changes: 152 additions & 4 deletions rust/sdk-examples/src/processors/events/events_storer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,29 @@ use crate::{
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
traits::{
async_step::AsyncRunType, processable::CustomRunType, AsyncStep, IntoRunnableStep,
NamedStep, Processable, RunnableAsyncStep, RunnableStep,
},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
utils::{
errors::ProcessorError,
step_metrics::{StepMetricLabels, StepMetricsBuilder},
},
};
use async_trait::async_trait;
use bigdecimal::Zero;
use diesel::{
pg::{upsert::excluded, Pg},
query_builder::QueryFragment,
ExpressionMethods,
};
use tracing::{error, info};
use instrumented_channel::{
instrumented_bounded_channel, InstrumentedAsyncReceiver, InstrumentedAsyncSender,
};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tracing::{error, info, warn};

pub struct EventsStorer
where
Expand Down Expand Up @@ -55,7 +67,7 @@ fn insert_events_query(
impl Processable for EventsStorer {
type Input = EventModel;
type Output = EventModel;
type RunType = AsyncRunType;
type RunType = CustomRunType;

async fn process(
&mut self,
Expand Down Expand Up @@ -91,3 +103,139 @@ impl NamedStep for EventsStorer {
"EventsStorer".to_string()
}
}

// This trait implementation is required if you want to customize running the step
impl IntoRunnableStep<EventModel, EventModel, EventsStorer, CustomRunType> for EventsStorer {
fn into_runnable_step(self) -> impl RunnableStep<EventModel, EventModel> {
RunnableEventsStorer::new(self)
}
}

pub struct RunnableEventsStorer {
pub step: EventsStorer,
}

impl RunnableEventsStorer {
pub fn new(step: EventsStorer) -> Self {
Self { step }
}
}

impl RunnableStep<EventModel, EventModel> for RunnableEventsStorer {
fn spawn(
self,
input_receiver: Option<InstrumentedAsyncReceiver<TransactionContext<EventModel>>>,
output_channel_size: usize,
_input_sender: Option<InstrumentedAsyncSender<TransactionContext<EventModel>>>,
) -> (
InstrumentedAsyncReceiver<TransactionContext<EventModel>>,
JoinHandle<()>,
) {
let mut step = self.step;
let step_name = step.name();
let input_receiver = input_receiver.expect("Input receiver must be set");

let (output_sender, output_receiver) =
instrumented_bounded_channel(&step_name, output_channel_size);

// TIP: You may replace this tokio task with your own code to customize the parallelization of this step
info!(step_name = step_name, "Spawning processing task");
let handle = tokio::spawn(async move {
loop {
let input_with_context = match input_receiver.recv().await {
Ok(input_with_context) => input_with_context,
Err(e) => {
// If the previous steps have finished and the channels have closed , we should break out of the loop
warn!(
step_name = step_name,
error = e.to_string(),
"No input received from channel"
);
break;
},
};
let processing_duration = Instant::now();
let output_with_context = match step.process(input_with_context).await {
Ok(output_with_context) => output_with_context,
Err(e) => {
error!(
step_name = step_name,
error = e.to_string(),
"Failed to process input"
);
break;
},
};
if let Some(output_with_context) = output_with_context {
match StepMetricsBuilder::default()
.labels(StepMetricLabels {
step_name: step.name(),
})
.latest_processed_version(output_with_context.end_version)
.latest_transaction_timestamp(
output_with_context.get_start_transaction_timestamp_unix(),
)
.num_transactions_processed_count(
output_with_context.get_num_transactions(),
)
.processing_duration_in_secs(processing_duration.elapsed().as_secs_f64())
.processed_size_in_bytes(output_with_context.total_size_in_bytes)
.build()
{
Ok(mut metrics) => metrics.log_metrics(),
Err(e) => {
error!(
step_name = step_name,
error = e.to_string(),
"Failed to log metrics"
);
break;
},
}
match output_sender.send(output_with_context).await {
Ok(_) => (),
Err(e) => {
error!(
step_name = step_name,
error = e.to_string(),
"Error sending output to channel"
);
break;
},
}
}
}

// Wait for output channel to be empty before ending the task and closing the send channel
loop {
let channel_size = output_sender.len();
info!(
step_name = step_name,
channel_size = channel_size,
"Waiting for output channel to be empty"
);
if channel_size.is_zero() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!(
step_name = step_name,
"Output channel is empty. Closing send channel."
);
});

(output_receiver, handle)
}
}

impl NamedStep for RunnableEventsStorer {
fn name(&self) -> String {
self.step.name()
}

fn type_name(&self) -> String {
let step_type = std::any::type_name::<EventsStorer>().to_string();
format!("{} (via RunnableAsyncStep)", step_type)
}
}
4 changes: 4 additions & 0 deletions rust/sdk/src/traits/processable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub trait RunnableStepType {}
// This is a dummy implementation for the unit type
impl RunnableStepType for () {}

pub struct CustomRunType;

impl RunnableStepType for CustomRunType {}

#[async_trait]
pub trait Processable
where
Expand Down

0 comments on commit 3cb1cee

Please sign in to comment.