diff --git a/Cargo.toml b/Cargo.toml index 8450526e9..07741ce40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1.0" chrono = { version = "0.4" } clap = { version = "4.1.11", features = ["derive", "env"] } derive_more = { version = "1.0", features = ["std", "from", "try_into", "display"] } +dupit = { version = "0.2.0" } futures = "0.3" lazy_static = "1.4.0" maplit = "1.0.2" diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index cdc5fe670..931600d88 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -21,6 +21,7 @@ byte-unit = { workspace = true } chrono = { workspace = true } clap = { workspace = true } derive_more = { workspace = true } +dupit = { workspace = true } futures = { workspace = true } openraft-macros = { path = "../macros", version = "0.10.0" } maplit = { workspace = true } diff --git a/openraft/src/core/heartbeat/event.rs b/openraft/src/core/heartbeat/event.rs index 931f0810b..1025b4aad 100644 --- a/openraft/src/core/heartbeat/event.rs +++ b/openraft/src/core/heartbeat/event.rs @@ -8,7 +8,7 @@ use crate::LogId; use crate::RaftTypeConfig; /// The information for broadcasting a heartbeat. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] #[derive(PartialEq, Eq)] pub struct HeartbeatEvent where C: RaftTypeConfig diff --git a/openraft/src/core/heartbeat/handle.rs b/openraft/src/core/heartbeat/handle.rs index 42065ba0e..c86d50485 100644 --- a/openraft/src/core/heartbeat/handle.rs +++ b/openraft/src/core/heartbeat/handle.rs @@ -69,19 +69,19 @@ where C: RaftTypeConfig { for (target, node) in targets { tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target); - let network = network_factory.new_client(target, &node).await; + let network = network_factory.new_client(target.clone(), &node).await; let worker = HeartbeatWorker { - id: self.id, + id: self.id.clone(), rx: self.rx.clone(), network, - target, + target: target.clone(), node, config: self.config.clone(), tx_notification: tx_notification.clone(), }; - let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target)); + let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(&self.id), target=display(&target)); let (tx_shutdown, rx_shutdown) = C::oneshot(); diff --git a/openraft/src/core/heartbeat/worker.rs b/openraft/src/core/heartbeat/worker.rs index c8fe20d55..f80d86cc6 100644 --- a/openraft/src/core/heartbeat/worker.rs +++ b/openraft/src/core/heartbeat/worker.rs @@ -71,7 +71,7 @@ where _ = self.rx.changed().fuse() => {}, } - let heartbeat: Option> = *self.rx.borrow_watched(); + let heartbeat: Option> = self.rx.borrow_watched().clone(); // None is the initial value of the WatchReceiver, ignore it. let Some(heartbeat) = heartbeat else { @@ -82,9 +82,9 @@ where let option = RPCOption::new(timeout); let payload = AppendEntriesRequest { - vote: heartbeat.session_id.leader_vote.into_vote(), + vote: heartbeat.session_id.leader_vote.clone().into_vote(), prev_log_id: None, - leader_commit: heartbeat.committed, + leader_commit: heartbeat.committed.clone(), entries: vec![], }; @@ -94,9 +94,9 @@ where match res { Ok(Ok(_)) => { let res = self.tx_notification.send(Notification::HeartbeatProgress { - session_id: heartbeat.session_id, + session_id: heartbeat.session_id.clone(), sending_time: heartbeat.time, - target: self.target, + target: self.target.clone(), }); if res.is_err() { diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 2e5e83830..9384f8d81 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -251,7 +251,7 @@ where Err(err) } - #[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))] + #[tracing::instrument(level="trace", skip_all, fields(id=display(&self.id), cluster=%self.config.cluster_name))] async fn do_main(&mut self, rx_shutdown: OneshotReceiverOf) -> Result> { tracing::debug!("raft node is initializing"); @@ -294,18 +294,18 @@ where // TODO: this applied is a little stale when being returned to client. // Fix this when the following heartbeats are replaced with calling RaftNetwork. - let applied = self.engine.state.io_applied().copied(); + let applied = self.engine.state.io_applied().cloned(); (read_log_id, applied) }; - let my_id = self.id; - let my_vote = *self.engine.state.vote_ref(); + let my_id = self.id.clone(); + let my_vote = self.engine.state.vote_ref().clone(); let ttl = Duration::from_millis(self.config.heartbeat_interval); let eff_mem = self.engine.state.membership_state.effective().clone(); let core_tx = self.tx_notification.clone(); - let mut granted = btreeset! {my_id}; + let mut granted = btreeset! {my_id.clone()}; if eff_mem.is_quorum(granted.iter()) { let _ = tx.send(Ok(resp)); @@ -321,41 +321,46 @@ where }; for (target, progress) in voter_progresses { - let target = *target; + let target = target.clone(); if target == my_id { continue; } let rpc = AppendEntriesRequest { - vote: my_vote, - prev_log_id: progress.matching, + vote: my_vote.clone(), + prev_log_id: progress.matching.clone(), entries: vec![], - leader_commit: self.engine.state.committed().copied(), + leader_commit: self.engine.state.committed().cloned(), }; // Safe unwrap(): target is in membership let target_node = eff_mem.get_node(&target).unwrap().clone(); - let mut client = self.network_factory.new_client(target, &target_node).await; + let mut client = self.network_factory.new_client(target.clone(), &target_node).await; let option = RPCOption::new(ttl); - let fu = async move { - let outer_res = C::timeout(ttl, client.append_entries(rpc, option)).await; - match outer_res { - Ok(append_res) => match append_res { - Ok(x) => Ok((target, x)), - Err(err) => Err((target, err)), - }, - Err(_timeout) => { - let timeout_err = Timeout { - action: RPCTypes::AppendEntries, - id: my_id, - target, - timeout: ttl, - }; + let fu = { + let my_id = my_id.clone(); + let target = target.clone(); - Err((target, RPCError::Timeout(timeout_err))) + async move { + let outer_res = C::timeout(ttl, client.append_entries(rpc, option)).await; + match outer_res { + Ok(append_res) => match append_res { + Ok(x) => Ok((target, x)), + Err(err) => Err((target, err)), + }, + Err(_timeout) => { + let timeout_err = Timeout { + action: RPCTypes::AppendEntries, + id: my_id, + target: target.clone(), + timeout: ttl, + }; + + Err((target, RPCError::Timeout(timeout_err))) + } } } }; @@ -471,7 +476,7 @@ where /// /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`. /// The calling side may not receive a result from `resp_tx`, if raft is shut down. - #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))] pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) { tracing::debug!(payload = display(&entry), "write_entry"); @@ -482,7 +487,7 @@ where // If the leader is transferring leadership, forward writes to the new leader. if let Some(to) = lh.leader.get_transfer_to() { if let Some(tx) = tx { - let err = lh.state.new_forward_to_leader(*to); + let err = lh.state.new_forward_to_leader(to.clone()); tx.send(Err(ClientWriteError::ForwardToLeader(err))); } return; @@ -501,7 +506,7 @@ where } /// Send a heartbeat message to every follower/learners. - #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))] pub(crate) fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool { tracing::debug!(now = display(C::now().display()), "send_heartbeat"); @@ -533,10 +538,21 @@ where pub fn flush_metrics(&mut self) { let (replication, heartbeat) = if let Some(leader) = self.engine.leader.as_ref() { let replication_prog = &leader.progress; - let replication = Some(replication_prog.iter().map(|(id, p)| (*id, *p.borrow())).collect()); + let replication = Some( + replication_prog + .iter() + .map(|(id, p)| { + ( + id.clone(), + as Borrow>>>::borrow(p).clone(), + ) + }) + .collect(), + ); let clock_prog = &leader.clock_progress; - let heartbeat = Some(clock_prog.iter().map(|(id, opt_t)| (*id, opt_t.map(SerdeInstant::new))).collect()); + let heartbeat = + Some(clock_prog.iter().map(|(id, opt_t)| (id.clone(), opt_t.map(SerdeInstant::new))).collect()); (replication, heartbeat) } else { @@ -563,19 +579,19 @@ where #[allow(deprecated)] let m = RaftMetrics { running_state: Ok(()), - id: self.id, + id: self.id.clone(), // --- data --- current_term: st.vote_ref().leader_id().get_term(), vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(), last_log_index: st.last_log_id().index(), - last_applied: st.io_applied().copied(), - snapshot: st.io_snapshot_last_log_id().copied(), - purged: st.io_purged().copied(), + last_applied: st.io_applied().cloned(), + snapshot: st.io_snapshot_last_log_id().cloned(), + purged: st.io_purged().cloned(), // --- cluster --- state: st.server_state, - current_leader, + current_leader: current_leader.clone(), millis_since_quorum_ack, last_quorum_acked: last_quorum_acked.map(SerdeInstant::new), membership_config: membership_config.clone(), @@ -587,10 +603,10 @@ where #[allow(deprecated)] let data_metrics = RaftDataMetrics { - last_log: st.last_log_id().copied(), - last_applied: st.io_applied().copied(), - snapshot: st.io_snapshot_last_log_id().copied(), - purged: st.io_purged().copied(), + last_log: st.last_log_id().cloned(), + last_applied: st.io_applied().cloned(), + snapshot: st.io_snapshot_last_log_id().cloned(), + purged: st.io_purged().cloned(), millis_since_quorum_ack, last_quorum_acked: last_quorum_acked.map(SerdeInstant::new), replication, @@ -598,7 +614,7 @@ where }; let server_metrics = RaftServerMetrics { - id: self.id, + id: self.id.clone(), vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(), state: st.server_state, current_leader, @@ -630,7 +646,7 @@ where let res = self.tx_metrics.send(m); if let Err(err) = res { - tracing::error!(error=%err, id=display(self.id), "error reporting metrics"); + tracing::error!(error=%err, id=display(&self.id), "error reporting metrics"); } } @@ -661,7 +677,7 @@ where // There is no Leader yet therefore use [`Condition::LogFlushed`] instead of // [`Condition::IOFlushed`]. Some(Condition::LogFlushed { - log_id: self.engine.state.last_log_id().copied(), + log_id: self.engine.state.last_log_id().cloned(), }) }; self.engine.output.push_command(Command::Respond { @@ -682,14 +698,17 @@ where pub(crate) fn reject_with_forward_to_leader(&self, tx: ResultSender) where E: From> + OptionalSend { let mut leader_id = self.current_leader(); - let leader_node = self.get_leader_node(leader_id); + let leader_node = self.get_leader_node(leader_id.clone()); // Leader is no longer a node in the membership config. if leader_node.is_none() { leader_id = None; } - let err = ForwardToLeader { leader_id, leader_node }; + let err = ForwardToLeader { + leader_id: leader_id, + leader_node, + }; let _ = tx.send(Err(err.into())); } @@ -697,7 +716,7 @@ where #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn current_leader(&self) -> Option { tracing::debug!( - self_id = display(self.id), + self_id = display(&self.id), vote = display(self.engine.state.vote_ref()), "get current_leader" ); @@ -756,8 +775,8 @@ where last.index ); - let cmd = sm::Command::apply(first, last); - self.sm_handle.send(cmd).map_err(|e| StorageError::apply(last, AnyError::error(e)))?; + let cmd = sm::Command::apply(first, last.clone()); + self.sm_handle.send(cmd).map_err(|e| StorageError::apply(last.clone(), AnyError::error(e)))?; Ok(()) } @@ -813,25 +832,25 @@ where let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap(); let membership_log_id = self.engine.state.membership_state.effective().log_id(); - let network = self.network_factory.new_client(target, target_node).await; - let snapshot_network = self.network_factory.new_client(target, target_node).await; + let network = self.network_factory.new_client(target.clone(), target_node).await; + let snapshot_network = self.network_factory.new_client(target.clone(), target_node).await; let leader = self.engine.leader.as_ref().unwrap(); - let session_id = ReplicationSessionId::new(leader.committed_vote, *membership_log_id); + let session_id = ReplicationSessionId::new(leader.committed_vote.clone(), membership_log_id.clone()); ReplicationCore::::spawn( - target, + target.clone(), session_id, self.config.clone(), - self.engine.state.committed().copied(), + self.engine.state.committed().cloned(), progress_entry.matching, network, snapshot_network, self.log_store.get_log_reader().await, self.sm_handle.new_snapshot_reader(), self.tx_notification.clone(), - tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)), + tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)), ) } @@ -845,7 +864,7 @@ where let nodes = std::mem::take(&mut self.replications); tracing::debug!( - targets = debug(nodes.iter().map(|x| *x.0).collect::>()), + targets = debug(nodes.iter().map(|x| x.0.clone()).collect::>()), "remove all targets from replication_metrics" ); @@ -903,7 +922,7 @@ where /// Run an event handling loop /// /// It always returns a [`Fatal`] error upon returning. - #[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))] + #[tracing::instrument(level="debug", skip_all, fields(id=display(&self.id)))] async fn runtime_loop(&mut self, mut rx_shutdown: OneshotReceiverOf) -> Result> { // Ratio control the ratio of number of RaftMsg to process to number of Notification to process. let mut balancer = Balancer::new(10_000); @@ -1059,7 +1078,7 @@ where async fn spawn_parallel_vote_requests(&mut self, vote_req: &VoteRequest) { let members = self.engine.state.membership_state.effective().voter_ids(); - let vote = vote_req.vote; + let vote = vote_req.vote.clone(); for target in members { if target == self.id { @@ -1070,49 +1089,54 @@ where // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone(); - let mut client = self.network_factory.new_client(target, &target_node).await; + let mut client = self.network_factory.new_client(target.clone(), &target_node).await; let tx = self.tx_notification.clone(); let ttl = Duration::from_millis(self.config.election_timeout_min); - let id = self.id; + let id = self.id.clone(); let option = RPCOption::new(ttl); + let vote = vote.clone(); + // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 #[allow(clippy::let_underscore_future)] let _ = C::spawn( - async move { - let tm_res = C::timeout(ttl, client.vote(req, option)).await; - let res = match tm_res { - Ok(res) => res, - - Err(_timeout) => { - let timeout_err = Timeout:: { - action: RPCTypes::Vote, - id, - target, - timeout: ttl, - }; - tracing::error!({error = %timeout_err, target = display(target)}, "timeout"); - return; - } - }; + { + let target = (&target).clone(); + async move { + let tm_res = C::timeout(ttl, client.vote(req, option)).await; + let res = match tm_res { + Ok(res) => res, + + Err(_timeout) => { + let timeout_err = Timeout:: { + action: RPCTypes::Vote, + id, + target: target.clone(), + timeout: ttl, + }; + tracing::error!({error = %timeout_err}, "timeout"); + return; + } + }; - match res { - Ok(resp) => { - let _ = tx.send(Notification::VoteResponse { - target, - resp, - candidate_vote: vote.into_non_committed(), - }); + match res { + Ok(resp) => { + let _ = tx.send(Notification::VoteResponse { + target, + resp, + candidate_vote: vote.into_non_committed(), + }); + } + Err(err) => tracing::error!({error=%err, target=display(&target)}, "while requesting vote"), } - Err(err) => tracing::error!({error=%err, target=display(target)}, "while requesting vote"), } } .instrument(tracing::debug_span!( parent: &Span::current(), "send_vote_req", - target = display(target) + target = display(&target) )), ); } @@ -1132,32 +1156,35 @@ where // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone(); - let mut client = self.network_factory.new_client(target, &target_node).await; + let mut client = self.network_factory.new_client(target.clone(), &target_node).await; let ttl = Duration::from_millis(self.config.election_timeout_min); let option = RPCOption::new(ttl); - let fut = async move { - let tm_res = C::timeout(ttl, client.transfer_leader(r, option)).await; - let res = match tm_res { - Ok(res) => res, - Err(timeout) => { - tracing::error!({error = display(timeout), target = display(target)}, "timeout sending transfer_leader"); - return; - } - }; + let fut = { + let target = target.clone(); + async move { + let tm_res = C::timeout(ttl, client.transfer_leader(r, option)).await; + let res = match tm_res { + Ok(res) => res, + Err(timeout) => { + tracing::error!({error = display(timeout), target = display(&target)}, "timeout sending transfer_leader"); + return; + } + }; - if let Err(e) = res { - tracing::error!({error = display(e), target = display(target)}, "error sending transfer_leader"); - } else { - tracing::info!("Done transfer_leader sent to {}", target); + if let Err(e) = res { + tracing::error!({error = display(e), target = display(&target)}, "error sending transfer_leader"); + } else { + tracing::info!("Done transfer_leader sent to {}", target); + } } }; let span = tracing::debug_span!( parent: &Span::current(), "send_transfer_leader", - target = display(target) + target = display(&target) ); // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 @@ -1172,7 +1199,7 @@ where let resp = self.engine.handle_vote_req(req); let condition = Some(Condition::IOFlushed { - io_id: IOId::new(*self.engine.state.vote_ref()), + io_id: IOId::new(self.engine.state.vote_ref()), }); self.engine.output.push_command(Command::Respond { when: condition, @@ -1192,7 +1219,7 @@ where } // TODO: Make this method non-async. It does not need to run any async command in it. - #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))] + #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(&self.id)))] pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg) { tracing::debug!("RAFT_event id={:<2} input: {}", self.id, msg); @@ -1299,7 +1326,7 @@ where }; } - #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(&self.id)))] pub(crate) fn handle_notification(&mut self, notify: Notification) -> Result<(), Fatal> { tracing::debug!("RAFT_event id={:<2} notify: {}", self.id, notify); @@ -1399,7 +1426,7 @@ where } Notification::LocalIO { io_id } => { - self.engine.state.io_state.io_progress.flush(io_id); + self.engine.state.io_state.io_progress.flush(io_id.clone()); match io_id { IOId::Log(log_io_id) => { @@ -1438,8 +1465,8 @@ where } => { if self.does_replication_session_match(&session_id, "HeartbeatProgress") { tracing::debug!( - session_id = display(session_id), - target = display(target), + session_id = display(&session_id), + target = display(&target), sending_time = display(sending_time.display()), "HeartbeatProgress" ); @@ -1465,7 +1492,7 @@ where // Update in-memory state first, then the io state. // In-memory state should always be ahead or equal to the io state. - let last_log_id = meta.last_log_id; + let last_log_id = meta.last_log_id.clone(); self.engine.finish_building_snapshot(meta); let st = self.engine.state.io_state_mut(); @@ -1483,12 +1510,12 @@ where if let Some(meta) = meta { let st = self.engine.state.io_state_mut(); - st.update_applied(meta.last_log_id); + st.update_applied(meta.last_log_id.clone()); st.update_snapshot(meta.last_log_id); } } sm::Response::Apply(res) => { - self.engine.state.io_state_mut().update_applied(Some(res.last_applied)); + self.engine.state.io_state_mut().update_applied(Some(res.last_applied.clone())); self.handle_apply_result(res); } @@ -1556,7 +1583,7 @@ where /// it is a stale message and should be just ignored. fn does_candidate_vote_match(&self, candidate_vote: &NonCommittedVote, msg: impl fmt::Display) -> bool { // If it finished voting, Candidate's vote is None. - let Some(my_vote) = self.engine.candidate_ref().map(|x| *x.vote_ref()) else { + let Some(my_vote) = self.engine.candidate_ref().map(|x| x.vote_ref().clone()) else { tracing::warn!( "A message will be ignored because this node is no longer Candidate: \ msg sent by vote: {}; when ({})", @@ -1583,7 +1610,7 @@ where /// If a message is sent by a previous Leader but is received by current Leader, /// it is a stale message and should be just ignored. fn does_leader_vote_match(&self, leader_vote: &CommittedVote, msg: impl fmt::Display) -> bool { - let Some(my_vote) = self.engine.leader.as_ref().map(|x| x.committed_vote) else { + let Some(my_vote) = self.engine.leader.as_ref().map(|x| x.committed_vote.clone()) else { tracing::warn!( "A message will be ignored because this node is no longer Leader: \ msg sent by vote: {}; when ({})", @@ -1697,9 +1724,9 @@ where match cmd { Command::UpdateIOProgress { io_id, .. } => { - self.engine.state.io_state.io_progress.submit(io_id); + self.engine.state.io_state.io_progress.submit(io_id.clone()); - let notify = Notification::LocalIO { io_id }; + let notify = Notification::LocalIO { io_id: io_id.clone() }; let _ = self.tx_notification.send(notify); } @@ -1707,11 +1734,11 @@ where committed_vote: vote, entries, } => { - let last_log_id = *entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().get_log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - let io_id = IOId::new_log_io(vote, Some(last_log_id)); - let notify = Notification::LocalIO { io_id }; + let io_id = IOId::new_log_io(vote, Some(last_log_id.clone())); + let notify = Notification::LocalIO { io_id: io_id.clone() }; let callback = IOFlushed::new(notify, self.tx_notification.downgrade()); // Mark this IO request as submitted, @@ -1728,16 +1755,18 @@ where self.log_store.append(entries, callback).await?; } Command::SaveVote { vote } => { - self.engine.state.io_state_mut().io_progress.submit(IOId::new(vote)); + self.engine.state.io_state_mut().io_progress.submit(IOId::new(&vote)); self.log_store.save_vote(&vote).await?; - let _ = self.tx_notification.send(Notification::LocalIO { io_id: IOId::new(vote) }); + let _ = self.tx_notification.send(Notification::LocalIO { + io_id: IOId::new(&vote), + }); // If a non-committed vote is saved, // there may be a candidate waiting for the response. - if let VoteStatus::Pending(non_committed) = vote.into_vote_status() { + if let VoteStatus::Pending(non_committed) = vote.clone().into_vote_status() { let _ = self.tx_notification.send(Notification::VoteResponse { - target: self.id, + target: self.id.clone(), // last_log_id is not used when sending VoteRequest to local node resp: VoteResponse::new(vote, None, true), candidate_vote: non_committed, @@ -1745,24 +1774,24 @@ where } } Command::PurgeLog { upto } => { - self.log_store.purge(upto).await?; + self.log_store.purge(upto.clone()).await?; self.engine.state.io_state_mut().update_purged(Some(upto)); } Command::TruncateLog { since } => { - self.log_store.truncate(since).await?; + self.log_store.truncate(since.clone()).await?; // Inform clients waiting for logs to be applied. let removed = self.client_resp_channels.split_off(&since.index); if !removed.is_empty() { let leader_id = self.current_leader(); - let leader_node = self.get_leader_node(leader_id); + let leader_node = self.get_leader_node(leader_id.clone()); // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 #[allow(clippy::let_underscore_future)] let _ = C::spawn(async move { for (log_index, tx) in removed.into_iter() { tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { - leader_id, + leader_id: leader_id.clone(), leader_node: leader_node.clone(), }))); @@ -1776,7 +1805,7 @@ where } Command::ReplicateCommitted { committed } => { for node in self.replications.values() { - let _ = node.tx_repl.send(Replicate::Committed(committed)); + let _ = node.tx_repl.send(Replicate::Committed(committed.clone())); } } Command::BroadcastHeartbeat { session_id, committed } => { @@ -1802,15 +1831,15 @@ where self.remove_all_replication().await; for ReplicationProgress(target, matching) in targets.iter() { - let handle = self.spawn_replication_stream(*target, *matching).await; - self.replications.insert(*target, handle); + let handle = self.spawn_replication_stream(target.clone(), matching.clone()).await; + self.replications.insert(target.clone(), handle); } let effective = self.engine.state.membership_state.effective().clone(); let nodes = targets.into_iter().map(|p| { let node_id = p.0; - (node_id, effective.get_node(&node_id).unwrap().clone()) + (node_id.clone(), effective.get_node(&node_id).unwrap().clone()) }); self.heartbeat_handle.spawn_workers(&mut self.network_factory, &self.tx_notification, nodes).await; diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 51e170eec..6a7844b66 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -85,7 +85,7 @@ where C: RaftTypeConfig Command::BuildSnapshot => None, Command::GetSnapshot { .. } => None, Command::BeginReceivingSnapshot { .. } => None, - Command::InstallFullSnapshot { io_id, .. } => Some(*io_id), + Command::InstallFullSnapshot { io_id, .. } => Some(io_id.clone()), Command::Apply { .. } => None, Command::Func { .. } => None, } diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index 72e4928b7..caa746ee9 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -186,7 +186,7 @@ where #[allow(clippy::needless_collect)] let applying_entries = entries .iter() - .map(|e| ApplyingEntry::new(*e.get_log_id(), e.get_membership().cloned())) + .map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership().cloned())) .collect::>(); let n_entries = end - since; diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index a7f234c4c..419ab78dc 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -257,15 +257,15 @@ where C: RaftTypeConfig pub(crate) fn condition(&self) -> Option> { match self { Command::RebuildReplicationStreams { .. } => None, - Command::Respond { when, .. } => *when, + Command::Respond { when, .. } => when.clone(), - Command::UpdateIOProgress { when, .. } => *when, + Command::UpdateIOProgress { when, .. } => when.clone(), Command::AppendInputEntries { .. } => None, Command::SaveVote { .. } => None, Command::TruncateLog { .. } => None, Command::SaveCommitted { .. } => None, - Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(*upto) }), + Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(upto.clone()) }), Command::ReplicateCommitted { .. } => None, Command::BroadcastHeartbeat { .. } => None, diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index efeebc2c7..5bd5032c1 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -113,7 +113,7 @@ where C: RaftTypeConfig /// [`RaftState`] pub(crate) fn new_candidate(&mut self, vote: Vote) -> &mut Candidate> { let now = C::now(); - let last_log_id = self.state.last_log_id().copied(); + let last_log_id = self.state.last_log_id().cloned(); let membership = self.state.membership_state.effective().membership(); @@ -209,13 +209,13 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { let new_term = self.state.vote.leader_id().term + 1; - let new_vote = Vote::new(new_term, self.config.id); + let new_vote = Vote::new(new_term, self.config.id.clone()); - let candidate = self.new_candidate(new_vote); + let candidate = self.new_candidate(new_vote.clone()); tracing::info!("{}, new candidate: {}", func_name!(), candidate); - let last_log_id = candidate.last_log_id().copied(); + let last_log_id = candidate.last_log_id().cloned(); // Simulate sending RequestVote RPC to local node. // Safe unwrap(): it won't reject itself ˙–˙ @@ -290,7 +290,7 @@ where C: RaftTypeConfig local_leased_vote.display_lease_info(now) ); - return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false); + return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false); } } @@ -309,7 +309,7 @@ where C: RaftTypeConfig // Return the updated vote, this way the candidate knows which vote is granted, in case // the candidate's vote is changed after sending the vote request. - return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false); + return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false); } // Then check vote just as it does for every incoming event. @@ -320,14 +320,14 @@ where C: RaftTypeConfig // Return the updated vote, this way the candidate knows which vote is granted, in case // the candidate's vote is changed after sending the vote request. - VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), res.is_ok()) + VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), res.is_ok()) } #[tracing::instrument(level = "debug", skip(self, resp))] pub(crate) fn handle_vote_resp(&mut self, target: C::NodeId, resp: VoteResponse) { tracing::info!( resp = display(&resp), - target = display(target), + target = display(&target), my_vote = display(self.state.vote_ref()), my_last_log_id = display(self.state.last_log_id().display()), "{}", @@ -401,7 +401,7 @@ where C: RaftTypeConfig let condition = if is_ok { Some(Condition::IOFlushed { - io_id: *self.state.accepted_io().unwrap(), + io_id: self.state.accepted_io().unwrap().clone(), }) } else { None @@ -426,7 +426,7 @@ where C: RaftTypeConfig // Vote is legal. let mut fh = self.following_handler(); - fh.ensure_log_consecutive(prev_log_id)?; + fh.ensure_log_consecutive(prev_log_id.as_ref())?; fh.append_entries(prev_log_id, entries); Ok(()) @@ -455,10 +455,10 @@ where C: RaftTypeConfig snapshot: Snapshot, tx: ResultSender>, ) { - tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); + tracing::info!(vote = display(&vote), snapshot = display(&snapshot), "{}", func_name!()); let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| { - Ok(SnapshotResponse::new(*state.vote_ref())) + Ok(SnapshotResponse::new(state.vote_ref().clone())) }); let Some(tx) = vote_res else { @@ -471,7 +471,7 @@ where C: RaftTypeConfig // In this case, the response can only be sent when the snapshot is installed. let cond = fh.install_full_snapshot(snapshot); let res = Ok(SnapshotResponse { - vote: *self.state.vote_ref(), + vote: self.state.vote_ref().clone(), }); self.output.push_command(Command::Respond { @@ -569,8 +569,8 @@ where C: RaftTypeConfig tracing::info!(index = display(index), "{}", func_name!()); let snapshot_last_log_id = self.state.snapshot_last_log_id(); - let snapshot_last_log_id = if let Some(x) = snapshot_last_log_id { - *x + let snapshot_last_log_id = if let Some(log_id) = snapshot_last_log_id { + log_id.clone() } else { tracing::info!("no snapshot, can not purge"); return; @@ -599,14 +599,14 @@ where C: RaftTypeConfig // Safe unwrap: `index` is ensured to be present in the above code. let log_id = self.state.get_log_id(index).unwrap(); - tracing::info!(purge_upto = display(log_id), "{}", func_name!()); + tracing::info!(purge_upto = display(&log_id), "{}", func_name!()); self.log_handler().update_purge_upto(log_id); self.try_purge_log(); } pub(crate) fn trigger_transfer_leader(&mut self, to: C::NodeId) { - tracing::info!(to = display(to), "{}", func_name!()); + tracing::info!(to = display(&to), "{}", func_name!()); let Some((mut lh, _)) = self.get_leader_handler_or_reject(None) else { tracing::info!( @@ -636,15 +636,15 @@ where C: RaftTypeConfig // There may already be a Leader with higher vote let Some(leader) = leader else { return }; - let vote = *leader.committed_vote_ref(); - let last_log_id = leader.last_log_id().copied(); + let vote = leader.committed_vote_ref().clone(); + let last_log_id = leader.last_log_id().cloned(); self.replication_handler().rebuild_replication_streams(); // Before sending any log, update the vote. // This could not fail because `internal_server_state` will be cleared // once `state.vote` is changed to a value of other node. - let _res = self.vote_handler().update_vote(&vote.into_vote()); + let _res = self.vote_handler().update_vote(&vote.clone().into_vote()); debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res); self.state.accept_io(IOId::new_log_io(vote, last_log_id)); @@ -673,8 +673,8 @@ where C: RaftTypeConfig ); Err(NotAllowed { - last_log_id: self.state.last_log_id().copied(), - vote: *self.state.vote_ref(), + last_log_id: self.state.last_log_id().cloned(), + vote: self.state.vote_ref().clone(), }) } @@ -683,7 +683,7 @@ where C: RaftTypeConfig fn check_members_contain_me(&self, m: &Membership) -> Result<(), NotInMembers> { if !m.is_voter(&self.config.id) { let e = NotInMembers { - node_id: self.config.id, + node_id: self.config.id.clone(), membership: m.clone(), }; Err(e) @@ -751,7 +751,7 @@ where C: RaftTypeConfig }; debug_assert!( - leader.committed_vote_ref().into_vote() >= *self.state.vote_ref(), + leader.committed_vote_ref().clone().into_vote() >= *self.state.vote_ref(), "leader.vote({}) >= state.vote({})", leader.committed_vote_ref(), self.state.vote_ref() @@ -784,7 +784,7 @@ where C: RaftTypeConfig pub(crate) fn following_handler(&mut self) -> FollowingHandler { debug_assert!(self.leader.is_none()); - let leader_vote = *self.state.vote_ref(); + let leader_vote = self.state.vote_ref().clone(); debug_assert!( leader_vote.is_committed(), "Expect the Leader vote to be committed: {}", diff --git a/openraft/src/engine/handler/establish_handler/mod.rs b/openraft/src/engine/handler/establish_handler/mod.rs index 6897d1e1f..7506209ff 100644 --- a/openraft/src/engine/handler/establish_handler/mod.rs +++ b/openraft/src/engine/handler/establish_handler/mod.rs @@ -21,17 +21,17 @@ where C: RaftTypeConfig self, candidate: Candidate>, ) -> Option<&'x mut Leader>> { - let vote = *candidate.vote_ref(); + let vote = candidate.vote_ref().clone(); debug_assert_eq!( vote.leader_id().voted_for(), - Some(self.config.id), + Some(self.config.id.clone()), "it can only commit its own vote" ); if let Some(l) = self.leader.as_ref() { #[allow(clippy::neg_cmp_op_on_partial_ord)] - if !(vote > l.committed_vote_ref().into_vote()) { + if !(vote > l.committed_vote_ref().clone().into_vote()) { tracing::warn!( "vote is not greater than current existing leader vote. Do not establish new leader and quit" ); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 995f8002c..bf9078bff 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -72,10 +72,10 @@ where C: RaftTypeConfig debug_assert!(x.get_log_id().index == prev_log_id.next_index()); } - let last_log_id = entries.last().map(|x| *x.get_log_id()); + let last_log_id = entries.last().map(|x| x.get_log_id().clone()); let last_log_id = std::cmp::max(prev_log_id, last_log_id); - let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote, last_log_id)); + let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone())); let l = entries.len(); let since = self.state.first_conflicting_index(&entries); @@ -92,9 +92,9 @@ where C: RaftTypeConfig // No actual IO is needed, but just need to update IO state, // after all preceding IO flushed. - let to_submit = IOId::new_log_io(self.leader_vote, last_log_id); + let to_submit = IOId::new_log_io(self.leader_vote.clone(), last_log_id); - if Some(to_submit) <= prev_accepted { + if Some(&to_submit) <= prev_accepted.as_ref() { // No io state to update. return; } @@ -113,15 +113,18 @@ where C: RaftTypeConfig /// If not, truncate the local log and return an error. pub(crate) fn ensure_log_consecutive( &mut self, - prev_log_id: Option>, + prev_log_id: Option<&LogId>, ) -> Result<(), RejectAppendEntries> { - if let Some(ref prev) = prev_log_id { + if let Some(prev) = prev_log_id { if !self.state.has_log_id(prev) { let local = self.state.get_log_id(prev.index); tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match"); self.truncate_logs(prev.index); - return Err(RejectAppendEntries::ByConflictingLogId { local, expect: *prev }); + return Err(RejectAppendEntries::ByConflictingLogId { + local, + expect: prev.clone(), + }); } } @@ -151,7 +154,7 @@ where C: RaftTypeConfig self.output.push_command(Command::AppendInputEntries { // A follower should always use the node's vote. - committed_vote: self.leader_vote, + committed_vote: self.leader_vote.clone(), entries, }); } @@ -159,9 +162,9 @@ where C: RaftTypeConfig /// Commit entries that are already committed by the leader. #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { - let accepted = self.state.accepted_io().copied(); - let accepted = accepted.and_then(|x| x.last_log_id().copied()); - let committed = std::cmp::min(accepted, leader_committed); + let accepted = self.state.accepted_io().cloned(); + let accepted = accepted.and_then(|x| x.last_log_id().cloned()); + let committed = std::cmp::min(accepted.clone(), leader_committed.clone()); tracing::debug!( leader_committed = display(DisplayOption(&leader_committed)), @@ -173,7 +176,7 @@ where C: RaftTypeConfig if let Some(prev_committed) = self.state.update_committed(&committed) { self.output.push_command(Command::SaveCommitted { - committed: committed.unwrap(), + committed: committed.clone().unwrap(), }); self.output.push_command(Command::Apply { @@ -280,7 +283,7 @@ where C: RaftTypeConfig let meta = &snapshot.meta; tracing::info!("install_full_snapshot: meta:{:?}", meta); - let snap_last_log_id = meta.last_log_id; + let snap_last_log_id = meta.last_log_id.clone(); if snap_last_log_id.as_ref() <= self.state.committed() { tracing::info!( @@ -312,16 +315,16 @@ where C: RaftTypeConfig } } - let io_id = IOId::new_log_io(self.leader_vote, Some(snap_last_log_id)); - self.state.accept_io(io_id); - self.state.committed = Some(snap_last_log_id); + let io_id = IOId::new_log_io(self.leader_vote.clone(), Some(snap_last_log_id.clone())); + self.state.accept_io(io_id.clone()); + self.state.committed = Some(snap_last_log_id.clone()); self.update_committed_membership(EffectiveMembership::new_from_stored_membership( meta.last_membership.clone(), )); self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot, io_id))); - self.state.purge_upto = Some(snap_last_log_id); + self.state.purge_upto = Some(snap_last_log_id.clone()); self.log_handler().purge_log(); Some(Condition::Snapshot { @@ -340,7 +343,7 @@ where C: RaftTypeConfig // Find the last 2 membership config entries: the committed and the effective. for ent in entries.rev() { if let Some(m) = ent.get_membership() { - memberships.insert(0, StoredMembership::new(Some(*ent.get_log_id()), m.clone())); + memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m.clone())); if memberships.len() == 2 { break; } diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 2b9616a0d..2fca36c73 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -67,19 +67,19 @@ where C: RaftTypeConfig membership_entry.is_none(), "only one membership entry is allowed in a batch" ); - membership_entry = Some((*entry.get_log_id(), m.clone())); + membership_entry = Some((entry.get_log_id().clone(), m.clone())); } } self.state.accept_io(IOId::new_log_io( - self.leader.committed_vote, - self.leader.last_log_id().copied(), + self.leader.committed_vote.clone(), + self.leader.last_log_id().cloned(), )); self.output.push_command(Command::AppendInputEntries { // A leader should always use the leader's vote. // It is allowed to be different from local vote. - committed_vote: self.leader.committed_vote, + committed_vote: self.leader.committed_vote.clone(), entries, }); @@ -98,11 +98,11 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn send_heartbeat(&mut self) { let membership_log_id = self.state.membership_state.effective().log_id(); - let session_id = ReplicationSessionId::new(self.leader.committed_vote, *membership_log_id); + let session_id = ReplicationSessionId::new(self.leader.committed_vote.clone(), membership_log_id.clone()); self.output.push_command(Command::BroadcastHeartbeat { session_id, - committed: self.state.committed().copied(), + committed: self.state.committed().cloned(), }); } @@ -110,21 +110,21 @@ where C: RaftTypeConfig /// /// See: [Read Operation](crate::docs::protocol::read) pub(crate) fn get_read_log_id(&self) -> Option> { - let committed = self.state.committed().copied(); + let committed = self.state.committed().cloned(); // noop log id is the first log this leader proposed. - std::cmp::max(self.leader.noop_log_id, committed) + std::cmp::max(self.leader.noop_log_id.clone(), committed) } /// Disable proposing new logs for this Leader, and transfer Leader to another node pub(crate) fn transfer_leader(&mut self, to: C::NodeId) { - self.leader.mark_transfer(to); + self.leader.mark_transfer(to.clone()); self.state.vote.disable_lease(); self.output.push_command(Command::BroadcastTransferLeader { req: TransferLeaderRequest::new( - self.leader.committed_vote.into_vote(), + self.leader.committed_vote.clone().into_vote(), to, - self.leader.last_log_id().copied(), + self.leader.last_log_id().cloned(), ), }); } diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index 7b36aa24f..d66e3d72d 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -41,7 +41,7 @@ where C: RaftTypeConfig return; } - let upto = *purge_upto.unwrap(); + let upto = purge_upto.unwrap().clone(); st.purge_log(&upto); self.output.push_command(Command::PurgeLog { upto }); @@ -82,7 +82,7 @@ where C: RaftTypeConfig let purge_end = self.state.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep); tracing::debug!( - snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id.clone()), max_keep, "try purge: (-oo, {})", purge_end @@ -90,7 +90,7 @@ where C: RaftTypeConfig if st.last_purged_log_id().next_index() + batch_size > purge_end { tracing::debug!( - snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id.clone()), max_keep, last_purged_log_id = display(st.last_purged_log_id().display()), batch_size, diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index bb438a7a3..80f7ec41a 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -65,7 +65,7 @@ where C: RaftTypeConfig "Only leader is allowed to call update_effective_membership()" ); - self.state.membership_state.append(EffectiveMembership::new_arc(Some(*log_id), m.clone())); + self.state.membership_state.append(EffectiveMembership::new_arc(Some(log_id.clone()), m.clone())); // TODO(9): currently only a leader has replication setup. // It's better to setup replication for both leader and candidate. @@ -112,7 +112,7 @@ where C: RaftTypeConfig /// accepted. #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf) { - tracing::debug!(target = display(node_id), t = display(t.display()), "{}", func_name!()); + tracing::debug!(target = display(&node_id), t = display(t.display()), "{}", func_name!()); let granted = *self .leader @@ -148,7 +148,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option>) { tracing::debug!( - node_id = display(node_id), + node_id = display(&node_id), log_id = display(log_id.display()), "{}", func_name!() @@ -158,11 +158,12 @@ where C: RaftTypeConfig // The value granted by a quorum may not yet be a committed. // A committed is **granted** and also is in current term. - let quorum_accepted = *self + let quorum_accepted = self .leader .progress .update_with(&node_id, |prog_entry| prog_entry.update_matching(log_id)) - .expect("it should always update existing progress"); + .expect("it should always update existing progress") + .clone(); tracing::debug!( quorum_accepted = display(quorum_accepted.display()), @@ -178,7 +179,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { // Only when the log id is proposed by current leader, it is committed. - if let Some(c) = granted { + if let Some(ref c) = granted { if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) { return; } @@ -186,16 +187,16 @@ where C: RaftTypeConfig if let Some(prev_committed) = self.state.update_committed(&granted) { self.output.push_command(Command::ReplicateCommitted { - committed: self.state.committed().copied(), + committed: self.state.committed().cloned(), }); self.output.push_command(Command::SaveCommitted { - committed: self.state.committed().copied().unwrap(), + committed: self.state.committed().cloned().unwrap(), }); self.output.push_command(Command::Apply { already_committed: prev_committed, - upto: self.state.committed().copied().unwrap(), + upto: self.state.committed().cloned().unwrap(), }); if self.config.snapshot_policy.should_snapshot(&self.state) { @@ -219,7 +220,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result, String>) { tracing::debug!( - target = display(target), + target = display(&target), result = display(repl_res.display()), progress = display(&self.leader.progress), "{}", @@ -260,7 +261,7 @@ where C: RaftTypeConfig // Reset and resend(by self.send_to_all()) replication requests. prog_entry.inflight = Inflight::None; - targets.push(ReplicationProgress(*target, *prog_entry)); + targets.push(ReplicationProgress(target.clone(), prog_entry.clone())); } } self.output.push_command(Command::RebuildReplicationStreams { targets }); @@ -281,7 +282,7 @@ where C: RaftTypeConfig } let t = prog_entry.next_send(self.state, self.config.max_payload_entries); - tracing::debug!(target = display(*id), send = debug(&t), "next send"); + tracing::debug!(target = display(&*id), send = debug(&t), "next send"); match t { Ok(inflight) => { @@ -298,10 +299,13 @@ where C: RaftTypeConfig pub(crate) fn send_to_target(output: &mut EngineOutput, target: &C::NodeId, inflight: &Inflight) { let req = match inflight { Inflight::None => unreachable!("no data to send"), - Inflight::Logs { log_id_range } => Replicate::logs(*log_id_range), - Inflight::Snapshot { last_log_id } => Replicate::snapshot(*last_log_id), + Inflight::Logs { log_id_range } => Replicate::logs(log_id_range.clone()), + Inflight::Snapshot { last_log_id } => Replicate::snapshot(last_log_id.clone()), }; - output.push_command(Command::Replicate { target: *target, req }); + output.push_command(Command::Replicate { + target: target.clone(), + req, + }); } /// Try to run a pending purge job, if no tasks are using the logs to be purged. @@ -325,7 +329,7 @@ where C: RaftTypeConfig } // Safe unwrap(): it greater than an Option thus it must be a Some() - let purge_upto = *self.state.purge_upto().unwrap(); + let purge_upto = self.state.purge_upto().unwrap().clone(); // Check if any replication task is going to use the log that are going to purge. let mut in_use = false; @@ -358,7 +362,7 @@ where C: RaftTypeConfig return; } - let id = self.config.id; + let id = self.config.id.clone(); // The leader may not be in membership anymore if let Some(prog_entry) = self.leader.progress.get_mut(&id) { @@ -371,7 +375,7 @@ where C: RaftTypeConfig return; } // TODO: It should be self.state.last_log_id() but None is ok. - prog_entry.inflight = Inflight::logs(None, upto); + prog_entry.inflight = Inflight::logs(None, upto.clone()); self.update_matching(id, upto); } diff --git a/openraft/src/engine/handler/server_state_handler/mod.rs b/openraft/src/engine/handler/server_state_handler/mod.rs index 6840c89a0..9b0603336 100644 --- a/openraft/src/engine/handler/server_state_handler/mod.rs +++ b/openraft/src/engine/handler/server_state_handler/mod.rs @@ -23,7 +23,7 @@ where C: RaftTypeConfig let server_state = self.state.calc_server_state(&self.config.id); tracing::debug!( - id = display(self.config.id), + id = display(&self.config.id), prev_server_state = debug(self.state.server_state), server_state = debug(server_state), "update_server_state_if_changed" @@ -37,9 +37,9 @@ where C: RaftTypeConfig let is_leader = server_state == ServerState::Leader; if !was_leader && is_leader { - tracing::info!(id = display(self.config.id), "become leader"); + tracing::info!(id = display(&self.config.id), "become leader"); } else if was_leader && !is_leader { - tracing::info!(id = display(self.config.id), "quit leader"); + tracing::info!(id = display(&self.config.id), "quit leader"); } else { // nothing to do } diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 80635376a..78700a58d 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -53,7 +53,7 @@ where C: RaftTypeConfig pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta) -> bool { tracing::info!("update_snapshot: {:?}", meta); - if meta.last_log_id <= self.state.snapshot_last_log_id().copied() { + if meta.last_log_id <= self.state.snapshot_last_log_id().cloned() { tracing::info!( "No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})", self.state.snapshot_last_log_id().display(), diff --git a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs index 855e0b735..aa2bf12ea 100644 --- a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs @@ -60,7 +60,7 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> { // Command::Respond { when: Some(Condition::IOFlushed { - io_id: IOId::new(Vote::new(2, 1)) + io_id: IOId::new(&Vote::new(2, 1)) }), resp: Respond::new(mk_res(false), tx) }, diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 8eb5bc157..3eb955a54 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -75,7 +75,7 @@ where C: RaftTypeConfig let res = f(self.state, e); let condition = Some(Condition::IOFlushed { - io_id: IOId::new(*self.state.vote_ref()), + io_id: IOId::new(self.state.vote_ref()), }); self.output.push_command(Command::Respond { @@ -106,7 +106,7 @@ where C: RaftTypeConfig // Ok } else { tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref()); - return Err(RejectVoteRequest::ByVote(*self.state.vote_ref())); + return Err(RejectVoteRequest::ByVote(self.state.vote_ref().clone())); } tracing::debug!(%vote, "vote is changing to" ); @@ -125,9 +125,9 @@ where C: RaftTypeConfig if vote > self.state.vote_ref() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); - self.state.vote.update(C::now(), leader_lease, *vote); - self.state.accept_io(IOId::new(*vote)); - self.output.push_command(Command::SaveVote { vote: *vote }); + self.state.vote.update(C::now(), leader_lease, vote.clone()); + self.state.accept_io(IOId::new(vote)); + self.output.push_command(Command::SaveVote { vote: vote.clone() }); } else { self.state.vote.touch(C::now(), leader_lease); } @@ -161,13 +161,13 @@ where C: RaftTypeConfig "become leader: node-{}, my vote: {}, last-log-id: {}", self.config.id, self.state.vote_ref(), - self.state.last_log_id().copied().unwrap_or_default() + self.state.last_log_id().cloned().unwrap_or_default() ); if let Some(l) = self.leader.as_mut() { tracing::debug!("leading vote: {}", l.committed_vote,); - if l.committed_vote.into_vote().leader_id() == self.state.vote_ref().leader_id() { + if l.committed_vote.clone().into_vote().leader_id() == self.state.vote_ref().leader_id() { tracing::debug!( "vote still belongs to the same leader. Just updating vote is enough: node-{}, {}", self.config.id, @@ -176,7 +176,7 @@ where C: RaftTypeConfig // TODO: this is not gonna happen, // because `self.leader`(previous `internal_server_state`) // does not include Candidate any more. - l.committed_vote = self.state.vote_ref().into_committed(); + l.committed_vote = self.state.vote_ref().clone().into_committed(); self.server_state_handler().update_server_state_if_changed(); return; } @@ -186,19 +186,19 @@ where C: RaftTypeConfig // Re-create a new Leader instance. let leader = self.state.new_leader(); - let leader_vote = *leader.committed_vote_ref(); + let leader_vote = leader.committed_vote_ref().clone(); *self.leader = Some(Box::new(leader)); let (last_log_id, noop_log_id) = { let leader = self.leader.as_ref().unwrap(); - (leader.last_log_id().copied(), leader.noop_log_id().copied()) + (leader.last_log_id().cloned(), leader.noop_log_id().cloned()) }; - self.state.accept_io(IOId::new_log_io(leader_vote, last_log_id)); + self.state.accept_io(IOId::new_log_io(leader_vote.clone(), last_log_id.clone())); self.output.push_command(Command::UpdateIOProgress { when: None, - io_id: IOId::new_log_io(leader_vote, last_log_id), + io_id: IOId::new_log_io(leader_vote, last_log_id.clone()), }); self.server_state_handler().update_server_state_if_changed(); @@ -221,7 +221,7 @@ where C: RaftTypeConfig /// This node then becomes raft-follower or raft-learner. pub(crate) fn become_following(&mut self) { debug_assert!( - self.state.vote_ref().leader_id().voted_for() != Some(self.config.id) + self.state.vote_ref().leader_id().voted_for().as_ref() != Some(&self.config.id) || !self.state.membership_state.effective().membership().is_voter(&self.config.id), "It must hold: vote is not mine, or I am not a voter(leader just left the cluster)" ); diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 705e47906..258e446c3 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -63,7 +63,7 @@ where C: RaftTypeConfig }; // Recursion stack - let mut stack = vec![(first, last)]; + let mut stack = vec![(first, last.clone())]; loop { let (first, last) = match stack.pop() { @@ -75,7 +75,7 @@ where C: RaftTypeConfig // Case AA if first.leader_id == last.leader_id { - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } continue; @@ -83,7 +83,7 @@ where C: RaftTypeConfig // Two adjacent logs with different leader_id, no need to binary search if first.index + 1 == last.index { - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } res.push(last); @@ -94,7 +94,7 @@ where C: RaftTypeConfig if first.leader_id == mid.leader_id { // Case AAC - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } stack.push((mid, last)); @@ -105,7 +105,7 @@ where C: RaftTypeConfig // Case ABC // first.leader_id < mid_log_id.leader_id < last.leader_id // Deal with (first, mid) then (mid, last) - stack.push((mid, last)); + stack.push((mid.clone(), last)); stack.push((first, mid)); } } @@ -133,14 +133,14 @@ where C: RaftTypeConfig pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { if let Some(first) = new_ids.first() { let first_id = first.get_log_id(); - self.append(*first_id); + self.append(first_id.clone()); if let Some(last) = new_ids.last() { let last_id = last.get_log_id(); assert_eq!(last_id.leader_id, first_id.leader_id); if last_id != first_id { - self.append(*last_id); + self.append(last_id.clone()); } } } @@ -149,15 +149,15 @@ where C: RaftTypeConfig /// Extends a list of `log_id`. #[allow(dead_code)] pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - let mut prev = self.last().map(|x| x.leader_id); + let mut prev = self.last().map(|x| x.leader_id.clone()); for x in new_ids.iter() { let log_id = x.get_log_id(); - if prev != Some(log_id.leader_id) { - self.append(*log_id); + if prev.as_ref() != Some(&log_id.leader_id) { + self.append(log_id.clone()); - prev = Some(log_id.leader_id); + prev = Some(log_id.leader_id.clone()); } } @@ -165,7 +165,7 @@ where C: RaftTypeConfig let log_id = last.get_log_id(); if self.last() != Some(log_id) { - self.append(*log_id); + self.append(log_id.clone()); } } } @@ -201,9 +201,9 @@ where C: RaftTypeConfig // l >= 2 - let last = self.key_log_ids[l - 1]; + let last = &self.key_log_ids[l - 1]; - if self.key_log_ids.get(l - 2).map(|x| x.leader_id) == Some(last.leader_id) { + if self.key_log_ids.get(l - 2).map(|x| &x.leader_id) == Some(&last.leader_id) { // Replace the **last log id**. self.key_log_ids[l - 1] = new_log_id; return; @@ -235,7 +235,7 @@ where C: RaftTypeConfig // Add key log id if there is a gap between last.index and at - 1. let last = self.key_log_ids.last(); if let Some(last) = last { - let (last_leader_id, last_index) = (last.leader_id, last.index); + let (last_leader_id, last_index) = (last.leader_id.clone(), last.index); if last_index < at - 1 { self.append(LogId::new(last_leader_id, at - 1)); } @@ -250,7 +250,7 @@ where C: RaftTypeConfig // When installing snapshot it may need to purge across the `last_log_id`. if upto.index >= last.next_index() { debug_assert!(Some(upto) > self.last()); - self.key_log_ids = vec![*upto]; + self.key_log_ids = vec![upto.clone()]; return; } @@ -280,12 +280,12 @@ where C: RaftTypeConfig let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index)); match res { - Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id, index)), + Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id.clone(), index)), Err(i) => { if i == 0 || i == self.key_log_ids.len() { None } else { - Some(LogId::new(self.key_log_ids[i - 1].leader_id, index)) + Some(LogId::new(self.key_log_ids[i - 1].leader_id.clone(), index)) } } } diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index 8d1c14481..60ab5c39f 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -34,7 +34,7 @@ where { fn clone(&self) -> Self { Self { - log_id: self.log_id, + log_id: self.log_id.clone(), payload: self.payload.clone(), } } @@ -105,7 +105,7 @@ where C: RaftTypeConfig } fn set_log_id(&mut self, log_id: &LogId) { - self.log_id = *log_id; + self.log_id = log_id.clone(); } } diff --git a/openraft/src/log_id/log_id_option_ext.rs b/openraft/src/log_id/log_id_option_ext.rs index 9a9bd829f..39a2fc35b 100644 --- a/openraft/src/log_id/log_id_option_ext.rs +++ b/openraft/src/log_id/log_id_option_ext.rs @@ -14,7 +14,7 @@ pub trait LogIdOptionExt { impl LogIdOptionExt for Option> { fn index(&self) -> Option { - self.map(|x| x.index) + self.as_ref().map(|x| x.index) } fn next_index(&self) -> u64 { diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index a878e4cab..6d1f12384 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -19,7 +19,7 @@ use crate::NodeId; /// /// The log id serves as unique identifier for a log entry across the system. It is composed of two /// parts: a leader id, which refers to the leader that proposed this log, and an integer index. -#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct LogId { /// The id of the leader that proposed this log @@ -30,13 +30,15 @@ pub struct LogId { pub index: u64, } +impl Copy for LogId where NID: NodeId + Copy {} + impl RaftLogId for LogId { fn get_log_id(&self) -> &LogId { self } fn set_log_id(&mut self, log_id: &LogId) { - *self = *log_id + *self = log_id.clone() } } diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index 9a5f8cbf4..0be3fc60b 100644 --- a/openraft/src/log_id_range.rs +++ b/openraft/src/log_id_range.rs @@ -38,7 +38,7 @@ impl Validate for LogIdRange where C: RaftTypeConfig { fn validate(&self) -> Result<(), Box> { - validit::less_equal!(self.prev, self.last); + validit::less_equal!(&self.prev, &self.last); Ok(()) } } diff --git a/openraft/src/membership/effective_membership.rs b/openraft/src/membership/effective_membership.rs index e83d62801..2805c590d 100644 --- a/openraft/src/membership/effective_membership.rs +++ b/openraft/src/membership/effective_membership.rs @@ -58,7 +58,7 @@ where LID: RaftLogId, { fn from(v: (&LID, Membership)) -> Self { - EffectiveMembership::new(Some(*v.0.get_log_id()), v.1) + EffectiveMembership::new(Some(v.0.get_log_id().clone()), v.1) } } @@ -75,7 +75,7 @@ where C: RaftTypeConfig let configs = membership.get_joint_config(); let mut joint = vec![]; for c in configs { - joint.push(c.iter().copied().collect::>()); + joint.push(c.iter().cloned().collect::>()); } let quorum_set = Joint::from(joint); @@ -88,7 +88,7 @@ where C: RaftTypeConfig } pub(crate) fn new_from_stored_membership(stored: StoredMembership) -> Self { - Self::new(*stored.log_id(), stored.membership().clone()) + Self::new(stored.log_id().clone(), stored.membership().clone()) } pub(crate) fn stored_membership(&self) -> &Arc> { @@ -115,7 +115,7 @@ where C: RaftTypeConfig /// Returns an Iterator of all voter node ids. Learners are not included. pub fn voter_ids(&self) -> impl Iterator + '_ { - self.voter_ids.iter().copied() + self.voter_ids.iter().cloned() } /// Returns an Iterator of all learner node ids. Voters are not included. diff --git a/openraft/src/membership/membership.rs b/openraft/src/membership/membership.rs index b65b487fd..f6f993a06 100644 --- a/openraft/src/membership/membership.rs +++ b/openraft/src/membership/membership.rs @@ -144,7 +144,7 @@ where C: RaftTypeConfig /// Returns an Iterator of all learner node ids. Voters are not included. pub fn learner_ids(&self) -> impl Iterator + '_ { - self.nodes.keys().filter(|x| !self.is_voter(x)).copied() + self.nodes.keys().filter(|x| !self.is_voter(x)).cloned() } } @@ -188,7 +188,7 @@ where C: RaftTypeConfig if res.contains_key(k) { continue; } - res.insert(*k, v.clone()); + res.insert(k.clone(), v.clone()); } res @@ -281,19 +281,19 @@ where C: RaftTypeConfig let new_membership = match change { ChangeMembers::AddVoterIds(add_voter_ids) => { - let new_voter_ids = last.union(&add_voter_ids).copied().collect::>(); + let new_voter_ids = last.union(&add_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::AddVoters(add_voters) => { // Add nodes without overriding existent self.nodes = Self::extend_nodes(self.nodes, &add_voters); - let add_voter_ids = add_voters.keys().copied().collect::>(); - let new_voter_ids = last.union(&add_voter_ids).copied().collect::>(); + let add_voter_ids = add_voters.keys().cloned().collect::>(); + let new_voter_ids = last.union(&add_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::RemoveVoters(remove_voter_ids) => { - let new_voter_ids = last.difference(&remove_voter_ids).copied().collect::>(); + let new_voter_ids = last.difference(&remove_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::ReplaceAllVoters(all_voter_ids) => self.next_coherent(all_voter_ids, retain), @@ -333,7 +333,7 @@ where C: RaftTypeConfig pub(crate) fn to_quorum_set(&self) -> Joint, Vec>> { let mut qs = vec![]; for c in self.get_joint_config().iter() { - qs.push(c.iter().copied().collect::>()); + qs.push(c.iter().cloned().collect::>()); } Joint::new(qs) } diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 963136cd0..90fffc0ca 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -101,7 +101,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn current_leader(&self, leader_id: C::NodeId, msg: impl ToString) -> Result, WaitError> { self.metrics( - |m| m.current_leader == Some(leader_id), + |m| m.current_leader.as_ref() == Some(&leader_id), &format!("{} .current_leader == {}", msg.to_string(), leader_id), ) .await diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index f2b4c3e57..4168ebde1 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -87,7 +87,7 @@ mod tokio_rt { let done = (offset + n_read as u64) == end; let req = InstallSnapshotRequest { - vote, + vote: vote.clone(), meta: snapshot.meta.clone(), offset, data: buf, diff --git a/openraft/src/node.rs b/openraft/src/node.rs index c280df842..e512a623d 100644 --- a/openraft/src/node.rs +++ b/openraft/src/node.rs @@ -19,7 +19,6 @@ pub trait NodeIdEssential: + Debug + Display + Hash - + Copy + Clone + Default + 'static diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index 3b522f317..fdd6308cb 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -14,7 +14,7 @@ use crate::LogIdOptionExt; use crate::RaftTypeConfig; /// State of replication to a target node. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] #[derive(PartialEq, Eq)] pub(crate) struct ProgressEntry where C: RaftTypeConfig @@ -31,13 +31,20 @@ where C: RaftTypeConfig pub(crate) searching_end: u64, } +impl Copy for ProgressEntry +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} + impl ProgressEntry where C: RaftTypeConfig { #[allow(dead_code)] pub(crate) fn new(matching: Option>) -> Self { Self { - matching, + matching: matching.clone(), inflight: Inflight::None, searching_end: matching.next_index(), } @@ -70,8 +77,8 @@ where C: RaftTypeConfig match &self.inflight { Inflight::None => false, Inflight::Logs { log_id_range, .. } => { - let lid = Some(*upto); - lid > log_id_range.prev + let lid = Some(upto); + lid > log_id_range.prev.as_ref() } Inflight::Snapshot { last_log_id: _, .. } => false, } @@ -84,7 +91,7 @@ where C: RaftTypeConfig "update_matching" ); - self.inflight.ack(matching); + self.inflight.ack(matching.clone()); debug_assert!(matching >= self.matching); self.matching = matching; @@ -176,7 +183,7 @@ where C: RaftTypeConfig // Replicate by snapshot. if self.searching_end < purge_upto_next { let snapshot_last = log_state.snapshot_last_log_id(); - self.inflight = Inflight::snapshot(snapshot_last.copied()); + self.inflight = Inflight::snapshot(snapshot_last.cloned()); return Ok(&self.inflight); } @@ -249,17 +256,17 @@ where C: RaftTypeConfig self.inflight.validate()?; - match self.inflight { + match &self.inflight { Inflight::None => {} Inflight::Logs { log_id_range, .. } => { // matching <= prev_log_id <= last_log_id // prev_log_id.next_index() <= searching_end - validit::less_equal!(self.matching, log_id_range.prev); + validit::less_equal!(&self.matching, &log_id_range.prev); validit::less_equal!(log_id_range.prev.next_index(), self.searching_end); } Inflight::Snapshot { last_log_id, .. } => { // There is no need to send a snapshot smaller than last matching. - validit::less!(self.matching, last_log_id); + validit::less!(&self.matching, last_log_id); } } Ok(()) diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 42b7afef3..82bdf5db3 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -112,7 +112,7 @@ where C: RaftTypeConfig *self = { debug_assert!(upto >= log_id_range.prev); debug_assert!(upto <= log_id_range.last); - Inflight::logs(upto, log_id_range.last) + Inflight::logs(upto, log_id_range.last.clone()) } } Inflight::Snapshot { last_log_id } => { diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index db1e19b74..693b3bcfc 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -35,7 +35,7 @@ pub(crate) trait Progress where ID: PartialEq + 'static, V: Borrow

, - P: PartialOrd + Copy, + P: PartialOrd + Clone, QS: QuorumSet, { /// Update one of the scalar value and re-calculate the committed value with provided function. @@ -144,10 +144,10 @@ where impl Display for VecProgress where - ID: PartialEq + Debug + Copy + 'static, - V: Copy + 'static, + ID: PartialEq + Debug + Clone + 'static, + V: Clone + 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, + P: PartialOrd + Ord + Clone + 'static, QS: QuorumSet + 'static, ID: Display, V: Display, @@ -211,7 +211,7 @@ where ID: 'static, V: Borrow

, QS: QuorumSet, - P: Copy, + P: Clone, { pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator, default_v: impl Fn() -> V) -> Self { let mut vector = quorum_set.ids().map(|id| (id, default_v())).collect::>(); @@ -222,7 +222,7 @@ where Self { quorum_set, - granted: *default_v().borrow(), + granted: default_v().borrow().clone(), voter_count, vector, stat: Default::default(), @@ -281,7 +281,7 @@ impl Progress for VecProgress where ID: PartialEq + 'static, V: Borrow

, - P: PartialOrd + Copy, + P: PartialOrd + Clone, QS: QuorumSet, { /// Update one of the scalar value and re-calculate the committed value. @@ -333,7 +333,7 @@ where let elt = &mut self.vector[index]; - let prev_progress = *elt.1.borrow(); + let prev_progress = elt.1.borrow().clone(); f(&mut elt.1); @@ -374,7 +374,7 @@ where self.stat.is_quorum_count += 1; if self.quorum_set.is_quorum(it) { - self.granted = *prog; + self.granted = prog.clone(); break; } } diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 1747b8f78..2ec282816 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -99,18 +99,18 @@ where /// Return the node ids that has granted this vote. #[allow(dead_code)] pub(crate) fn granters(&self) -> impl Iterator + '_ { - self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| *target) + self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| target.clone()) } pub(crate) fn into_leader(self) -> Leader { // Mark the vote as committed, i.e., being granted and saved by a quorum. let vote = { - let vote = *self.vote_ref(); + let vote = self.vote_ref().clone(); debug_assert!(!vote.is_committed()); vote.into_committed() }; - let last_leader_log_ids = self.last_log_id().copied().into_iter().collect::>(); + let last_leader_log_ids = self.last_log_id().cloned().into_iter().collect::>(); Leader::new(vote, self.quorum_set.clone(), self.learner_ids, &last_leader_log_ids) } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index e97ae76ed..843ad9c06 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -85,13 +85,13 @@ where last_leader_log_id: &[LogIdOf], ) -> Self { debug_assert!( - Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.last() {}", vote, last_leader_log_id.display() ); debug_assert!( - Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.first() {}", vote, last_leader_log_id.display() @@ -102,12 +102,12 @@ where let vote_leader_id = vote.committed_leader_id(); let first = last_leader_log_id.first(); - let noop_log_id = if first.map(|x| *x.committed_leader_id()) == Some(vote_leader_id) { + let noop_log_id = if first.map(|x| x.committed_leader_id()) == Some(&vote_leader_id) { // There is already log id proposed by the this leader. // E.g. the Leader is restarted without losing leadership. // // Set to the first log id proposed by this Leader. - first.copied() + first.cloned() } else { // Set to a log id that will be proposed. Some(LogId::new( @@ -116,15 +116,15 @@ where )) }; - let last_log_id = last_leader_log_id.last().copied(); + let last_log_id = last_leader_log_id.last().cloned(); let leader = Self { transfer_to: None, committed_vote: vote, next_heartbeat: C::now(), - last_log_id, + last_log_id: last_log_id.clone(), noop_log_id, - progress: VecProgress::new(quorum_set.clone(), learner_ids.iter().copied(), || { + progress: VecProgress::new(quorum_set.clone(), learner_ids.iter().cloned(), || { ProgressEntry::empty(last_log_id.next_index()) }), clock_progress: VecProgress::new(quorum_set, learner_ids, || None), @@ -169,7 +169,7 @@ where let committed_leader_id = self.committed_vote.committed_leader_id(); let first = LogId::new(committed_leader_id, self.last_log_id().next_index()); - let mut last = first; + let mut last = first.clone(); for entry in entries { entry.set_log_id(&last); @@ -200,11 +200,11 @@ where // Thus vote.voted_for() is this node. // Safe unwrap: voted_for() is always non-None in Openraft - let node_id = self.committed_vote.into_vote().leader_id().voted_for().unwrap(); + let node_id = self.committed_vote.clone().into_vote().leader_id().voted_for().unwrap(); let now = C::now(); tracing::debug!( - leader_id = display(node_id), + leader_id = display(&node_id), now = display(now.display()), "{}: update with leader's local time, before retrieving quorum acked clock", func_name!() diff --git a/openraft/src/quorum/quorum_set_impl.rs b/openraft/src/quorum/quorum_set_impl.rs index 0a5d008b6..6f813cf95 100644 --- a/openraft/src/quorum/quorum_set_impl.rs +++ b/openraft/src/quorum/quorum_set_impl.rs @@ -4,7 +4,7 @@ use crate::quorum::quorum_set::QuorumSet; /// Impl a simple majority quorum set impl QuorumSet for BTreeSet -where ID: PartialOrd + Ord + Copy + 'static +where ID: PartialOrd + Ord + Clone + 'static { type Iter = std::collections::btree_set::IntoIter; @@ -29,7 +29,7 @@ where ID: PartialOrd + Ord + Copy + 'static /// Impl a simple majority quorum set impl QuorumSet for Vec -where ID: PartialOrd + Ord + Copy + 'static +where ID: PartialOrd + Ord + Clone + 'static { type Iter = std::collections::btree_set::IntoIter; diff --git a/openraft/src/raft/impl_raft_blocking_write.rs b/openraft/src/raft/impl_raft_blocking_write.rs index e3470095e..e76e77f37 100644 --- a/openraft/src/raft/impl_raft_blocking_write.rs +++ b/openraft/src/raft/impl_raft_blocking_write.rs @@ -85,7 +85,7 @@ where C: RaftTypeConfig> tracing::debug!("res of first step: {}", res); - let (log_id, joint) = (res.log_id, res.membership.clone().unwrap()); + let (log_id, joint) = (&res.log_id, res.membership.clone().unwrap()); if joint.get_joint_config().len() == 1 { return Ok(res); @@ -124,7 +124,7 @@ where C: RaftTypeConfig> /// /// A `node` is able to store the network address of a node. Thus an application does not /// need another store for mapping node-id to ip-addr when implementing the RaftNetwork. - #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(id)))] + #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(&id)))] pub async fn add_learner( &self, id: C::NodeId, @@ -134,7 +134,7 @@ where C: RaftTypeConfig> let (tx, rx) = oneshot_channel::(); let msg = RaftMsg::ChangeMembership { - changes: ChangeMembers::AddNodes(btreemap! {id=>node}), + changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}), retain: true, tx, }; @@ -152,12 +152,12 @@ where C: RaftTypeConfig> // Otherwise, blocks until the replication to the new learner becomes up to date. // The log id of the membership that contains the added learner. - let membership_log_id = resp.log_id; + let membership_log_id = &resp.log_id; let wait_res = self .wait(None) .metrics( - |metrics| match self.check_replication_upto_date(metrics, id, Some(membership_log_id)) { + |metrics| match self.check_replication_upto_date(metrics, &id, Some(membership_log_id)) { Ok(_matching) => true, // keep waiting Err(_) => false, diff --git a/openraft/src/raft/message/vote.rs b/openraft/src/raft/message/vote.rs index d4a046d19..98ad9891d 100644 --- a/openraft/src/raft/message/vote.rs +++ b/openraft/src/raft/message/vote.rs @@ -53,9 +53,9 @@ where C: RaftTypeConfig { pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { Self { - vote: *vote.borrow(), + vote: vote.borrow().clone(), vote_granted: granted, - last_log_id: last_log_id.map(|x| *x.borrow()), + last_log_id: last_log_id.map(|x| x.borrow().clone()), } } @@ -74,7 +74,7 @@ where C: RaftTypeConfig f, "{{{}, last_log:{:?}}}", self.vote, - self.last_log_id.map(|x| x.to_string()) + self.last_log_id.as_ref().map(|x| x.to_string()) ) } } diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index a44d121de..fab839259 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -245,7 +245,7 @@ where C: RaftTypeConfig { let (tx_api, rx_api) = C::mpsc_unbounded(); let (tx_notify, rx_notify) = C::mpsc_unbounded(); - let (tx_metrics, rx_metrics) = C::watch_channel(RaftMetrics::new_initial(id)); + let (tx_metrics, rx_metrics) = C::watch_channel(RaftMetrics::new_initial(id.clone())); let (tx_data_metrics, rx_data_metrics) = C::watch_channel(RaftDataMetrics::default()); let (tx_server_metrics, rx_server_metrics) = C::watch_channel(RaftServerMetrics::default()); let (tx_shutdown, rx_shutdown) = C::oneshot(); @@ -262,11 +262,11 @@ where C: RaftTypeConfig parent: tracing::Span::current(), Level::DEBUG, "RaftCore", - id = display(id), + id = display(&id), cluster = display(&config.cluster_name) ); - let eng_config = EngineConfig::new(id, config.as_ref()); + let eng_config = EngineConfig::new(id.clone(), config.as_ref()); let state = { let mut helper = StorageHelper::new(&mut log_store, &mut state_machine); @@ -285,7 +285,7 @@ where C: RaftTypeConfig ); let core: RaftCore = RaftCore { - id, + id: id.clone(), config: config.clone(), runtime_config: runtime_config.clone(), network_factory: network, @@ -298,7 +298,7 @@ where C: RaftTypeConfig replications: Default::default(), - heartbeat_handle: HeartbeatWorkersHandle::new(id, config.clone()), + heartbeat_handle: HeartbeatWorkersHandle::new(id.clone(), config.clone()), tx_api: tx_api.clone(), rx_api, @@ -454,9 +454,9 @@ where C: RaftTypeConfig tracing::debug!(req = display(&req), "Raft::install_snapshot()"); - let req_vote = req.vote; - let my_vote = self.with_raft_state(|state| *state.vote_ref()).await?; - let resp = InstallSnapshotResponse { vote: my_vote }; + let req_vote = req.vote.clone(); + let my_vote = self.with_raft_state(|state| state.vote_ref().clone()).await?; + let resp = InstallSnapshotResponse { vote: my_vote.clone() }; // Check vote. // It is not mandatory because it is just a read operation @@ -492,7 +492,7 @@ where C: RaftTypeConfig /// reads. This method is perfect for making decisions on where to route client requests. #[tracing::instrument(level = "debug", skip(self))] pub async fn current_leader(&self) -> Option { - self.metrics().borrow_watched().current_leader + self.metrics().borrow_watched().current_leader.clone() } /// Check to ensure this node is still the cluster leader, in order to guard against stale reads @@ -771,15 +771,15 @@ where C: RaftTypeConfig fn check_replication_upto_date( &self, metrics: &RaftMetrics, - node_id: C::NodeId, - membership_log_id: Option>, + node_id: &C::NodeId, + membership_log_id: Option<&LogId>, ) -> Result>, ()> { - if metrics.membership_config.log_id() < &membership_log_id { + if metrics.membership_config.log_id().as_ref() < membership_log_id { // Waiting for the latest metrics to report. return Err(()); } - if metrics.membership_config.membership().get_node(&node_id).is_none() { + if metrics.membership_config.membership().get_node(node_id).is_none() { // This learner has been removed. return Ok(None); } @@ -793,7 +793,7 @@ where C: RaftTypeConfig }; let replication_metrics = repl; - let target_metrics = match replication_metrics.get(&node_id) { + let target_metrics = match replication_metrics.get(node_id) { None => { // Maybe replication is not reported yet. Keep waiting. return Err(()); @@ -801,7 +801,7 @@ where C: RaftTypeConfig Some(x) => x, }; - let matched = *target_metrics; + let matched = target_metrics.clone(); let distance = replication_lag(&matched.index(), &metrics.last_log_index); diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index 290719bbc..519788e77 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -93,8 +93,8 @@ where C: RaftTypeConfig // Applied does not have to be flushed in local store. // less_equal!(self.applied.as_ref(), a.submitted().and_then(|x| x.last_log_id())); - less_equal!(self.snapshot, self.applied); - less_equal!(self.purged, self.snapshot); + less_equal!(&self.snapshot, &self.applied); + less_equal!(&self.purged, &self.snapshot); Ok(()) } } @@ -103,7 +103,7 @@ impl IOState where C: RaftTypeConfig { pub(crate) fn new( - vote: Vote, + vote: &Vote, applied: Option>, snapshot: Option>, purged: Option>, diff --git a/openraft/src/raft_state/io_state/io_id.rs b/openraft/src/raft_state/io_state/io_id.rs index 17e33663b..a33e7e18f 100644 --- a/openraft/src/raft_state/io_state/io_id.rs +++ b/openraft/src/raft_state/io_state/io_id.rs @@ -1,6 +1,8 @@ use std::cmp::Ordering; use std::fmt; +use dupit::Duplicate; + use crate::raft_state::io_state::log_io_id::LogIOId; use crate::vote::ref_vote::RefVote; use crate::vote::CommittedVote; @@ -26,7 +28,7 @@ use crate::Vote; /// [`append()`]: `crate::storage::RaftLogStorage::append()` /// [`truncate()`]: `crate::storage::RaftLogStorage::truncate()` /// [`purge()`]: `crate::storage::RaftLogStorage::purge()` -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] #[derive(PartialEq, Eq)] pub(crate) enum IOId where C: RaftTypeConfig @@ -38,6 +40,13 @@ where C: RaftTypeConfig Log(LogIOId), } +impl Copy for IOId +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} + impl fmt::Display for IOId where C: RaftTypeConfig { @@ -68,11 +77,11 @@ where C: RaftTypeConfig impl IOId where C: RaftTypeConfig { - pub(crate) fn new(vote: Vote) -> Self { + pub(crate) fn new(vote: &Vote) -> Self { if vote.is_committed() { - Self::new_log_io(vote.into_committed(), None) + Self::new_log_io(vote.clone().into_committed(), None) } else { - Self::new_vote_io(vote.into_non_committed()) + Self::new_vote_io(vote.clone().into_non_committed()) } } @@ -89,8 +98,8 @@ where C: RaftTypeConfig // The above lint is disabled because in future Vote may not be `Copy` pub(crate) fn to_vote(&self) -> Vote { match self { - Self::Vote(non_committed_vote) => non_committed_vote.into_vote(), - Self::Log(log_io_id) => log_io_id.committed_vote.into_vote(), + Self::Vote(non_committed_vote) => non_committed_vote.dup().into_vote(), + Self::Log(log_io_id) => log_io_id.committed_vote.dup().into_vote(), } } @@ -113,8 +122,8 @@ where C: RaftTypeConfig match self { Self::Vote(_vote) => ErrorSubject::Vote, Self::Log(log_io_id) => { - if let Some(log_id) = log_io_id.log_id { - ErrorSubject::Log(log_id) + if let Some(log_id) = &log_io_id.log_id { + ErrorSubject::Log(log_id.clone()) } else { ErrorSubject::Logs } diff --git a/openraft/src/raft_state/membership_state/change_handler.rs b/openraft/src/raft_state/membership_state/change_handler.rs index 978ceaeeb..fdece9fb4 100644 --- a/openraft/src/raft_state/membership_state/change_handler.rs +++ b/openraft/src/raft_state/membership_state/change_handler.rs @@ -52,8 +52,8 @@ where C: RaftTypeConfig Ok(()) } else { Err(InProgress { - committed: *committed.log_id(), - membership_log_id: *effective.log_id(), + committed: committed.log_id().clone(), + membership_log_id: effective.log_id().clone(), }) } } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 03f38f7b8..b280799bb 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -222,7 +222,7 @@ where C: RaftTypeConfig /// /// Returns the previously accepted value. pub(crate) fn accept_io(&mut self, accepted: IOId) -> Option> { - let curr_accepted = self.io_state.io_progress.accepted().copied(); + let curr_accepted = self.io_state.io_progress.accepted().cloned(); tracing::debug!( "{}: accept_log: current: {}, new_accepted: {}", @@ -233,17 +233,17 @@ where C: RaftTypeConfig if cfg!(debug_assertions) { let new_vote = accepted.to_vote(); - let current_vote = curr_accepted.map(|io_id| io_id.to_vote()); + let current_vote = curr_accepted.clone().map(|io_id| io_id.to_vote()); assert!( - Some(new_vote) >= current_vote, + Some(&new_vote) >= current_vote.as_ref(), "new accepted.committed_vote {} must be >= current accepted.committed_vote: {}", new_vote, current_vote.display(), ); } - if Some(accepted) > curr_accepted { - self.io_state.io_progress.accept(accepted); + if Some(accepted.clone()) > curr_accepted { + self.io_state.io_progress.accept(accepted.clone()); } curr_accepted @@ -265,9 +265,9 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_committed(&mut self, committed: &Option>) -> Option>> { if committed.as_ref() > self.committed() { - let prev = self.committed().copied(); + let prev = self.committed().cloned(); - self.committed = *committed; + self.committed = committed.clone(); self.membership_state.commit(committed); Some(prev) @@ -394,7 +394,7 @@ where C: RaftTypeConfig let last_leader_log_ids = self.log_ids.by_last_leader(); Leader::new( - self.vote_ref().into_committed(), + self.vote_ref().clone().into_committed(), em.to_quorum_set(), em.learner_ids(), last_leader_log_ids, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 641c14326..f524c5670 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -154,7 +154,7 @@ where LS: RaftLogStorage, { /// Spawn a new replication task for the target node. - #[tracing::instrument(level = "trace", skip_all,fields(target=display(target), session_id=display(session_id)))] + #[tracing::instrument(level = "trace", skip_all,fields(target=display(&target), session_id=display(&session_id)))] #[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments)] pub(crate) fn spawn( @@ -208,7 +208,7 @@ where } } - #[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(self.target), cluster=%self.config.cluster_name))] + #[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(&self.target), cluster=%self.config.cluster_name))] async fn main(mut self) -> Result<(), ReplicationClosed> { loop { let action = self.next_action.take(); @@ -229,13 +229,13 @@ where let res = match d { Data::Committed => { let m = &self.matching; - let d = LogIdRange::new(*m, *m); + let d = LogIdRange::new(m.clone(), m.clone()); - log_data = Some(d); + log_data = Some(d.clone()); self.send_log_entries(d, false).await } Data::Logs(log) => { - log_data = Some(log); + log_data = Some(log.clone()); self.send_log_entries(log, true).await } Data::Snapshot(snap) => self.stream_snapshot(snap).await, @@ -370,7 +370,7 @@ where log_ids: LogIdRange, has_payload: bool, ) -> Result>, ReplicationError> { - tracing::debug!(log_id_range = display(log_ids), "send_log_entries",); + tracing::debug!(log_id_range = display(&log_ids), "send_log_entries",); // Series of logs to send, and the last log id to send let (logs, sending_range) = { @@ -391,14 +391,14 @@ where if start == end { // Heartbeat RPC, no logs to send, last log id is the same as prev_log_id - let r = LogIdRange::new(rng.prev, rng.prev); + let r = LogIdRange::new(rng.prev.clone(), rng.prev.clone()); (vec![], r) } else { // limited_get_log_entries will return logs smaller than the range [start, end). let logs = self.log_reader.limited_get_log_entries(start, end).await?; - let first = *logs.first().map(|x| x.get_log_id()).unwrap(); - let last = *logs.last().map(|x| x.get_log_id()).unwrap(); + let first = logs.first().map(|x| x.get_log_id().clone()).unwrap(); + let last = logs.last().map(|x| x.get_log_id().clone()).unwrap(); debug_assert!( !logs.is_empty() && logs.len() <= (end - start) as usize, @@ -410,7 +410,7 @@ where last ); - let r = LogIdRange::new(rng.prev, Some(last)); + let r = LogIdRange::new(rng.prev.clone(), Some(last)); (logs, r) } }; @@ -420,8 +420,8 @@ where // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { vote: self.session_id.vote(), - prev_log_id: sending_range.prev, - leader_commit: self.committed, + prev_log_id: sending_range.prev.clone(), + leader_commit: self.committed.clone(), entries: logs, }; @@ -443,7 +443,7 @@ where let to = Timeout { action: RPCTypes::AppendEntries, id: self.session_id.vote().leader_id().voted_for().unwrap(), - target: self.target, + target: self.target.clone(), timeout: the_timeout, }; RPCError::Timeout(to) @@ -461,10 +461,10 @@ where AppendEntriesResponse::Success => { self.notify_heartbeat_progress(leader_time); - let matching = sending_range.last; + let matching = &sending_range.last; if has_payload { - self.notify_progress(ReplicationResult(Ok(matching))); - Ok(self.next_action_to_send(matching, log_ids)) + self.notify_progress(ReplicationResult(Ok(matching.clone()))); + Ok(self.next_action_to_send(matching.clone(), log_ids)) } else { Ok(None) } @@ -475,8 +475,8 @@ where self.notify_heartbeat_progress(leader_time); if has_payload { - self.notify_progress(ReplicationResult(Ok(matching))); - Ok(self.next_action_to_send(matching, log_ids)) + self.notify_progress(ReplicationResult(Ok(matching.clone()))); + Ok(self.next_action_to_send(matching.clone(), log_ids)) } else { Ok(None) } @@ -496,7 +496,7 @@ where })) } AppendEntriesResponse::Conflict => { - let conflict = sending_range.prev; + let conflict = sending_range.prev.clone(); debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); @@ -517,9 +517,9 @@ where fn send_progress_error(&mut self, err: RPCError) { let _ = self.tx_raft_core.send(Notification::ReplicationProgress { progress: Progress { - target: self.target, + target: self.target.clone(), result: Err(err.to_string()), - session_id: self.session_id, + session_id: self.session_id.clone(), }, }); } @@ -531,8 +531,8 @@ where fn notify_heartbeat_progress(&mut self, sending_time: InstantOf) { let _ = self.tx_raft_core.send({ Notification::HeartbeatProgress { - session_id: self.session_id, - target: self.target, + session_id: self.session_id.clone(), + target: self.target.clone(), sending_time, } }); @@ -541,17 +541,17 @@ where /// Notify RaftCore with the success replication result(log matching or conflict). fn notify_progress(&mut self, replication_result: ReplicationResult) { tracing::debug!( - target = display(self.target), + target = display(self.target.clone()), curr_matching = display(self.matching.display()), result = display(&replication_result), "{}", func_name!() ); - match replication_result.0 { + match &replication_result.0 { Ok(matching) => { self.validate_matching(matching); - self.matching = matching; + self.matching = matching.clone(); } Err(_conflict) => { // Conflict is not allowed to be less than the current matching. @@ -561,9 +561,9 @@ where let _ = self.tx_raft_core.send({ Notification::ReplicationProgress { progress: Progress { - session_id: self.session_id, - target: self.target, - result: Ok(replication_result), + session_id: self.session_id.clone(), + target: self.target.clone(), + result: Ok(replication_result.clone()), }, } }); @@ -576,9 +576,9 @@ where /// - otherwise panic, consider it as a bug. /// /// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert - fn validate_matching(&self, matching: Option>) { + fn validate_matching(&self, matching: &Option>) { if cfg!(feature = "loosen-follower-log-revert") { - if self.matching > matching { + if &self.matching > matching { tracing::warn!( "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", self.matching.display(), @@ -587,7 +587,7 @@ where } } else { debug_assert!( - self.matching <= matching, + &self.matching <= matching, "follower log is reverted from {} to {}", self.matching.display(), matching.display(), diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index eae636b32..6ef845cd0 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -29,7 +29,7 @@ use crate::Vote; /// Now node `c` is a new empty node, no log is replicated to it. /// But the delayed message `{target=c, matched=log_id-1}` may be process by raft core and make raft /// core believe node `c` already has `log_id=1`, and commit it. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] #[derive(PartialEq, Eq)] pub(crate) struct ReplicationSessionId where C: RaftTypeConfig @@ -65,10 +65,10 @@ where C: RaftTypeConfig } pub(crate) fn committed_vote(&self) -> CommittedVote { - self.leader_vote + self.leader_vote.clone() } pub(crate) fn vote(&self) -> Vote { - self.leader_vote.into_vote() + self.leader_vote.clone().into_vote() } } diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 5b5cbc19f..833a1dae1 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -124,7 +124,7 @@ where C: RaftTypeConfig let res = match result { Ok(x) => { tracing::debug!("LogApplied upto {}", self.last_log_id); - let resp = (self.last_log_id, x); + let resp = (self.last_log_id.clone(), x); self.tx.send(Ok(resp)) } Err(e) => { diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index d188654a2..c50aa84a3 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -86,7 +86,7 @@ where // TODO: It is possible `committed < last_applied` because when installing snapshot, // new committed should be saved, but not yet. if committed < last_applied { - committed = last_applied; + committed = last_applied.clone(); } // Re-apply log entries to recover SM to latest state. @@ -96,7 +96,7 @@ where self.reapply_committed(start, end).await?; - last_applied = committed; + last_applied = committed.clone(); } let mem_state = self.get_membership().await?; @@ -110,9 +110,9 @@ where last_applied.display(), ); - self.log_store.purge(last_applied.unwrap()).await?; - last_log_id = last_applied; - last_purged_log_id = last_applied; + self.log_store.purge(last_applied.clone().unwrap()).await?; + last_log_id = last_applied.clone(); + last_purged_log_id = last_applied.clone(); } tracing::info!( @@ -120,7 +120,7 @@ where last_purged_log_id.display(), last_log_id.display() ); - let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, &mut log_reader).await?; + let log_ids = LogIdList::load_log_ids(last_purged_log_id.clone(), last_log_id, &mut log_reader).await?; let snapshot = self.state_machine.get_current_snapshot().await?; @@ -140,7 +140,12 @@ where }; let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default(); - let io_state = IOState::new(vote, last_applied, snapshot_meta.last_log_id, last_purged_log_id); + let io_state = IOState::new( + &vote, + last_applied.clone(), + snapshot_meta.last_log_id.clone(), + last_purged_log_id.clone(), + ); let now = C::now(); @@ -291,7 +296,7 @@ where for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { - let em = StoredMembership::new(Some(*ent.get_log_id()), mem.clone()); + let em = StoredMembership::new(Some(ent.get_log_id().clone()), mem.clone()); res.insert(0, em); if res.len() == 2 { return Ok(res); diff --git a/openraft/src/storage/log_reader_ext.rs b/openraft/src/storage/log_reader_ext.rs index 37182abdf..137500dfa 100644 --- a/openraft/src/storage/log_reader_ext.rs +++ b/openraft/src/storage/log_reader_ext.rs @@ -30,7 +30,7 @@ where C: RaftTypeConfig )); } - Ok(*entries[0].get_log_id()) + Ok(entries[0].get_log_id().clone()) } } diff --git a/openraft/src/storage/snapshot_meta.rs b/openraft/src/storage/snapshot_meta.rs index e754ff8d6..4dfdb8e75 100644 --- a/openraft/src/storage/snapshot_meta.rs +++ b/openraft/src/storage/snapshot_meta.rs @@ -48,8 +48,8 @@ where C: RaftTypeConfig { pub fn signature(&self) -> SnapshotSignature { SnapshotSignature { - last_log_id: self.last_log_id, - last_membership_log_id: *self.last_membership.log_id(), + last_log_id: self.last_log_id.clone(), + last_membership_log_id: self.last_membership.log_id().clone(), snapshot_id: self.snapshot_id.clone(), } } diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index 6bf7cbd3e..e2192ef35 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -30,7 +30,7 @@ where C: RaftTypeConfig { let entries = entries.into_iter().collect::>(); - let last_log_id = *entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().get_log_id().clone(); let (tx, mut rx) = C::mpsc_unbounded(); diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index b57e5c55e..00c414653 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -440,9 +440,9 @@ where Duration::default(), Vote::default(), ); - want.io_state.io_progress.accept(IOId::new(Vote::default())); - want.io_state.io_progress.submit(IOId::new(Vote::default())); - want.io_state.io_progress.flush(IOId::new(Vote::default())); + want.io_state.io_progress.accept(IOId::new(&Vote::default())); + want.io_state.io_progress.submit(IOId::new(&Vote::default())); + want.io_state.io_progress.flush(IOId::new(&Vote::default())); assert_eq!(want, initial, "uninitialized state"); Ok(()) @@ -565,7 +565,7 @@ where "state machine has higher log" ); assert_eq!( - initial.last_purged_log_id().copied(), + initial.last_purged_log_id().cloned(), Some(log_id_0(3, 1)), "state machine has higher log" ); @@ -790,13 +790,13 @@ where C::sleep(Duration::from_millis(1_000)).await; let ent = store.try_get_log_entry(3).await?; - assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| *x.get_log_id())); + assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.get_log_id().clone())); let ent = store.try_get_log_entry(0).await?; - assert_eq!(None, ent.map(|x| *x.get_log_id())); + assert_eq!(None, ent.map(|x| x.get_log_id().clone())); let ent = store.try_get_log_entry(11).await?; - assert_eq!(None, ent.map(|x| *x.get_log_id())); + assert_eq!(None, ent.map(|x| x.get_log_id().clone())); Ok(()) } @@ -1215,7 +1215,7 @@ where let snapshot_last_log_id = Some(log_id_0(3, 3)); let snapshot_last_membership = StoredMembership::new(Some(log_id_0(1, 2)), Membership::new(vec![btreeset![1, 2, 3]], None)); - let snapshot_applied_state = (snapshot_last_log_id, snapshot_last_membership.clone()); + let snapshot_applied_state = (snapshot_last_log_id.clone(), snapshot_last_membership.clone()); tracing::info!("--- build and get snapshot on leader state machine"); let ss1 = sm_l.get_snapshot_builder().await.build_snapshot().await?; @@ -1327,7 +1327,7 @@ where { let entries = entries.into_iter().collect::>(); - let last_log_id = *entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().get_log_id().clone(); let (tx, mut rx) = C::mpsc_unbounded(); diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs index 9676fb091..077a5a1b9 100644 --- a/openraft/src/vote/committed.rs +++ b/openraft/src/vote/committed.rs @@ -10,7 +10,8 @@ use crate::Vote; /// Represents a committed Vote that has been accepted by a quorum. /// /// The inner `Vote`'s attribute `committed` is always set to `true` -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] +#[derive(dupit::Duplicate)] #[derive(PartialEq, Eq)] #[derive(PartialOrd)] pub(crate) struct CommittedVote @@ -19,6 +20,13 @@ where C: RaftTypeConfig vote: Vote>, } +impl Copy for CommittedVote +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} + /// The `CommittedVote` is totally ordered. /// /// Because: diff --git a/openraft/src/vote/leader_id/leader_id_adv.rs b/openraft/src/vote/leader_id/leader_id_adv.rs index b31328c8d..929f1c8ec 100644 --- a/openraft/src/vote/leader_id/leader_id_adv.rs +++ b/openraft/src/vote/leader_id/leader_id_adv.rs @@ -31,12 +31,12 @@ impl LeaderId { } pub fn voted_for(&self) -> Option { - Some(self.node_id) + Some(self.node_id.clone()) } #[allow(clippy::wrong_self_convention)] pub(crate) fn to_committed(&self) -> CommittedLeaderId { - *self + self.clone() } /// Return if it is the same leader as the committed leader id. diff --git a/openraft/src/vote/leader_id/leader_id_std.rs b/openraft/src/vote/leader_id/leader_id_std.rs index acfbcb729..44290c53b 100644 --- a/openraft/src/vote/leader_id/leader_id_std.rs +++ b/openraft/src/vote/leader_id/leader_id_std.rs @@ -52,7 +52,7 @@ impl LeaderId { } pub fn voted_for(&self) -> Option { - self.voted_for + self.voted_for.clone() } #[allow(clippy::wrong_self_convention)] diff --git a/openraft/src/vote/non_committed.rs b/openraft/src/vote/non_committed.rs index 45c24bc10..0a0f82772 100644 --- a/openraft/src/vote/non_committed.rs +++ b/openraft/src/vote/non_committed.rs @@ -9,7 +9,8 @@ use crate::Vote; /// Represents a non-committed Vote that has **NOT** been granted by a quorum. /// /// The inner `Vote`'s attribute `committed` is always set to `false` -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] +#[derive(dupit::Duplicate)] #[derive(PartialEq, Eq)] #[derive(PartialOrd)] pub(crate) struct NonCommittedVote @@ -18,6 +19,13 @@ where C: RaftTypeConfig vote: Vote>, } +impl Copy for NonCommittedVote +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} + impl NonCommittedVote where C: RaftTypeConfig {