Skip to content

Commit

Permalink
Merge pull request #652 from tursodatabase/snapshot-injection-bug
Browse files Browse the repository at this point in the history
snapshot injection bug
  • Loading branch information
MarinPostma authored Nov 20, 2023
2 parents b0c05e0 + 09a9cda commit f3c6ca6
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 2 deletions.
9 changes: 8 additions & 1 deletion libsql-replication/src/injector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Injector {
/// Injector connection
// connection must be dropped before the hook context
connection: Arc<Mutex<sqld_libsql_bindings::Connection<InjectorHook>>>,
biggest_uncommitted_seen: FrameNo,
}

/// Methods from this trait are called before and after performing a frame injection.
Expand Down Expand Up @@ -64,6 +65,7 @@ impl Injector {
buffer,
capacity: buffer_capacity,
connection: Arc::new(Mutex::new(connection)),
biggest_uncommitted_seen: 0,
})
}

Expand All @@ -86,6 +88,7 @@ impl Injector {
Err(e) => {
// something went wrong, rollback the connection to make sure we can retry in a
// clean state
self.biggest_uncommitted_seen = 0;
let connection = self.connection.lock();
let mut rollback = connection.prepare_cached("ROLLBACK")?;
let _ = rollback.execute(());
Expand All @@ -112,6 +115,8 @@ impl Injector {
}
};

self.biggest_uncommitted_seen = self.biggest_uncommitted_seen.max(last_frame_no);

drop(lock);

let connection = self.connection.lock();
Expand All @@ -134,7 +139,9 @@ impl Injector {
let _ = rollback.execute(());
self.is_txn = false;
assert!(self.buffer.lock().is_empty());
return Ok(Some(last_frame_no));
let commit_frame_no = self.biggest_uncommitted_seen;
self.biggest_uncommitted_seen = 0;
return Ok(Some(commit_frame_no));
} else if e.extended_code == LIBSQL_INJECT_OK_TXN {
self.is_txn = true;
assert!(self.buffer.lock().is_empty());
Expand Down
2 changes: 1 addition & 1 deletion libsql-replication/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl WalIndexMetaData {

pub struct WalIndexMeta {
file: File,
data: Option<WalIndexMetaData>,
pub data: Option<WalIndexMetaData>,
}

impl WalIndexMeta {
Expand Down
1 change: 1 addition & 0 deletions libsql-server/tests/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};
use crate::common::{http::Client, net::SimServer, snapshot_metrics};

mod replica_restart;
mod replication;

fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) {
init_tracing();
Expand Down
137 changes: 137 additions & 0 deletions libsql-server/tests/cluster/replication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::sync::Arc;
use std::time::Duration;

use sqld::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig};
use tokio::sync::Notify;

use crate::common::{
http::Client,
net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
};

/// Checks that a replica can catch up from snapshots that are too large to apply all at once.
///
/// In this test, we first create a primary with a very small `max_log_size`, and then add a good
/// amount of data to it. This will cause the primary to create a bunch of snapshots large enough
/// to prevent the replica from applying them all at once. We then start the replica, and check
/// that it replicates correctly up to the primary's replication index.
#[test]
fn apply_partial_snapshot() {
    let mut sim = turmoil::Builder::new()
        .tcp_capacity(4096 * 30)
        .simulation_duration(Duration::from_secs(3600))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    // Signalled by the client once the primary has been populated; the replica host waits on it
    // before starting its server.
    let notify = Arc::new(Notify::new());

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        let notify = notify.clone();
        move || {
            let notify = notify.clone();
            async move {
                let tmp = tempfile::tempdir().unwrap();
                let replica = TestServer {
                    path: tmp.path().to_path_buf().into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_client_config: Some(RpcClientConfig {
                        remote_url: "http://primary:5050".into(),
                        tls_config: None,
                        connector: TurmoilConnector,
                    }),
                    ..Default::default()
                };

                // Do not start replicating until the client reports that the primary is ready.
                notify.notified().await;
                replica.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.client("client", async move {
        let primary = libsql::Database::open_remote_with_connector(
            "http://primary:8080",
            "",
            TurmoilConnector,
        )
        .unwrap();
        let conn = primary.connect().unwrap();
        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();
        // We need a sufficiently large snapshot for the test. Before the fix, 5000 inserts would
        // trigger an infinite loop.
        for _ in 0..5000 {
            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
                .await
                .unwrap();
        }

        let client = Client::new();
        let resp = client
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap();
        let stats = resp.json_value().await.unwrap();
        let primary_replication_index = stats["replication_index"].as_i64().unwrap();

        // The primary is set up, time to start the replica. `notify_one` stores a permit when no
        // task is waiting yet, so the wake-up cannot be lost if the replica host has not reached
        // `notified().await` at this point (`notify_waiters` would silently drop it).
        notify.notify_one();

        let client = Client::new();
        // Poll the replica's stats until it reports the same replication index as the primary.
        loop {
            let resp = client
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap();
            let stats = resp.json_value().await.unwrap();
            let replication_index = &stats["replication_index"];
            // A null index is skipped and polled again; a non-null, non-integer value is a bug
            // and still panics through `unwrap`.
            if !replication_index.is_null()
                && replication_index.as_i64().unwrap() == primary_replication_index
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}

0 comments on commit f3c6ca6

Please sign in to comment.