Skip to content

Commit

Permalink
[Data Streaming Service] Add support for subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Aug 15, 2023
1 parent f7a65d3 commit 41f105f
Show file tree
Hide file tree
Showing 13 changed files with 1,203 additions and 197 deletions.
4 changes: 4 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl Default for StorageServiceConfig {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataStreamingServiceConfig {
/// Whether or not to enable data subscription streaming.
pub enable_subscription_streaming: bool,

/// The interval (milliseconds) at which to refresh the global data summary.
pub global_summary_refresh_interval_ms: u64,

Expand Down Expand Up @@ -228,6 +231,7 @@ pub struct DataStreamingServiceConfig {
impl Default for DataStreamingServiceConfig {
fn default() -> Self {
Self {
enable_subscription_streaming: true,
global_summary_refresh_interval_ms: 50,
max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
max_concurrent_state_requests: MAX_CONCURRENT_STATE_REQUESTS,
Expand Down
181 changes: 170 additions & 11 deletions state-sync/aptos-data-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_config::{
network_id::PeerNetworkId,
};
use aptos_id_generator::{IdGenerator, U64IdGenerator};
use aptos_infallible::RwLock;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::{debug, info, sample, sample::SampleRate, trace, warn};
use aptos_network::{application::interface::NetworkClient, protocols::network::RpcError};
use aptos_storage_interface::DbReader;
Expand All @@ -30,7 +30,10 @@ use aptos_storage_service_types::{
requests::{
DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest,
NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest, TransactionOutputsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest,
SubscribeTransactionOutputsWithProofRequest,
SubscribeTransactionsOrOutputsWithProofRequest, SubscribeTransactionsWithProofRequest,
SubscriptionStreamMetadata, TransactionOutputsWithProofRequest,
TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest,
},
responses::{StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof},
Expand Down Expand Up @@ -76,6 +79,8 @@ pub struct AptosDataClient {
data_client_config: AptosDataClientConfig,
/// The underlying AptosNet storage service client.
storage_service_client: StorageServiceClient<NetworkClient<StorageServiceMessage>>,
/// The state of the active subscription stream.
active_subscription_state: Arc<Mutex<Option<SubscriptionState>>>,
/// All of the data-client specific data we have on each network peer.
peer_states: Arc<RwLock<PeerStates>>,
/// A cached, aggregate data summary of all unbanned peers' data summaries.
Expand All @@ -97,6 +102,7 @@ impl AptosDataClient {
let data_client = Self {
data_client_config,
storage_service_client: storage_service_client.clone(),
active_subscription_state: Arc::new(Mutex::new(None)),
peer_states: Arc::new(RwLock::new(PeerStates::new(
base_config,
data_client_config,
Expand Down Expand Up @@ -186,15 +192,60 @@ impl AptosDataClient {
self.identify_serviceable(regular_peers, request)
};

// Randomly select a peer to handle the request
serviceable_peers
.choose(&mut rand::thread_rng())
.copied()
.ok_or_else(|| {
Error::DataIsUnavailable(
format!("No connected peers are advertising that they can serve this data! Request: {:?}",request),
)
})
// Identify the peer based on the request type
if request.data_request.is_subscription_request() {
self.choose_peer_for_subscription_request(request, serviceable_peers)
} else {
choose_peer_randomly(request, serviceable_peers)
}
}

/// Choose a peer that can service the given subscription request
pub(crate) fn choose_peer_for_subscription_request(
&self,
request: &StorageServiceRequest,
serviceable_peers: Vec<PeerNetworkId>,
) -> crate::error::Result<PeerNetworkId, Error> {
// Get the stream ID associated with the request
let request_stream_id = match &request.data_request {
DataRequest::SubscribeTransactionsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
DataRequest::SubscribeTransactionOutputsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
DataRequest::SubscribeTransactionsOrOutputsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
data_request => {
return Err(Error::UnexpectedErrorEncountered(format!(
"Invalid subscription request type found: {:?}",
data_request
)))
},
};

// If the active stream ID matches the request stream ID,
// use the previously chosen peer (as long as it is still serviceable).
let mut active_subscription_state = self.active_subscription_state.lock();
if let Some(subscription_state) = active_subscription_state.take() {
if subscription_state.subscription_stream_id == request_stream_id {
// The stream IDs match. Verify that the request is still serviceable.
let peer_network_id = subscription_state.peer_network_id;
if serviceable_peers.contains(&peer_network_id) {
// The previously chosen peer can still service the request
*active_subscription_state = Some(subscription_state);
return Ok(peer_network_id);
}
}
}

// Otherwise, we need to choose a new peer and update the active state
let peer_network_id = choose_peer_randomly(request, serviceable_peers)?;
let subscription_state = SubscriptionState::new(peer_network_id, request_stream_id);
*active_subscription_state = Some(subscription_state);

Ok(peer_network_id)
}

/// Identifies the peers in the given set of prospective peers
Expand Down Expand Up @@ -668,6 +719,81 @@ impl AptosDataClientInterface for AptosDataClient {
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transaction_outputs_with_proof(
&self,
known_version_at_stream_start: Version,
known_epoch_at_stream_start: Epoch,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>
{
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start,
known_epoch_at_stream_start,
subscription_stream_id,
};
let data_request = DataRequest::SubscribeTransactionOutputsWithProof(
SubscribeTransactionOutputsWithProofRequest {
subscription_stream_metadata,
subscription_stream_index,
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transactions_with_proof(
&self,
known_version_at_stream_start: Version,
known_epoch_at_stream_start: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>> {
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start,
known_epoch_at_stream_start,
subscription_stream_id,
};
let data_request =
DataRequest::SubscribeTransactionsWithProof(SubscribeTransactionsWithProofRequest {
subscription_stream_metadata,
include_events,
subscription_stream_index,
});
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
known_version_at_stream_start: Version,
known_epoch_at_stream_start: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>
{
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start,
known_epoch_at_stream_start,
subscription_stream_id,
};
let data_request = DataRequest::SubscribeTransactionsOrOutputsWithProof(
SubscribeTransactionsOrOutputsWithProofRequest {
subscription_stream_metadata,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
subscription_stream_index,
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}
}

/// The AptosNet-specific request context needed to update a peer's scoring.
Expand Down Expand Up @@ -697,6 +823,39 @@ impl fmt::Debug for AptosNetResponseCallback {
}
}

/// A struct that holds a subscription state, including
/// the subscription stream ID and the peer serving the requests.
#[derive(Clone, Debug)]
struct SubscriptionState {
peer_network_id: PeerNetworkId,
subscription_stream_id: u64,
}

impl SubscriptionState {
fn new(peer_network_id: PeerNetworkId, subscription_stream_id: u64) -> Self {
Self {
peer_network_id,
subscription_stream_id,
}
}
}

/// Selects a peer randomly from the list of serviceable peers
fn choose_peer_randomly(
request: &StorageServiceRequest,
serviceable_peers: Vec<PeerNetworkId>,
) -> Result<PeerNetworkId, Error> {
serviceable_peers
.choose(&mut rand::thread_rng())
.copied()
.ok_or_else(|| {
Error::DataIsUnavailable(format!(
"No connected peers are advertising that they can serve this data! Request: {:?}",
request
))
})
}

/// Updates the metrics for the number of connected peers (priority and regular)
fn update_connected_peer_metrics(num_priority_peers: usize, num_regular_peers: usize) {
// Log the number of connected peers
Expand Down
5 changes: 5 additions & 0 deletions state-sync/aptos-data-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ impl Error {
Self::UnexpectedErrorEncountered(_) => "unexpected_error_encountered",
}
}

/// Returns true iff the error is a timeout error
pub fn is_timeout(&self) -> bool {
matches!(self, Self::TimeoutWaitingForResponse(_))
}
}

impl From<aptos_storage_service_client::Error> for Error {
Expand Down
41 changes: 41 additions & 0 deletions state-sync/aptos-data-client/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,47 @@ pub trait AptosDataClientInterface {
include_events: bool,
request_timeout_ms: u64,
) -> error::Result<Response<TransactionOrOutputListWithProof>>;

/// Subscribes to new transaction output lists with proofs. Subscriptions
/// start at `known_version + 1` and `known_epoch` (inclusive). The end
/// version and proof version are specified by the server. If the data
/// cannot be fetched, an error is returned.
async fn subscribe_to_transaction_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>;

/// Subscribes to new transaction lists with proofs. Subscriptions start
/// at `known_version + 1` and `known_epoch` (inclusive). The end version
/// and proof version are specified by the server. If the data cannot be
/// fetched, an error is returned.
async fn subscribe_to_transactions_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

/// Subscribes to new transaction or output lists with proofs. Subscriptions
/// start at `known_version + 1` and `known_epoch` (inclusive). The end
/// version and proof version are specified by the server. If the data
/// cannot be fetched, an error is returned.
async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;
}

/// A response error that users of the Aptos Data Client can use to notify
Expand Down
29 changes: 29 additions & 0 deletions state-sync/aptos-data-client/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,35 @@ mock! {
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>>;

async fn subscribe_to_transaction_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>;

async fn subscribe_to_transactions_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
subscription_stream_id: u64,
subscription_stream_index: u64,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;
}
}

Expand Down
Loading

0 comments on commit 41f105f

Please sign in to comment.