Skip to content

Commit

Permalink
✨ Save DB atomically #16
Browse files Browse the repository at this point in the history
  • Loading branch information
wrenger committed Apr 28, 2024
1 parent 7947fcc commit 6357306
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 56 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ chrono = "0.4"
clap = { version = "4.4", features = ["derive"] }
csv = "1.3"
email_address = "0.2"
fs4 = "0.8"
hyper = "1.1"
hyper-util = "0.1"
lettre = { version = "0.11", default-features = false, features = ["builder", "smtp-transport", "rustls-tls"] }
Expand All @@ -32,12 +31,12 @@ rand = "0.8"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
roxmltree = "0.19"
rusqlite = { version = "0.31", features = ["bundled"], optional = true }
rustls = "0.22"
rustls = "0.23"
rustls-pemfile = "2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.36", features = ["fs", "sync", "time", "macros", "rt-multi-thread"]}
tokio-rustls = "0.25"
tokio-rustls = "0.26"
tower = { version = "0.4", features = ["util", "timeout"] }
tower-http = { version = "0.5", features = ["fs", "trace", "compression-deflate"] }
tower-service = "0.3"
Expand Down
24 changes: 4 additions & 20 deletions src/db/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::io::{BufReader, Seek};
use std::path::Path;
use std::str::FromStr;

