wip(backend): Split file and file_info
Frixxie committed Oct 17, 2024
1 parent e087f39 commit 516fef9
Showing 1 changed file with 98 additions and 125 deletions.
backend/src/file.rs: 98 additions & 125 deletions (223 changes)
@@ -4,7 +4,70 @@
 use serde::{Deserialize, Serialize};
 use sha256::digest;
 use sqlx::{prelude::FromRow, PgPool};
 
-pub type File = Vec<u8>;
+static BUCKET_NAME: &'static str = "files";
+
+fn get_s3_credentials() -> Result<(Credentials, Region)> {
+    Ok((Credentials::default()?, Region::from_default_env()?))
+}
+
+#[derive(Debug)]
+pub struct File {
+    content: Vec<u8>,
+}
+
+impl File {
+    /// Creates a new [`File`].
+    pub fn new(content: Vec<u8>) -> Self {
+        Self { content }
+    }
+
+    pub async fn put_into_s3(
+        &self,
+        hash: &str,
+        credentials: Credentials,
+        region: Region,
+    ) -> Result<()> {
+        let bucket = Bucket::new(BUCKET_NAME, region.clone(), credentials.clone())?
+            .with_path_style();
+
+        if !bucket.exists().await? {
+            Bucket::create_with_path_style(
+                BUCKET_NAME,
+                region.clone(),
+                credentials.clone(),
+                BucketConfiguration::default(),
+            )
+            .await?;
+        }
+
+        bucket.put_object(hash, &self.content).await?;
+
+        Ok(())
+    }
+
+    pub async fn get_from_s3(hash: &str, credentials: Credentials, region: Region) -> Result<Self> {
+        let bucket = Bucket::new(BUCKET_NAME, region.clone(), credentials.clone())
+            .unwrap()
+            .with_path_style();
+
+        let result = bucket.get_object(hash).await?;
+        Ok(Self::new(result.into()))
+    }
+
+    pub async fn delete_from_s3(
+        hash: &str,
+        credentials: Credentials,
+        region: Region,
+    ) -> Result<()> {
+        let bucket = Bucket::new(BUCKET_NAME, region.clone(), credentials.clone())
+            .unwrap()
+            .with_path_style();
+
+        bucket.delete_object(hash).await?;
+
+        Ok(())
+    }
+}
+
 #[derive(FromRow, Serialize, Deserialize, Clone, Debug)]
 pub struct FileInfo {
@@ -22,6 +85,19 @@ impl FileInfo {
         }
     }
 
+    pub async fn insert_into_db(pool: &PgPool, content: &[u8]) -> Result<()> {
+        let hash = digest(content);
+        let (credentials, region) = get_s3_credentials()?;
+        let file = File::new(content.to_vec());
+        file.put_into_s3(&hash, credentials, region).await?;
+        sqlx::query("INSERT INTO files (hash, object_storage_location) VALUES ($1, $2)")
+            .bind(hash.clone())
+            .bind(BUCKET_NAME)
+            .execute(pool)
+            .await?;
+        Ok(())
+    }
+
     pub async fn read_from_db(pool: &PgPool) -> Result<Vec<FileInfo>> {
         let files = sqlx::query_as::<_, FileInfo>("SELECT * FROM files")
             .fetch_all(pool)
@@ -47,129 +123,20 @@ impl FileInfo {
         Ok(file_info)
     }
 
-    pub async fn read_from_db_and_s3_by_id(pool: &PgPool, id: i32) -> Result<File> {
-        let (credentials, region) = Self::get_s3_credentials()?;
-        let file_info = Self::read_from_db_by_ids(pool, id).await?;
-        let file = Self::get_from_s3(&file_info.hash, credentials.clone(), region.clone()).await?;
-        Ok(file)
-    }
-
-    pub async fn delete_from_db_and_s3_by_id(pool: &PgPool, id: i32) -> Result<()> {
-        let (credentials, region) = Self::get_s3_credentials()?;
-        let file_info = Self::read_from_db_by_ids(pool, id).await?;
-        sqlx::query("DELETE FROM files WHERE id = $1")
-            .bind(id)
-            .execute(pool)
-            .await?;
-        Self::delete_from_s3(&file_info.hash, credentials, region).await?;
-        Ok(())
-    }
-
-    pub async fn update_from_db_and_s3_by_id(pool: &PgPool, id: i32, file: File) -> Result<()> {
-        let (credentials, region) = Self::get_s3_credentials()?;
-        let file_info = Self::read_from_db_by_ids(pool, id).await?;
-        Self::delete_from_s3(&file_info.hash, credentials.clone(), region.clone()).await?;
-        Self::put_into_s3(&file_info.hash, &file, credentials.clone(), region.clone()).await?;
-        Ok(())
-    }
-
     pub async fn read_from_db_and_s3(pool: &PgPool) -> Result<Vec<(FileInfo, File)>> {
-        let (credentials, region) = Self::get_s3_credentials()?;
+        let (credentials, region) = get_s3_credentials()?;
         let file_infos = sqlx::query_as::<_, FileInfo>("SELECT * FROM files")
             .fetch_all(pool)
             .await?;
 
         let mut result: Vec<(FileInfo, File)> = Vec::new();
         for file_info in file_infos {
             let file =
-                Self::get_from_s3(&file_info.hash, credentials.clone(), region.clone()).await?;
+                File::get_from_s3(&file_info.hash, credentials.clone(), region.clone()).await?;
             result.push((file_info.clone(), file));
         }
         Ok(result)
     }
-
-    fn into_bucket_name() -> String {
-        format!("files")
-    }
-
-    fn get_s3_credentials() -> Result<(Credentials, Region)> {
-        Ok((Credentials::default()?, Region::from_default_env()?))
-    }
-
-    pub async fn insert_into_db(pool: &PgPool, file: &[u8]) -> Result<()> {
-        let hash = digest(file);
-        let (credentials, region) = Self::get_s3_credentials()?;
-        Self::put_into_s3(&hash, file, credentials, region).await?;
-        sqlx::query("INSERT INTO files (hash, object_storage_location) VALUES ($1, $2)")
-            .bind(hash.clone())
-            .bind(Self::into_bucket_name())
-            .execute(pool)
-            .await?;
-        Ok(())
-    }
-
-    pub async fn put_into_s3(
-        hash: &str,
-        file: &[u8],
-        credentials: Credentials,
-        region: Region,
-    ) -> Result<()> {
-        let bucket = Bucket::new(
-            &Self::into_bucket_name(),
-            region.clone(),
-            credentials.clone(),
-        )?
-        .with_path_style();
-
-        if !bucket.exists().await? {
-            Bucket::create_with_path_style(
-                &Self::into_bucket_name(),
-                region.clone(),
-                credentials.clone(),
-                BucketConfiguration::default(),
-            )
-            .await?;
-        }
-
-        bucket.put_object(hash, file).await?;
-
-        Ok(())
-    }
-
-    pub async fn get_from_s3(
-        hash: &str,
-        credentials: Credentials,
-        region: Region,
-    ) -> Result<Vec<u8>> {
-        let bucket = Bucket::new(
-            &Self::into_bucket_name(),
-            region.clone(),
-            credentials.clone(),
-        )
-        .unwrap()
-        .with_path_style();
-
-        let result = bucket.get_object(hash).await?;
-        Ok(result.into())
-    }
-
-    pub async fn delete_from_s3(
-        hash: &str,
-        credentials: Credentials,
-        region: Region,
-    ) -> Result<()> {
-        let bucket = Bucket::new(
-            &Self::into_bucket_name(),
-            region.clone(),
-            credentials.clone(),
-        )
-        .unwrap()
-        .with_path_style();
-
-        bucket.delete_object(hash).await?;
-
-        Ok(())
-    }
 }
 
 #[cfg(test)]
@@ -196,14 +163,14 @@ mod tests {
 
         let files = FileInfo::read_from_db_and_s3(&pool).await.unwrap();
 
-        let (file, content) = files.first().unwrap();
+        let (file_info, file) = files.first().unwrap();
 
-        assert_eq!(file.id, 1);
-        assert_eq!(content, &[1, 2, 3, 4, 5]);
+        assert_eq!(file_info.id, 1);
+        assert_eq!(file.content, &[1, 2, 3, 4, 5]);
 
-        let (credentials, region) = FileInfo::get_s3_credentials().unwrap();
+        let (credentials, region) = get_s3_credentials().unwrap();
 
-        FileInfo::delete_from_s3(&file.hash, credentials, region)
+        File::delete_from_s3(&file_info.hash, credentials, region)
             .await
             .unwrap();
     }
@@ -217,11 +184,14 @@ mod tests {
             endpoint: "http://localhost:9000".to_owned(),
         };
 
-        let res =
-            FileInfo::put_into_s3("hei", &[1, 2, 3, 4], credentials.clone(), region.clone()).await;
+        let file = File::new([1, 2, 3, 4].to_vec());
+
+        let res = file
+            .put_into_s3("hei", credentials.clone(), region.clone())
+            .await;
         assert!(res.is_ok());
 
-        let res = FileInfo::delete_from_s3("hei", credentials, region).await;
+        let res = File::delete_from_s3("hei", credentials, region).await;
         assert!(res.is_ok());
     }
 
@@ -234,17 +204,20 @@ mod tests {
             endpoint: "http://localhost:9000".to_owned(),
         };
 
-        let res =
-            FileInfo::put_into_s3("hei", &[1, 2, 3], credentials.clone(), region.clone()).await;
+        let file = File::new([1, 2, 3].to_vec());
+
+        let res = file
+            .put_into_s3("hei", credentials.clone(), region.clone())
+            .await;
        assert!(res.is_ok());
 
-        let file = FileInfo::get_from_s3("hei", credentials.clone(), region.clone())
+        let file = File::get_from_s3("hei", credentials.clone(), region.clone())
             .await
             .unwrap();
 
-        assert_eq!(file, &[1, 2, 3]);
+        assert_eq!(file.content, &[1, 2, 3]);
 
-        let res = FileInfo::delete_from_s3("hei", credentials, region).await;
+        let res = File::delete_from_s3("hei", credentials, region).await;
         assert!(res.is_ok());
     }
 }
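Net effect of the split: File owns the bytes and all S3 object I/O, while FileInfo keeps the Postgres metadata and the combined DB+S3 helpers. A minimal call-pattern sketch, not part of the commit: the demo function, the sample bytes, and the anyhow-style Result are assumptions, as is read access to FileInfo's id/hash fields from the calling module; like the tests, it expects Postgres plus a path-style S3 endpoint (e.g. MinIO) to be reachable, with AWS_* environment variables set so Credentials::default() and Region::from_default_env() resolve.

use sqlx::PgPool;

// Hypothetical driver, assuming `File` and `FileInfo` are in scope.
async fn demo(pool: &PgPool) -> anyhow::Result<()> {
    // Write path: hash the bytes, upload them via File::put_into_s3,
    // then record (hash, bucket name) in the `files` table.
    FileInfo::insert_into_db(pool, &[1, 2, 3, 4, 5]).await?;

    // Read path: fetch every metadata row from Postgres and pair it
    // with its content, looked up in S3 by hash.
    for (info, _file) in FileInfo::read_from_db_and_s3(pool).await? {
        println!("id={} hash={}", info.id, info.hash);
    }
    Ok(())
}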
