Skip to content

Commit

Permalink
[Data Streaming Service] Add support for subscription requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Aug 22, 2023
1 parent 17e95e6 commit e570b95
Show file tree
Hide file tree
Showing 13 changed files with 1,263 additions and 196 deletions.
9 changes: 9 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl Default for StorageServiceConfig {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataStreamingServiceConfig {
/// Whether or not to enable data subscription streaming.
pub enable_subscription_streaming: bool,

/// The interval (milliseconds) at which to refresh the global data summary.
pub global_summary_refresh_interval_ms: u64,

Expand All @@ -221,19 +224,25 @@ pub struct DataStreamingServiceConfig {
/// memory. Once the number grows beyond this value, garbage collection occurs.
pub max_notification_id_mappings: u64,

/// Maxinum number of consecutive subscriptions that can be made before
/// the subscription stream is terminated and a new stream must be created.
pub max_num_consecutive_subscriptions: u64,

/// The interval (milliseconds) at which to check the progress of each stream.
pub progress_check_interval_ms: u64,
}

impl Default for DataStreamingServiceConfig {
fn default() -> Self {
Self {
enable_subscription_streaming: true, // TODO: change to false before landing!
global_summary_refresh_interval_ms: 50,
max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
max_concurrent_state_requests: MAX_CONCURRENT_STATE_REQUESTS,
max_data_stream_channel_sizes: 300,
max_request_retry: 5,
max_notification_id_mappings: 300,
max_num_consecutive_subscriptions: 50,
progress_check_interval_ms: 50,
}
}
Expand Down
181 changes: 170 additions & 11 deletions state-sync/aptos-data-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
global_summary::GlobalDataSummary,
interface::{
AptosDataClientInterface, Response, ResponseCallback, ResponseContext, ResponseError,
ResponseId,
ResponseId, SubscriptionRequestMetadata,
},
logging::{LogEntry, LogEvent, LogSchema},
metrics,
Expand All @@ -21,7 +21,7 @@ use aptos_config::{
network_id::PeerNetworkId,
};
use aptos_id_generator::{IdGenerator, U64IdGenerator};
use aptos_infallible::RwLock;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::{debug, info, sample, sample::SampleRate, trace, warn};
use aptos_network::{application::interface::NetworkClient, protocols::network::RpcError};
use aptos_storage_interface::DbReader;
Expand All @@ -30,7 +30,10 @@ use aptos_storage_service_types::{
requests::{
DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest,
NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest, TransactionOutputsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest,
SubscribeTransactionOutputsWithProofRequest,
SubscribeTransactionsOrOutputsWithProofRequest, SubscribeTransactionsWithProofRequest,
SubscriptionStreamMetadata, TransactionOutputsWithProofRequest,
TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest,
},
responses::{StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof},
Expand Down Expand Up @@ -76,6 +79,8 @@ pub struct AptosDataClient {
data_client_config: AptosDataClientConfig,
/// The underlying AptosNet storage service client.
storage_service_client: StorageServiceClient<NetworkClient<StorageServiceMessage>>,
/// The state of the active subscription stream.
active_subscription_state: Arc<Mutex<Option<SubscriptionState>>>,
/// All of the data-client specific data we have on each network peer.
peer_states: Arc<RwLock<PeerStates>>,
/// A cached, aggregate data summary of all unbanned peers' data summaries.
Expand All @@ -97,6 +102,7 @@ impl AptosDataClient {
let data_client = Self {
data_client_config,
storage_service_client: storage_service_client.clone(),
active_subscription_state: Arc::new(Mutex::new(None)),
peer_states: Arc::new(RwLock::new(PeerStates::new(
base_config,
data_client_config,
Expand Down Expand Up @@ -186,15 +192,80 @@ impl AptosDataClient {
self.identify_serviceable(regular_peers, request)
};

// Randomly select a peer to handle the request
serviceable_peers
.choose(&mut rand::thread_rng())
.copied()
.ok_or_else(|| {
Error::DataIsUnavailable(
format!("No connected peers are advertising that they can serve this data! Request: {:?}",request),
)
// Identify the peer based on the request type
if request.data_request.is_subscription_request() {
self.choose_peer_for_subscription_request(request, serviceable_peers)
} else {
choose_random_peer(serviceable_peers).ok_or_else(|| {
Error::DataIsUnavailable(format!(
"No peers are advertising that they can serve the data! Request: {:?}",
request
))
})
}
}

/// Choose a peer that can service the given subscription request
pub(crate) fn choose_peer_for_subscription_request(
&self,
request: &StorageServiceRequest,
serviceable_peers: Vec<PeerNetworkId>,
) -> crate::error::Result<PeerNetworkId, Error> {
// Get the stream ID from the request
let request_stream_id = match &request.data_request {
DataRequest::SubscribeTransactionsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
DataRequest::SubscribeTransactionOutputsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
DataRequest::SubscribeTransactionsOrOutputsWithProof(request) => {
request.subscription_stream_metadata.subscription_stream_id
},
data_request => {
return Err(Error::UnexpectedErrorEncountered(format!(
"Invalid subscription request type found: {:?}",
data_request
)))
},
};

// Grab the lock on the active subscription state
let mut active_subscription_state = self.active_subscription_state.lock();

// If we have an active subscription and the request is for the same
// stream ID, use the same peer (as long as it is still serviceable).
if let Some(subscription_state) = active_subscription_state.take() {
if subscription_state.subscription_stream_id == request_stream_id {
// The stream IDs match. Verify that the request is still serviceable.
let peer_network_id = subscription_state.peer_network_id;
if serviceable_peers.contains(&peer_network_id) {
// The previously chosen peer can still service the request
*active_subscription_state = Some(subscription_state);
return Ok(peer_network_id);
} else {
// The previously chosen peer can no longer service
// the request, so we need to return an error.
return Err(Error::DataIsUnavailable(format!(
"The peer that we were previously subscribing to can no longer service \
the subscriptions! Peer: {:?}, request: {:?}",
peer_network_id, request
)));
}
}
}

// Otherwise, we need to choose a new peer and update the subscription state
let peer_network_id = choose_random_peer(serviceable_peers).ok_or_else(|| {
Error::DataIsUnavailable(format!(
"No peers are advertising that they can serve the subscription! Request: {:?}",
request
))
})?;
let subscription_state = SubscriptionState::new(peer_network_id, request_stream_id);
*active_subscription_state = Some(subscription_state);

Ok(peer_network_id)
}

/// Identifies the peers in the given set of prospective peers
Expand Down Expand Up @@ -668,6 +739,72 @@ impl AptosDataClientInterface for AptosDataClient {
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transaction_outputs_with_proof(
&self,
request_metadata: SubscriptionRequestMetadata,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>
{
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start: request_metadata.known_version_at_stream_start,
known_epoch_at_stream_start: request_metadata.known_epoch_at_stream_start,
subscription_stream_id: request_metadata.subscription_stream_id,
};
let data_request = DataRequest::SubscribeTransactionOutputsWithProof(
SubscribeTransactionOutputsWithProofRequest {
subscription_stream_metadata,
subscription_stream_index: request_metadata.subscription_stream_index,
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transactions_with_proof(
&self,
request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>> {
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start: request_metadata.known_version_at_stream_start,
known_epoch_at_stream_start: request_metadata.known_epoch_at_stream_start,
subscription_stream_id: request_metadata.subscription_stream_id,
};
let data_request =
DataRequest::SubscribeTransactionsWithProof(SubscribeTransactionsWithProofRequest {
subscription_stream_metadata,
include_events,
subscription_stream_index: request_metadata.subscription_stream_index,
});
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> crate::error::Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>
{
let subscription_stream_metadata = SubscriptionStreamMetadata {
known_version_at_stream_start: request_metadata.known_version_at_stream_start,
known_epoch_at_stream_start: request_metadata.known_epoch_at_stream_start,
subscription_stream_id: request_metadata.subscription_stream_id,
};
let data_request = DataRequest::SubscribeTransactionsOrOutputsWithProof(
SubscribeTransactionsOrOutputsWithProofRequest {
subscription_stream_metadata,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
subscription_stream_index: request_metadata.subscription_stream_index,
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}
}

/// The AptosNet-specific request context needed to update a peer's scoring.
Expand Down Expand Up @@ -697,6 +834,28 @@ impl fmt::Debug for AptosNetResponseCallback {
}
}

/// A struct that holds a subscription state, including
/// the subscription stream ID and the peer serving the requests.
#[derive(Clone, Debug)]
struct SubscriptionState {
peer_network_id: PeerNetworkId,
subscription_stream_id: u64,
}

impl SubscriptionState {
fn new(peer_network_id: PeerNetworkId, subscription_stream_id: u64) -> Self {
Self {
peer_network_id,
subscription_stream_id,
}
}
}

/// Selects a peer randomly from the list of specified peers
fn choose_random_peer(peers: Vec<PeerNetworkId>) -> Option<PeerNetworkId> {
peers.choose(&mut rand::thread_rng()).copied()
}

/// Updates the metrics for the number of connected peers (priority and regular)
fn update_connected_peer_metrics(num_priority_peers: usize, num_regular_peers: usize) {
// Log the number of connected peers
Expand Down
5 changes: 5 additions & 0 deletions state-sync/aptos-data-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ impl Error {
Self::UnexpectedErrorEncountered(_) => "unexpected_error_encountered",
}
}

/// Returns true iff the error is a timeout error
pub fn is_timeout(&self) -> bool {
matches!(self, Self::TimeoutWaitingForResponse(_))
}
}

impl From<aptos_storage_service_client::Error> for Error {
Expand Down
46 changes: 46 additions & 0 deletions state-sync/aptos-data-client/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,52 @@ pub trait AptosDataClientInterface {
include_events: bool,
request_timeout_ms: u64,
) -> error::Result<Response<TransactionOrOutputListWithProof>>;

/// Subscribes to new transaction output lists with proofs. Subscriptions
/// start at `known_version + 1` and `known_epoch` (inclusive), as
/// specified by the stream metadata. The end version and proof version
/// are specified by the server. If the data cannot be fetched, an
/// error is returned.
async fn subscribe_to_transaction_outputs_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>;

/// Subscribes to new transaction lists with proofs. Subscriptions start
/// at `known_version + 1` and `known_epoch` (inclusive), as specified
/// by the subscription metadata. If `include_events` is true,
/// events are included in the proof. The end version and proof version
/// are specified by the server. If the data cannot be fetched, an error
/// is returned.
async fn subscribe_to_transactions_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

/// Subscribes to new transaction or output lists with proofs. Subscriptions
/// start at `known_version + 1` and `known_epoch` (inclusive), as
/// specified by the subscription metadata. If `include_events` is true,
/// events are included in the proof. The end version and proof version
/// are specified by the server. If the data cannot be fetched, an error
/// is returned.
async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> error::Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;
}

/// Subscription stream metadata associated with each subscription request
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct SubscriptionRequestMetadata {
pub known_version_at_stream_start: u64, // The highest known transaction version at stream start
pub known_epoch_at_stream_start: u64, // The highest known epoch at stream start
pub subscription_stream_id: u64, // The unique id of the subscription stream
pub subscription_stream_index: u64, // The index of the request in the subscription stream
}

/// A response error that users of the Aptos Data Client can use to notify
Expand Down
22 changes: 21 additions & 1 deletion state-sync/aptos-data-client/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
client::AptosDataClient,
error::Result,
global_summary::GlobalDataSummary,
interface::{AptosDataClientInterface, Response},
interface::{AptosDataClientInterface, Response, SubscriptionRequestMetadata},
poller::DataSummaryPoller,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
Expand Down Expand Up @@ -270,6 +270,26 @@ mock! {
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>>;

async fn subscribe_to_transaction_outputs_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>;

async fn subscribe_to_transactions_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

async fn subscribe_to_transactions_or_outputs_with_proof(
&self,
subscription_request_metadata: SubscriptionRequestMetadata,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;
}
}

Expand Down
Loading

0 comments on commit e570b95

Please sign in to comment.