Skip to content

Commit

Permalink
Merge pull request #1793 from tursodatabase/embedded_replicas_improve…
Browse files Browse the repository at this point in the history
…ments

Add Database::get_sync_usage_stats
  • Loading branch information
haaawk authored Oct 23, 2024
2 parents 9ce06b7 + b83b029 commit e853d54
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 9 deletions.
15 changes: 15 additions & 0 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cfg_replication!(
use crate::replication::remote_client::RemoteClient;
use crate::replication::EmbeddedReplicator;
pub use crate::replication::Frames;
pub use crate::replication::SyncUsageStats;

pub struct ReplicationContext {
pub(crate) replicator: EmbeddedReplicator,
Expand Down Expand Up @@ -277,6 +278,20 @@ impl Database {
Ok(self.sync_oneshot().await?)
}

#[cfg(feature = "replication")]
/// Return detailed logs about bytes synced with primary
pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
if let Some(ctx) = &self.replication_ctx {
let sync_stats = ctx.replicator.get_sync_usage_stats().await?;
Ok(sync_stats)
} else {
Err(crate::errors::Error::Misuse(
"No replicator available. Use Database::with_replicator() to enable replication"
.to_string(),
))
}
}

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
Expand Down
83 changes: 83 additions & 0 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,60 @@ pub enum Frames {
Snapshot(SnapshotFile),
}

/// Detailed logs about bytes synced with primary
pub struct SyncUsageStats {
prefetched_bytes: u64,
prefetched_bytes_discarded_due_to_new_session: u64,
prefetched_bytes_discarded_due_to_consecutive_handshake: u64,
prefetched_bytes_discarded_due_to_invalid_frame_header: u64,
synced_bytes_discarded_due_to_invalid_frame_header: u64,
prefetched_bytes_used: u64,
synced_bytes_used: u64,
snapshot_bytes: u64,
}

impl SyncUsageStats {
/// Number of bytes prefetched while doing handshake
pub fn prefetched_bytes(&self) -> u64 {
self.prefetched_bytes
}

/// Number of bytes prefetched and discarded due to the change of the client session
pub fn prefetched_bytes_discarded_due_to_new_session(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_new_session
}

/// Number of bytes prefetched and discarded due to consecutive handshake with new prefetch
pub fn prefetched_bytes_discarded_due_to_consecutive_handshake(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_consecutive_handshake
}

/// Number of bytes prefetched and discarded due to invalid frame header in received frames
pub fn prefetched_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
self.prefetched_bytes_discarded_due_to_invalid_frame_header
}

/// Number of bytes synced and discarded due to invalid frame header in received frames
pub fn synced_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
self.synced_bytes_discarded_due_to_invalid_frame_header
}

/// Number of bytes prefetched and used
pub fn prefetched_bytes_used(&self) -> u64 {
self.prefetched_bytes_used
}

/// Number of bytes synced and used
pub fn synced_bytes_used(&self) -> u64 {
self.synced_bytes_used
}

/// Number of bytes downloaded as snapshots
pub fn snapshot_bytes(&self) -> u64 {
self.snapshot_bytes
}
}

#[derive(Clone)]
pub(crate) struct Writer {
pub(crate) client: client::Client,
Expand Down Expand Up @@ -210,6 +264,35 @@ impl EmbeddedReplicator {
})
}

pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
let mut replicator = self.replicator.lock().await;
match replicator.client_mut() {
Either::Right(_) => {
Err(crate::errors::Error::Misuse(
"Trying to get sync usage stats, but this is a local replicator".into(),
))
}
Either::Left(c) => {
let stats = c.sync_stats();
Ok(SyncUsageStats {
prefetched_bytes: stats.prefetched_bytes.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_new_session: stats
.prefetched_bytes_discarded_due_to_new_session.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_consecutive_handshake: stats
.prefetched_bytes_discarded_due_to_consecutive_handshake.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_discarded_due_to_invalid_frame_header: stats
.prefetched_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
synced_bytes_discarded_due_to_invalid_frame_header: stats
.synced_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
prefetched_bytes_used: stats.prefetched_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
synced_bytes_used: stats.synced_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
snapshot_bytes: stats.snapshot_bytes.load(std::sync::atomic::Ordering::SeqCst),
})
}
}

}