use fs4::FileExt;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use super::Database;
Expand All @@ -17,36 +16,21 @@ struct DatabaseVersion {

const MIN_VERSION: Version = Version(0, 9, 0);

pub fn import(path: &Path) -> Result<(File, Database)> {
pub fn import(path: &Path) -> Result<Database> {
#[cfg(feature = "sqlite")]
if path.extension() == Some(std::ffi::OsStr::new("db")) {
tracing::warn!("Try importing old database");
let data = from_db(path)?;
let path = path.with_extension("json");
let file = File::options()
.create_new(true)
.read(true)
.write(true)
.open(&path)?;
file.try_lock_exclusive()?;
data.save(&file)?;
return Ok((file, data));
return from_db(path);
}

let mut file = File::options()
.create(true)
.read(true)
.write(true)
.open(path)?;
file.try_lock_exclusive()?;
let mut file = File::open(path)?;

let DatabaseVersion { version } = serde_json::from_reader(BufReader::new(&file))?;
let pkg_version: Version = crate::PKG_VERSION.parse().unwrap();
if MIN_VERSION <= version && version <= pkg_version {
file.rewind()?;
// TODO: Migration routines
let data = Database::load(&file)?;
Ok((file, data))
Database::load(&file)
} else {
Err(Error::UnsupportedProjectVersion)
}
Expand Down
101 changes: 68 additions & 33 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::cmp::Ordering;
use std::fmt;
use std::ffi::{OsStr, OsString};
use std::fs::File;
use std::io::{self, BufWriter, Seek};
use std::io::{self, BufWriter};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fmt, fs};

use chrono::{Local, NaiveDate};
use fs4::FileExt;
use serde::{Deserialize, Serialize};
use tracing::info;
use tracing::{error, info};

use crate::db::sorted::Sorted;
use crate::error::{Error, Result};
Expand Down Expand Up @@ -308,37 +308,76 @@ impl Database {
/// It also locks the database file, preventing other applications from accessing it.
pub struct AtomicDatabase {
path: PathBuf,
data: RwLock<(File, Database)>,
tmp: PathBuf,
data: RwLock<Database>,
}

impl AtomicDatabase {
pub fn load(path: &Path) -> Result<Self> {
let (file, data) = migrate::import(path)?;
let new_path = path.with_extension("json");
let tmp = Self::tmp_path(&new_path)?;

let data = migrate::import(path)?;
atomic_write(&tmp, &new_path, &data)?;

Ok(Self {
path: path.into(),
data: RwLock::new((file, data)),
path: new_path,
tmp,
data: RwLock::new(data),
})
}

pub fn create(path: &Path) -> Result<Self> {
assert!(path.extension() == Some(OsStr::new("json")));
let tmp = Self::tmp_path(path)?;

let data = Database::default();
let mut file = File::create(path)?;
file.try_lock_exclusive()?;
data.save(&mut file)?;
atomic_write(&tmp, path, &data)?;

Ok(Self {
path: path.into(),
data: RwLock::new((file, data)),
tmp,
data: RwLock::new(data),
})
}

pub fn read(&self) -> AtomicDatabaseRead<'_> {
AtomicDatabaseRead(self.data.read().unwrap())
AtomicDatabaseRead {
data: self.data.read().unwrap(),
}
}
pub fn write(&self) -> AtomicDatabaseWrite<'_> {
AtomicDatabaseWrite(self.data.write().unwrap())
AtomicDatabaseWrite {
path: &self.path,
tmp: &self.tmp,
data: self.data.write().unwrap(),
}
}

fn tmp_path(path: &Path) -> Result<PathBuf> {
let mut tmp_name = OsString::from(".");
tmp_name.push(path.file_name().unwrap_or(OsStr::new("db")));
let tmp = path.with_file_name(tmp_name);
if tmp.exists() {
error!(
"Found orphaned database temporary file '{tmp:?}'. The server has recently crashed or is already running. Delete this before continuing!"
);
return Err(Error::FileOpen);
}
Ok(tmp)
}
}

fn atomic_write(tmp: &Path, path: &Path, data: &Database) -> Result<()> {
{
let mut tmpfile = File::create_new(tmp)?;
data.save(&mut tmpfile)?;
tmpfile.sync_all()?; // just to be sure!
}
fs::rename(&tmp, &path)?;
Ok(())
}

impl fmt::Debug for AtomicDatabase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AtomicDatabase")
Expand All @@ -350,45 +389,41 @@ impl fmt::Debug for AtomicDatabase {
impl Drop for AtomicDatabase {
fn drop(&mut self) {
info!("Saving database");
let mut guard = self.data.write().unwrap();
let (file, data) = &mut *guard;
// truncate
file.rewind().unwrap();
file.set_len(0).unwrap();
data.save(file).unwrap();
// unlock file
guard.0.unlock().unwrap();
let guard = self.data.read().unwrap();
atomic_write(&self.tmp, &self.path, &guard).unwrap();
}
}

pub struct AtomicDatabaseRead<'a>(RwLockReadGuard<'a, (File, Database)>);
pub struct AtomicDatabaseRead<'a> {
data: RwLockReadGuard<'a, Database>,
}
impl Deref for AtomicDatabaseRead<'_> {
type Target = Database;
fn deref(&self) -> &Self::Target {
&self.0 .1
&self.data
}
}

pub struct AtomicDatabaseWrite<'a>(RwLockWriteGuard<'a, (File, Database)>);
pub struct AtomicDatabaseWrite<'a> {
tmp: &'a Path,
path: &'a Path,
data: RwLockWriteGuard<'a, Database>,
}
impl Deref for AtomicDatabaseWrite<'_> {
type Target = Database;
fn deref(&self) -> &Self::Target {
&self.0 .1
&self.data
}
}
impl DerefMut for AtomicDatabaseWrite<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 .1
&mut self.data
}
}
impl Drop for AtomicDatabaseWrite<'_> {
fn drop(&mut self) {
info!("Saving database");
let (file, data) = &mut *self.0;
// truncate
file.rewind().unwrap();
file.set_len(0).unwrap();
data.save(file).unwrap();
atomic_write(&self.tmp, &self.path, &self.data).unwrap();
}
}

Expand All @@ -413,7 +448,7 @@ mod test {
let file = Path::new("test/data/schillerbib.db");

let db1 = d1::Database::open(file.into()).unwrap().0;
let db2 = super::migrate::import(file).unwrap().1;
let db2 = super::migrate::import(file).unwrap();

let timer = Instant::now();
let results = db1.books().unwrap();
Expand Down

0 comments on commit 6357306

Please sign in to comment.