Skip to content

Commit

Permalink
[Mechanical] Rename ReplicatedLogletId to LogletId
Browse files Browse the repository at this point in the history
Summary:
Moves ReplicatedLogletId to be a common helper type for all loglets that want to benefit from its internal structure. The changes are mechanical since the type now is in `restate_types::logs` instead being specific under replicated_loglet.

Test Plan: Unit tests
  • Loading branch information
AhmedSoliman committed Dec 20, 2024
1 parent 809a53f commit 2178a2e
Show file tree
Hide file tree
Showing 31 changed files with 228 additions and 232 deletions.
20 changes: 8 additions & 12 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use restate_types::logs::metadata::{
Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration,
NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex,
};
use restate_types::logs::{LogId, Lsn, TailState};
use restate_types::logs::{LogId, LogletId, Lsn, TailState};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams};
use restate_types::replicated_loglet::ReplicatedLogletParams;
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned};

Expand All @@ -55,9 +55,6 @@ type Result<T, E = LogsControllerError> = std::result::Result<T, E>;

const FALLBACK_MAX_RETRY_DELAY: Duration = Duration::from_secs(5);

/// A single unified id type enables easier migration between loglet types.
type LogletId = ReplicatedLogletId;

#[derive(Debug, thiserror::Error)]
pub enum LogsControllerError {
#[error("failed writing to the metadata store: {0}")]
Expand Down Expand Up @@ -335,7 +332,7 @@ fn try_provisioning(
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration(
config,
ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST),
LogletId::new(log_id, SegmentIndex::OLDEST),
&Metadata::with_current(|m| m.nodes_config_ref()),
observed_cluster_state,
None,
Expand All @@ -350,7 +347,7 @@ fn try_provisioning(
#[cfg(feature = "replicated-loglet")]
pub fn build_new_replicated_loglet_configuration(
replicated_loglet_config: &ReplicatedLogletConfig,
loglet_id: ReplicatedLogletId,
loglet_id: LogletId,
nodes_config: &NodesConfiguration,
observed_cluster_state: &ObservedClusterState,
previous_params: Option<&ReplicatedLogletParams>,
Expand Down Expand Up @@ -1284,12 +1281,11 @@ pub mod tests {
use restate_types::logs::metadata::{
DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig,
};
use restate_types::logs::LogletId;
use restate_types::nodes_config::{
LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState,
};
use restate_types::replicated_loglet::{
NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty,
};
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty};
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};

use crate::cluster_controller::logs_controller::{
Expand Down Expand Up @@ -1492,7 +1488,7 @@ pub mod tests {
.build();

let seq_n0 = ReplicatedLogletParams {
loglet_id: ReplicatedLogletId::from(1),
loglet_id: LogletId::from(1),
sequencer: GenerationalNodeId::new(0, 1),
replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()),
nodeset: NodeSet::from([0, 1, 2]),
Expand Down Expand Up @@ -1583,7 +1579,7 @@ pub mod tests {

let initial = build_new_replicated_loglet_configuration(
replicated_loglet_config,
ReplicatedLogletId::from(1),
LogletId::from(1),
&nodes.nodes_config,
&nodes.observed_state,
None,
Expand Down
25 changes: 13 additions & 12 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ use std::time::Duration;

use anyhow::{anyhow, Context};
use codederror::CodedError;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tonic::codec::CompressionEncoding;
use tracing::{debug, info};

use restate_metadata_store::ReadModifyWriteError;
use restate_types::cluster_controller::SchedulingPlan;
use restate_types::logs::metadata::{
Expand All @@ -28,12 +34,7 @@ use restate_types::metadata_store::keys::{
use restate_types::partition_table::{
self, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
};
use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams};
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tonic::codec::CompressionEncoding;
use tracing::{debug, info};
use restate_types::replicated_loglet::ReplicatedLogletParams;

use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment};
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
Expand All @@ -51,19 +52,19 @@ use restate_types::config::{AdminOptions, Configuration};
use restate_types::health::HealthStatus;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn};
use restate_types::logs::{LogId, LogletId, Lsn};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version, Versioned};

use self::state::ClusterControllerState;
use super::cluster_state_refresher::ClusterStateRefresher;
use super::grpc_svc_handler::ClusterCtrlSvcHandler;
use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer;
use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints};
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::SchedulingPlanNodeSetSelectorHints;
use state::ClusterControllerState;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
Expand Down Expand Up @@ -761,13 +762,13 @@ impl SealAndExtendTask {
let (loglet_id, previous_params) = match segment.config.kind {
#[cfg(any(test, feature = "memory-loglet"))]
ProviderKind::InMemory => {
let loglet_id = ReplicatedLogletId::from_str(&segment.config.params)
.context("Invalid loglet id")?;
let loglet_id =
LogletId::from_str(&segment.config.params).context("Invalid loglet id")?;
(loglet_id, None)
}
ProviderKind::Local => {
let loglet_id = ReplicatedLogletId::from_str(&segment.config.params)
.context("Invalid loglet id")?;
let loglet_id =
LogletId::from_str(&segment.config.params).context("Invalid loglet id")?;
(loglet_id, None)
}
#[cfg(feature = "replicated-loglet")]
Expand Down
5 changes: 2 additions & 3 deletions crates/bifrost/benches/replicated_loglet_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ use restate_types::identifiers::{InvocationId, LeaderEpoch, PartitionProcessorRp
use restate_types::invocation::{
InvocationTarget, ServiceInvocation, ServiceInvocationSpanContext,
};
use restate_types::logs::{LogId, Record};
use restate_types::logs::{LogId, LogletId, Record};
use restate_types::net::codec::{serialize_message, MessageBodyExt, WireDecode};
use restate_types::net::replicated_loglet::{Append, CommonRequestHeader};
use restate_types::protobuf::node::Message;
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::time::MillisSinceEpoch;
use restate_types::GenerationalNodeId;
use restate_wal_protocol::{Command, Destination, Envelope};
Expand Down Expand Up @@ -124,7 +123,7 @@ fn serialize_append_message(payloads: Arc<[Record]>) -> anyhow::Result<Message>
header: CommonRequestHeader {
log_id: LogId::from(12u16),
segment_index: 2.into(),
loglet_id: ReplicatedLogletId::new(12u16.into(), 4.into()),
loglet_id: LogletId::new(12u16.into(), 4.into()),
},
payloads,
};
Expand Down
5 changes: 2 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use std::sync::Arc;
use restate_core::ShutdownError;
use restate_types::errors::MaybeRetryableError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::LogId;
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::logs::{LogId, LogletId};

use crate::loglet::OperationError;

Expand All @@ -25,7 +24,7 @@ pub(crate) enum ReplicatedLogletError {
#[error("cannot find the tail of the loglet: {0}")]
FindTailFailed(String),
#[error("could not seal loglet_id={0}, insufficient nodes available for seal")]
SealFailed(ReplicatedLogletId),
SealFailed(LogletId),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}
Expand Down
20 changes: 10 additions & 10 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ mod tests {
use restate_types::config::{set_current_config, Configuration};
use restate_types::health::HealthStatus;
use restate_types::live::Live;
use restate_types::logs::Keys;
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicationProperty};
use restate_types::logs::{Keys, LogletId};
use restate_types::replicated_loglet::{NodeSet, ReplicationProperty};
use restate_types::{GenerationalNodeId, PlainNodeId};

use crate::loglet::{AppendError, Loglet};
Expand Down Expand Up @@ -421,7 +421,7 @@ mod tests {
// ** Single-node replicated-loglet smoke tests **
#[test(restate_core::test(start_paused = true))]
async fn test_append_local_sequencer_single_node() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {
// ** Single-node replicated-loglet seal **
#[test(restate_core::test(start_paused = true))]
async fn test_seal_local_sequencer_single_node() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand Down Expand Up @@ -513,7 +513,7 @@ mod tests {
#[test(restate_core::test(start_paused = true))]
async fn single_node_gapless_loglet_smoke_test() -> Result<()> {
let record_cache = RecordCache::new(1_000_000);
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand All @@ -528,7 +528,7 @@ mod tests {

#[test(restate_core::test(start_paused = true))]
async fn single_node_single_loglet_readstream() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand All @@ -544,7 +544,7 @@ mod tests {

#[test(restate_core::test(start_paused = true))]
async fn single_node_single_loglet_readstream_with_trims() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand All @@ -567,7 +567,7 @@ mod tests {

#[test(restate_core::test(start_paused = true))]
async fn single_node_append_after_seal() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand All @@ -583,7 +583,7 @@ mod tests {

#[test(restate_core::test(start_paused = true))]
async fn single_node_append_after_seal_concurrent() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand All @@ -600,7 +600,7 @@ mod tests {

#[test(restate_core::test(start_paused = true))]
async fn single_node_seal_empty() -> Result<()> {
let loglet_id = ReplicatedLogletId::new_unchecked(122);
let loglet_id = LogletId::new_unchecked(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use restate_core::{
};
use restate_types::{
config::Configuration,
logs::{LogletOffset, Record, RecordCache, SequenceNumber},
logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber},
net::log_server::Store,
replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty},
replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty},
GenerationalNodeId,
};

Expand Down Expand Up @@ -74,7 +74,7 @@ impl SequencerSharedState {
&self.my_params
}

pub fn loglet_id(&self) -> &ReplicatedLogletId {
pub fn loglet_id(&self) -> &LogletId {
&self.my_params.loglet_id
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use tracing::{debug, trace, warn};
use restate_core::network::rpc_router::{RpcError, RpcRouter};
use restate_core::network::{Networking, TransportConnect};
use restate_core::{cancellation_watcher, ShutdownError, TaskCenterFutureExt};
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::logs::{LogletId, LogletOffset, SequenceNumber};
use restate_types::net::log_server::{
Digest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags,
};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams};
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams};
use restate_types::{GenerationalNodeId, PlainNodeId};

use crate::loglet::util::TailOffsetWatch;
Expand All @@ -40,7 +40,7 @@ struct ReplicationFailed;
/// Tracks digest responses and record repairs to achieve a consistent and durable
/// state of the loglet tail.
pub struct Digests {
loglet_id: ReplicatedLogletId,
loglet_id: LogletId,
// inclusive. The first record we need to repair.
start_offset: LogletOffset,
// exclusive (this should be the durable global_tail after finishing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ use restate_core::network::{Networking, TransportConnect};
use restate_core::TaskCenterFutureExt;
use restate_types::config::Configuration;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, LogletOffset, RecordCache, SequenceNumber};
use restate_types::logs::{LogId, LogletId, LogletOffset, RecordCache, SequenceNumber};
use restate_types::net::log_server::{GetLogletInfo, LogServerRequestHeader, Status, WaitForTail};
use restate_types::net::replicated_loglet::{CommonRequestHeader, GetSequencerState};
use restate_types::replicated_loglet::{
EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams,
};
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
use restate_types::PlainNodeId;

use super::{NodeTailStatus, RepairTail, RepairTailResult, SealTask};
Expand Down Expand Up @@ -485,7 +483,7 @@ impl<T: TransportConnect> FindTailTask<T> {

pub(super) struct FindTailOnNode<'a> {
pub(super) node_id: PlainNodeId,
pub(super) loglet_id: ReplicatedLogletId,
pub(super) loglet_id: LogletId,
pub(super) get_loglet_info_rpc: &'a RpcRouter<GetLogletInfo>,
pub(super) known_global_tail: &'a TailOffsetWatch,
}
Expand Down Expand Up @@ -603,7 +601,7 @@ impl<'a> FindTailOnNode<'a> {

struct WaitForTailOnNode {
node_id: PlainNodeId,
loglet_id: ReplicatedLogletId,
loglet_id: LogletId,
wait_for_tail_rpc: RpcRouter<WaitForTail>,
known_global_tail: TailOffsetWatch,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::time::Instant;
use tracing::{debug, trace};

use restate_core::network::TransportConnect;
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::logs::LogletId;

use crate::loglet::{Loglet, OperationError};
use crate::providers::replicated_loglet::loglet::ReplicatedLoglet;
Expand All @@ -24,7 +24,7 @@ pub struct PeriodicTailChecker {}

impl PeriodicTailChecker {
pub async fn run<T: TransportConnect>(
loglet_id: ReplicatedLogletId,
loglet_id: LogletId,
loglet: Weak<ReplicatedLoglet<T>>,
duration: Duration,
) -> anyhow::Result<()> {
Expand Down
8 changes: 3 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ use restate_core::network::rpc_router::{RpcError, RpcRouter};
use restate_core::network::{Incoming, Networking, TransportConnect};
use restate_core::{TaskCenter, TaskKind};
use restate_types::config::Configuration;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::logs::{LogletId, LogletOffset, SequenceNumber};
use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status};
use restate_types::replicated_loglet::{
EffectiveNodeSet, NodeSet, ReplicatedLogletId, ReplicatedLogletParams,
};
use restate_types::replicated_loglet::{EffectiveNodeSet, NodeSet, ReplicatedLogletParams};
use restate_types::retries::RetryPolicy;
use restate_types::{GenerationalNodeId, PlainNodeId};

Expand Down Expand Up @@ -132,7 +130,7 @@ impl SealTask {

struct SealSingleNode<T> {
node_id: PlainNodeId,
loglet_id: ReplicatedLogletId,
loglet_id: LogletId,
sequencer: GenerationalNodeId,
seal_router: RpcRouter<Seal>,
networking: Networking<T>,
Expand Down
Loading

0 comments on commit 2178a2e

Please sign in to comment.