pub async fn sync_oneshot(&self) -> Result<Replicated> {
use libsql_replication::replicator::ReplicatorClient;

Expand Down
120 changes: 111 additions & 9 deletions libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::{Duration, Instant};

use bytes::Bytes;
Expand All @@ -22,6 +24,63 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
(out, before.elapsed())
}

pub(crate) struct SyncStats {
pub prefetched_bytes: AtomicU64,
pub prefetched_bytes_discarded_due_to_new_session: AtomicU64,
pub prefetched_bytes_discarded_due_to_consecutive_handshake: AtomicU64,
pub prefetched_bytes_discarded_due_to_invalid_frame_header: AtomicU64,
pub synced_bytes_discarded_due_to_invalid_frame_header: AtomicU64,
pub prefetched_bytes_used: AtomicU64,
pub synced_bytes_used: AtomicU64,
pub snapshot_bytes: AtomicU64,
}

impl SyncStats {
fn new() -> Self {
Self {
prefetched_bytes: AtomicU64::new(0),
prefetched_bytes_discarded_due_to_new_session: AtomicU64::new(0),
prefetched_bytes_discarded_due_to_consecutive_handshake: AtomicU64::new(0),
prefetched_bytes_discarded_due_to_invalid_frame_header: AtomicU64::new(0),
synced_bytes_discarded_due_to_invalid_frame_header: AtomicU64::new(0),
prefetched_bytes_used: AtomicU64::new(0),
synced_bytes_used: AtomicU64::new(0),
snapshot_bytes: AtomicU64::new(0),
}
}

fn add_prefetched_bytes(&self, bytes: u64) {
self.prefetched_bytes.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_prefetched_bytes_discarded_due_to_new_session(&self, bytes: u64) {
self.prefetched_bytes_discarded_due_to_new_session.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_prefetched_bytes_discarded_due_to_consecutive_handshake(&self, bytes: u64) {
self.prefetched_bytes_discarded_due_to_consecutive_handshake.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_prefetched_bytes_discarded_due_to_invalid_frame_header(&self, bytes: u64) {
self.prefetched_bytes_discarded_due_to_invalid_frame_header.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_synced_bytes_discarded_due_to_invalid_frame_headear(&self, bytes: u64) {
self.synced_bytes_discarded_due_to_invalid_frame_header.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_prefetched_bytes_used(&self, bytes: u64) {
self.prefetched_bytes_used.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}

fn add_synced_bytes_used(&self, bytes: u64) {
self.synced_bytes_used.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}
fn add_snapshot_bytes(&self, bytes: u64) {
self.snapshot_bytes.fetch_add(bytes, std::sync::atomic::Ordering::SeqCst);
}
}

/// A remote replicator client, that pulls frames over RPC
pub struct RemoteClient {
remote: super::client::Client,
Expand All @@ -38,6 +97,7 @@ pub struct RemoteClient {
frames_latency_count: u128,
snapshot_latency_sum: Duration,
snapshot_latency_count: u128,
sync_stats: Arc<SyncStats>,
}

impl RemoteClient {
Expand All @@ -57,9 +117,14 @@ impl RemoteClient {
frames_latency_count: 0,
snapshot_latency_sum: Duration::default(),
snapshot_latency_count: 0,
sync_stats: Arc::new(SyncStats::new()),
})
}

pub(crate) fn sync_stats(&self) -> Arc<SyncStats> {
self.sync_stats.clone()
}

fn next_offset(&self) -> FrameNo {
match self.last_received {
Some(fno) => fno + 1,
Expand Down Expand Up @@ -89,7 +154,7 @@ impl RemoteClient {
) -> Result<bool, Error> {
let hello = hello?.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
let new_session = self.session_token != Some(hello.session_token.clone());
let new_session = self.session_token.as_ref() != Some(&hello.session_token);
self.session_token = Some(hello.session_token.clone());
let current_replication_index = hello.current_replication_index;
if let Err(e) = self.meta.init_from_hello(hello) {
Expand All @@ -107,8 +172,13 @@ impl RemoteClient {
Ok(new_session)
}

async fn do_handshake_with_prefetch(&mut self) -> (Result<bool, Error>, Duration) {
async fn do_handshake_with_prefetch(&mut self) -> (Result<(), Error>, Duration) {
tracing::info!("Attempting to perform handshake with primary.");
if let Some((Ok(frames), _)) = &self.prefetched_batch_log_entries {
// TODO: check if it's ok to just do 4096 * frames.len()
let bytes = frames.get_ref().frames.iter().map(|f| f.data.len() as u64).sum();
self.sync_stats.add_prefetched_bytes_discarded_due_to_consecutive_handshake(bytes);
}
if self.dirty {
self.prefetched_batch_log_entries = None;
self.meta.reset();
Expand All @@ -135,30 +205,55 @@ impl RemoteClient {
} else {
(hello_fut.await, None)
};
let mut prefetched_bytes = None;
if let Some((Ok(frames), _)) = &frames {
let bytes = frames.get_ref().frames.iter().map(|f| f.data.len() as u64).sum();
self.sync_stats.add_prefetched_bytes(bytes);
prefetched_bytes = Some(bytes);
}
self.prefetched_batch_log_entries = if let Ok(true) = hello.0 {
tracing::debug!(
"Frames prefetching failed because of new session token returned by handshake"
);
if let Some(bytes) = prefetched_bytes {
self.sync_stats.add_prefetched_bytes_discarded_due_to_new_session(bytes);
}
None
} else {
frames
};

hello
(hello.0.map(|_| ()), hello.1)
}

async fn handle_next_frames_response(
&mut self,
frames: Result<Response<Frames>, Status>,
prefetched: bool,
) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
let frames = frames?.into_inner().frames;
let bytes = frames.iter().map(|f| f.data.len() as u64).sum();

if let Some(f) = frames.last() {
let header: FrameHeader = FrameHeader::read_from_prefix(&f.data)
let header_result = FrameHeader::read_from_prefix(&f.data);
if header_result.is_none() {
if prefetched {
self.sync_stats.add_prefetched_bytes_discarded_due_to_invalid_frame_header(bytes);
} else {
self.sync_stats.add_synced_bytes_discarded_due_to_invalid_frame_headear(bytes);
}
}
let header: FrameHeader = header_result
.ok_or_else(|| Error::Internal("invalid frame header".into()))?;
self.last_received = Some(header.frame_no.get());
}

if prefetched {
self.sync_stats.add_prefetched_bytes_used(bytes);
} else {
self.sync_stats.add_synced_bytes_used(bytes);
}

let frames_iter = frames
.into_iter()
.map(Ok);
Expand All @@ -174,17 +269,18 @@ impl RemoteClient {
Result<<Self as ReplicatorClient>::FrameStream, Error>,
Duration,
) {
let (frames, time) = match self.prefetched_batch_log_entries.take() {
Some((result, time)) => (result, time),
let ((frames, time), prefetched) = match self.prefetched_batch_log_entries.take() {
Some((result, time)) => ((result, time), true),
None => {
let req = self.make_request(LogOffset {
next_offset: self.next_offset(),
wal_flavor: None,
});
time(self.remote.replication.batch_log_entries(req)).await
let result = time(self.remote.replication.batch_log_entries(req)).await;
(result, false)
}
};
let res = self.handle_next_frames_response(frames).await;
let res = self.handle_next_frames_response(frames, prefetched).await;
(res, time)
}

Expand All @@ -193,13 +289,18 @@ impl RemoteClient {
next_offset: self.next_offset(),
wal_flavor: None,
});
let sync_stats = self.sync_stats.clone();
let mut frames = self
.remote
.replication
.snapshot(req)
.await?
.into_inner()
.map_err(|e| e.into())
.map_ok(move |f| {
sync_stats.add_snapshot_bytes(f.data.len() as u64);
f
})
.peekable();

{
Expand All @@ -212,6 +313,7 @@ impl RemoteClient {
}
}


Ok(Box::pin(frames))
}
}
Expand Down Expand Up @@ -255,7 +357,7 @@ impl ReplicatorClient for RemoteClient {
&result,
"handshake",
);
result.map(|_| ())
result
}

/// Return a stream of frames to apply to the database
Expand Down

0 comments on commit e853d54

Please sign in to comment.