Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor measurement reporter #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aggregator/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl GroupedMessages {

for tag in tags {
let msgs =
PendingMessage::list(conn.clone(), epoch, tag.clone(), profiler.clone()).await?;
PendingMessage::list(conn.clone(), epoch, tag.clone(), profiler.as_ref()).await?;
pending_msgs.insert(tag, msgs);
}
Ok(pending_msgs)
Expand Down Expand Up @@ -132,7 +132,7 @@ impl GroupedMessages {
for new_msgs in new_pending_msgs.chunks(INSERT_BATCH_SIZE) {
let new_msgs = new_msgs.to_vec();
new_msgs
.insert_batch(store_conns.get(), profiler.clone())
.insert_batch(store_conns.get(), profiler.as_ref())
.await?;
}
}
Expand Down Expand Up @@ -337,7 +337,7 @@ mod tests {
let db_pool = Arc::new(DBPool::new(true));
let conn = Arc::new(Mutex::new(db_pool.get().await.unwrap()));
new_rec_msgs
.insert_batch(conn.clone(), profiler.clone())
.insert_batch(conn.clone(), profiler.as_ref())
.await
.unwrap();
drop(conn);
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub async fn start_aggregation(
db_conn.clone(),
&epoch_config,
out_stream.as_ref().map(|v| v.as_ref()),
profiler.clone(),
profiler.as_ref(),
)
.await?;
if let Some(out_stream) = out_stream.as_ref() {
Expand Down
26 changes: 10 additions & 16 deletions src/aggregator/processing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::group::GroupedMessages;
use super::recovered::RecoveredMessages;
use super::report::report_measurements;
use super::report::MeasurementReporter;
use super::AggregatorError;
use crate::epoch::{is_epoch_expired, EpochConfig};
use crate::models::{DBConnection, DBPool, DBStorageConnections, PendingMessage, RecoveredMessage};
Expand All @@ -16,7 +16,7 @@ pub async fn process_expired_epochs(
conn: Arc<Mutex<DBConnection>>,
epoch_config: &EpochConfig,
out_stream: Option<&DynRecordStream>,
profiler: Arc<Profiler>,
profiler: &Profiler,
) -> Result<(), AggregatorError> {
let epochs = RecoveredMessage::list_distinct_epochs(conn.clone()).await?;
for epoch in epochs {
Expand All @@ -29,15 +29,9 @@ pub async fn process_expired_epochs(
.fetch_all_recovered_with_nonzero_count(conn.clone(), epoch as u8, profiler.clone())
.await?;

report_measurements(
&mut rec_msgs,
epoch_config,
epoch as u8,
true,
out_stream,
profiler.clone(),
)
.await?;
MeasurementReporter::new(epoch_config, out_stream, profiler, epoch as u8, true)
.report(&mut rec_msgs)
.await?;
RecoveredMessage::delete_epoch(conn.clone(), epoch, profiler.clone()).await?;
PendingMessage::delete_epoch(conn.clone(), epoch, profiler.clone()).await?;
}
Expand Down Expand Up @@ -194,7 +188,7 @@ pub fn start_subtask(

info!("Task {}: Deleting old pending messages", id);
for (epoch, msg_tag) in pending_tags_to_remove {
PendingMessage::delete_tag(store_conns.get(), epoch as i16, msg_tag, profiler.clone())
PendingMessage::delete_tag(store_conns.get(), epoch as i16, msg_tag, profiler.as_ref())
.await
.unwrap();
}
Expand All @@ -205,14 +199,14 @@ pub fn start_subtask(
let rec_epochs: Vec<u8> = rec_msgs.map.keys().cloned().collect();
let mut measurements_count = 0;
for epoch in rec_epochs {
measurements_count += report_measurements(
&mut rec_msgs,
measurements_count += MeasurementReporter::new(
epoch_config.as_ref(),
out_stream.as_ref().map(|v| v.as_ref()),
profiler.as_ref(),
epoch,
false,
out_stream.as_ref().map(|v| v.as_ref()),
profiler.clone(),
)
.report(&mut rec_msgs)
.await
.unwrap();
}
Expand Down
14 changes: 7 additions & 7 deletions src/aggregator/recovered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RecoveredMessages {
conn.clone(),
epoch as i16,
msg_tags.to_vec(),
profiler.clone(),
profiler.as_ref(),
)
.await?;
for rec_msg in recovered_msgs {
Expand All @@ -78,7 +78,7 @@ impl RecoveredMessages {
&mut self,
conn: Arc<Mutex<DBConnection>>,
epoch: u8,
profiler: Arc<Profiler>,
profiler: &Profiler,
) -> Result<(), AggregatorError> {
let recovered_msgs =
RecoveredMessage::list_with_nonzero_count(conn, epoch as i16, profiler).await?;
Expand All @@ -103,7 +103,7 @@ impl RecoveredMessages {
store_conns.get(),
rec_msg.id,
rec_msg.count,
profiler.clone(),
profiler.as_ref(),
)
.await?;
}
Expand All @@ -112,7 +112,7 @@ impl RecoveredMessages {
for new_msgs in new_msgs.chunks(INSERT_BATCH_SIZE) {
let new_msgs = new_msgs.to_vec();
new_msgs
.insert_batch(store_conns.get(), profiler.clone())
.insert_batch(store_conns.get(), profiler.as_ref())
.await?;
}
Ok(())
Expand Down Expand Up @@ -260,15 +260,15 @@ mod tests {

new_rec_msgs
.clone()
.insert_batch(conn.clone(), profiler.clone())
.insert_batch(conn.clone(), profiler.as_ref())
.await
.unwrap();

let mut recovered_msgs = RecoveredMessages::default();

for epoch in 3..=4 {
let mut rec_msg =
RecoveredMessage::list(conn.clone(), epoch, vec![vec![60; 20]], profiler.clone())
RecoveredMessage::list(conn.clone(), epoch, vec![vec![60; 20]], profiler.as_ref())
.await
.unwrap()[0]
.clone();
Expand All @@ -287,7 +287,7 @@ mod tests {

for epoch in 3..=4 {
let rec_msg =
RecoveredMessage::list(conn.clone(), epoch, vec![vec![60; 20]], profiler.clone())
RecoveredMessage::list(conn.clone(), epoch, vec![vec![60; 20]], profiler.as_ref())
.await
.unwrap()[0]
.clone();
Expand Down
Loading