Skip to content

Commit

Permalink
refactor: inline size into LogEntry
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jun 5, 2024
1 parent e59e263 commit cdde114
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 160 deletions.
11 changes: 8 additions & 3 deletions crates/curp/src/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct LogEntry<C> {
pub(crate) propose_id: ProposeId,
/// Entry data
pub(crate) entry_data: EntryData<C>,
/// entry size
pub(crate) size: u64,
}

/// Entry data of a `LogEntry`
Expand Down Expand Up @@ -78,13 +80,16 @@ where
term: u64,
propose_id: ProposeId,
entry_data: impl Into<EntryData<C>>,
) -> Self {
Self {
) -> Result<Self, bincode::Error> {
let mut entry = Self {
term,
index,
propose_id,
entry_data: entry_data.into(),
}
size: 0,
};
entry.size = bincode::serialized_size(&entry)?;
Ok(entry)
}

/// Get the inflight id of this log entry
Expand Down
183 changes: 101 additions & 82 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,9 @@ mod tests {
done_tx,
);

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default()),
));
let entry = Arc::new(
LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap(),
);

ce_event_tx.send_sp_exe(Arc::clone(&entry));
assert_eq!(er_rx.recv().await.unwrap().1.values, Vec::<u32>::new());
Expand Down Expand Up @@ -499,12 +496,15 @@ mod tests {
);

let begin = Instant::now();
let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default().set_exe_dur(Duration::from_secs(1))),
));
let entry = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default().set_exe_dur(Duration::from_secs(1))),
)
.unwrap(),
);

ce_event_tx.send_sp_exe(Arc::clone(&entry));

Expand Down Expand Up @@ -546,16 +546,19 @@ mod tests {
done_tx,
);

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(
TestCommand::default()
.set_exe_dur(Duration::from_secs(1))
.set_exe_should_fail(),
),
));
let entry = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(
TestCommand::default()
.set_exe_dur(Duration::from_secs(1))
.set_exe_should_fail(),
),
)
.unwrap(),
);

ce_event_tx.send_sp_exe(Arc::clone(&entry));

Expand Down Expand Up @@ -598,12 +601,9 @@ mod tests {
done_tx,
);

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default()),
));
let entry = Arc::new(
LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap(),
);

ce_event_tx.send_after_sync(entry);

Expand Down Expand Up @@ -640,12 +640,15 @@ mod tests {
done_tx,
);

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default().set_exe_should_fail()),
));
let entry = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default().set_exe_should_fail()),
)
.unwrap(),
);

ce_event_tx.send_after_sync(entry);

Expand Down Expand Up @@ -685,18 +688,24 @@ mod tests {
done_tx,
);

let entry1 = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::new_put(vec![1], 1)),
));
let entry2 = Arc::new(LogEntry::new(
2,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_get(vec![1])),
));
let entry1 = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::new_put(vec![1], 1)),
)
.unwrap(),
);
let entry2 = Arc::new(
LogEntry::new(
2,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_get(vec![1])),
)
.unwrap(),
);

ce_event_tx.send_sp_exe(Arc::clone(&entry1));
ce_event_tx.send_sp_exe(Arc::clone(&entry2));
Expand Down Expand Up @@ -747,31 +756,40 @@ mod tests {
done_tx,
);

let entry1 = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::new_put(vec![1], 1).set_as_dur(Duration::from_millis(50))),
));
let entry2 = Arc::new(LogEntry::new(
2,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_get(vec![1])),
));
let entry1 = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::new_put(vec![1], 1).set_as_dur(Duration::from_millis(50))),
)
.unwrap(),
);
let entry2 = Arc::new(
LogEntry::new(
2,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_get(vec![1])),
)
.unwrap(),
);
ce_event_tx.send_sp_exe(Arc::clone(&entry1));
ce_event_tx.send_sp_exe(Arc::clone(&entry2));

assert_eq!(er_rx.recv().await.unwrap().1.revisions, Vec::<i64>::new());

ce_event_tx.send_reset(None);

let entry3 = Arc::new(LogEntry::new(
3,
1,
ProposeId(0, 2),
Arc::new(TestCommand::new_get(vec![1])),
));
let entry3 = Arc::new(
LogEntry::new(
3,
1,
ProposeId(0, 2),
Arc::new(TestCommand::new_get(vec![1])),
)
.unwrap(),
);

ce_event_tx.send_after_sync(entry3);

Expand Down Expand Up @@ -813,23 +831,21 @@ mod tests {
s2_id,
0,
0,
vec![LogEntry::new(
1,
1,
ProposeId(0, 0),
Arc::new(TestCommand::default()),
)],
vec![LogEntry::new(1, 1, ProposeId(0, 0), Arc::new(TestCommand::default())).unwrap()],
0,
)
.unwrap();
start_cmd_workers(Arc::clone(&ce1), Arc::new(curp), task_rx, done_tx);

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_put(vec![1], 1).set_exe_dur(Duration::from_millis(50))),
));
let entry = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 1),
Arc::new(TestCommand::new_put(vec![1], 1).set_exe_dur(Duration::from_millis(50))),
)
.unwrap(),
);

ce_event_tx.send_after_sync(entry);

Expand Down Expand Up @@ -866,12 +882,15 @@ mod tests {

ce_event_tx.send_reset(Some(snapshot)).await.unwrap();

let entry = Arc::new(LogEntry::new(
1,
1,
ProposeId(0, 2),
Arc::new(TestCommand::new_get(vec![1])),
));
let entry = Arc::new(
LogEntry::new(
1,
1,
ProposeId(0, 2),
Arc::new(TestCommand::new_get(vec![1])),
)
.unwrap(),
);
ce_event_tx.send_after_sync(entry);
assert_eq!(er_rx.recv().await.unwrap().1.revisions, vec![1]);
task_manager1.shutdown(true).await;
Expand Down
Loading

0 comments on commit cdde114

Please sign in to comment.